
Draft: [EPIX100][CORRECT] Use DataFile to write corrected data

Merged Karim Ahmed requested to merge fix/epix_write_aligned_corrected_data into master
1 file   +94   −60
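Reviewer note: the core of this MR replaces direct h5py writes (plus h5_copy_except) with the cal_tools.files.DataFile API, which creates the EuXFEL-format INDEX and METADATA sections alongside the corrected INSTRUMENT data. Below is a minimal sketch of the write pattern, restricted to the calls that appear in this diff; the file name, source name, shape, and counts are illustrative assumptions, not values from the notebook.

import numpy as np
from cal_tools.files import DataFile

train_ids = np.arange(1000, 1010)                       # 10 hypothetical trains
corrected = np.zeros((10, 704, 768), dtype=np.float32)  # (trains, y, x), shape illustrative

with DataFile("CORR-R0001-EPIX01-S00000.h5", "w") as ofile:
    # INDEX section: one entry per train.
    ofile.create_index(train_ids)
    # INSTRUMENT source that will hold the corrected datasets.
    src = ofile.create_instrument_source("HYP_EPIX100/DET/RECEIVER:daqOutput")
    # Per-source count datasets for the `data` group; here one image per train.
    src.create_index(data=np.ones(len(train_ids), dtype=np.uint64))
    # Corrected images, chunked one train at a time.
    src.create_key("data.image.pixels", data=corrected, chunks=(1, 704, 768))
    # METADATA section; the notebook passes like=seq_dc to copy it from the raw file.
    ofile.create_metadata()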
@@ -41,9 +41,7 @@
"\n",
"# Parameters affecting writing corrected data.\n",
"chunk_size_idim = 1 # H5 chunking size of output data\n",
"\n",
"# Only for testing\n",
"limit_images = 0 # ONLY FOR TESTING. process only first N images, 0 - process all.\n",
"limit_trains = 0 # Process only first N images, 0 - process all.\n",
"\n",
"# Parameters for the calibration database.\n",
"cal_db_interface = \"tcp://max-exfl016:8015#8025\" # calibration DB interface to use\n",
@@ -85,6 +83,8 @@
"source": [
"import tabulate\n",
"import warnings\n",
"from logging import warning\n",
"from sys import exit\n",
"\n",
"import h5py\n",
"import pasha as psh\n",
@@ -96,8 +96,8 @@
"\n",
"from XFELDetAna import xfelpyanatools as xana\n",
"from XFELDetAna import xfelpycaltools as xcal\n",
"from cal_tools import h5_copy_except\n",
"from cal_tools.epix100 import epix100lib\n",
"from cal_tools.files import DataFile\n",
"from cal_tools.tools import (\n",
" calcat_creation_time,\n",
" get_dir_creation_date,\n",
@@ -217,7 +217,7 @@
"# Read control data.\n",
"ctrl_data = epix100lib.epix100Ctrl(\n",
" run_dc=run_dc,\n",
" instrument_src=f\"{karabo_id}/DET/{receiver_template}:daqOutput\",\n",
" instrument_src=instrument_src,\n",
" ctrl_src=f\"{karabo_id}/DET/CONTROL\",\n",
" )\n",
"\n",
@@ -534,7 +534,7 @@
"source": [
"def correct_train(wid, index, tid, d):\n",
"\n",
" d = d[pixel_data[0]][pixel_data[1]][..., np.newaxis].astype(np.float32)\n",
" d = d[..., np.newaxis].astype(np.float32)\n",
" d = np.compress(\n",
" np.any(d > 0, axis=(0, 1)), d, axis=2)\n",
" \n",
@@ -614,8 +614,6 @@
"metadata": {},
"outputs": [],
"source": [
"pixel_data = (instrument_src, \"data.image.pixels\")\n",
"\n",
"# 10 is a number chosen after testing 1 ... 71 parallel threads\n",
"context = psh.context.ThreadContext(num_workers=10)"
]
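The hunks above and below also change how trains are fed to the correction: correct_train is now mapped directly over the data.image.pixels KeyData, so pasha hands each worker one train's image array. A minimal sketch of that pattern, assuming extra_data's H5File as used in the notebook; the file path, source name, and the identity "correction" are placeholders.

import numpy as np
import pasha as psh
from extra_data import H5File

context = psh.context.ThreadContext(num_workers=10)

seq_dc = H5File("RAW-R0001-EPIX01-S00000.h5")  # hypothetical sequence file
pixel_data = seq_dc["HYP_EPIX100/DET/RECEIVER:daqOutput", "data.image.pixels"]
data = context.alloc(shape=pixel_data.shape, dtype=np.float32)

def correct_train(wid, index, tid, d):
    # pasha calls this once per train; `d` is that train's image array.
    data[index] = d.astype(np.float32)  # stand-in for the real correction

context.map(correct_train, pixel_data)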
@@ -628,77 +626,113 @@
},
"outputs": [],
"source": [
"empty_seq = 0\n",
"\n",
"for f in seq_files:\n",
"\n",
" seq_dc = H5File(f)\n",
"\n",
" n_imgs = seq_dc.get_data_counts(*pixel_data).shape[0]\n",
" # Save corrected data in an output file with name\n",
" # of corresponding raw sequence file.\n",
" out_file = out_folder / f.name.replace(\"RAW\", \"CORR\")\n",
"\n",
" # Data shape in seq_dc excluding trains with empty images. \n",
" dshape = seq_dc[pixel_data].shape\n",
" dataset_chunk = ((chunk_size_idim,) + dshape[1:]) # e.g. (1, pixels_x, pixels_y) \n",
"\n",
" if n_imgs - dshape[0] != 0:\n",
" print(f\"- WARNING: {f} has {n_imgs - dshape[0]} trains with empty data.\")\n",
" ishape = seq_dc[instrument_src, \"data.image.pixels\"].shape\n",
" corr_ntrains = ishape[0]\n",
" all_train_ids = seq_dc.train_ids\n",
"\n",
" # Raise a WARNING if this sequence has no trains to correct.\n",
" # Otherwise, print number of trains with no data.\n",
" if corr_ntrains == 0:\n",
" warning(f\"No trains to correct for {f.name}: \"\n",
" \"Skipping the processing of this file.\")\n",
" empty_seq += 1\n",
" continue\n",
" elif len(all_train_ids) != corr_ntrains:\n",
" print(f\"{f.name} has {len(all_train_ids) - corr_ntrains} trains with missing data.\")\n",
"\n",
" # This parameter is only used for testing.\n",
" if limit_images > 0:\n",
" n_imgs = min(n_imgs, limit_images)\n",
" if limit_trains > 0:\n",
" print(f\"\\nCorrected trains are limited to: {limit_trains} trains\")\n",
" corr_ntrains = min(corr_ntrains, limit_trains)\n",
" oshape = (corr_ntrains, *ishape[1:])\n",
"\n",
" data = context.alloc(shape=dshape, dtype=np.float32)\n",
" data = context.alloc(shape=oshape, dtype=np.float32)\n",
"\n",
" if pattern_classification:\n",
" data_clu = context.alloc(shape=dshape, dtype=np.float32)\n",
" data_patterns = context.alloc(shape=dshape, dtype=np.int32)\n",
" data_clu = context.alloc(shape=oshape, dtype=np.float32)\n",
" data_patterns = context.alloc(shape=oshape, dtype=np.int32)\n",
"\n",
" step_timer.start()\n",
" step_timer.start() # Correct data. \n",
"\n",
" context.map(\n",
" correct_train, seq_dc.select(\n",
" *pixel_data, require_all=True).select_trains(np.s_[:n_imgs])\n",
" )\n",
" step_timer.done_step(f'Correcting {n_imgs} trains.')\n",
" # Overwrite seq_dc after eliminating empty trains or/and applying limited images.\n",
" seq_dc = seq_dc.select(\n",
" instrument_src, \"*\", require_all=True).select_trains(np.s_[:corr_ntrains])\n",
"\n",
" # Store detector h5 information in the corrected file\n",
" # and deselect data to correct and store later.\n",
" step_timer.start()\n",
" pixel_data = seq_dc[instrument_src, \"data.image.pixels\"]\n",
" context.map(correct_train, pixel_data)\n",
"\n",
" out_file = out_folder / f.name.replace(\"RAW\", \"CORR\")\n",
" data_path = \"INSTRUMENT/\"+instrument_src+\"/data/image\"\n",
" pixels_path = f\"{data_path}/pixels\"\n",
" \n",
" # First copy all raw data source to the corrected file,\n",
" # while excluding the raw data image /data/image/pixels.\n",
" with h5py.File(out_file, 'w') as ofile:\n",
" # Copy RAW non-calibrated sources.\n",
" with h5py.File(f, 'r') as sfile:\n",
" h5_copy_except.h5_copy_except_paths(\n",
" sfile, ofile,\n",
" [pixels_path])\n",
"\n",
" # Create dataset in CORR h5 file and add corrected images.\n",
" dataset = ofile.create_dataset(\n",
" pixels_path,\n",
" data=data,\n",
" chunks=dataset_chunk,\n",
" dtype=np.float32)\n",
" step_timer.done_step(f'Correcting {corr_ntrains} trains.')\n",
"\n",
"\n",
" step_timer.start() # Write corrected data.\n",
"\n",
" # Create CORR files and add corrected data sections.\n",
" image_counts = seq_dc[instrument_src, \"data.image.pixels\"].data_counts(labelled=False)\n",
"\n",
" # Write corrected data.\n",
" with DataFile(out_file, 'w') as ofile:\n",
" dataset_chunk = ((chunk_size_idim,) + oshape[1:]) # e.g. (1, pixels_x, pixels_y) \n",
"\n",
" # Create INDEX datasets.\n",
" ofile.create_index(seq_dc.train_ids, from_file=seq_dc.files[0])\n",
" # Create Instrument section to later add corrected datasets.\n",
" outp_source = ofile.create_instrument_source(instrument_src)\n",
"\n",
" # Create count/first datasets at INDEX source.\n",
" outp_source.create_index(data=image_counts)\n",
"\n",
" # Store uncorrected RAW image datasets for the corrected trains.\n",
"\n",
" data_raw_fields = [ # /data/\n",
" 'ambTemp', 'analogCurr', 'analogInputVolt', 'backTemp',\n",
" 'digitalInputVolt', 'guardCurr', 'relHumidity',\n",
" ]\n",
" for field in data_raw_fields:\n",
" field_arr = seq_dc[instrument_src, f\"data.{field}\"].ndarray()\n",
"\n",
" outp_source.create_key(\n",
" f\"data.{field}\", data=field_arr,\n",
" chunks=(chunk_size_idim, *field_arr.shape[1:]))\n",
"\n",
" image_raw_fields = [ # /data/image/\n",
" 'binning', 'bitsPerPixel', 'dimTypes', 'dims',\n",
" 'encoding', 'flipX', 'flipY', 'roiOffsets', 'rotation',\n",
" ]\n",
" for field in image_raw_fields:\n",
" field_arr = seq_dc[instrument_src, f\"data.image.{field}\"].ndarray()\n",
"\n",
" outp_source.create_key(\n",
" f\"data.image.{field}\", data=field_arr,\n",
" chunks=(chunk_size_idim, *field_arr.shape[1:]))\n",
"\n",
" # Add main corrected `data.image.pixels` dataset and store corrected data.\n",
" outp_source.create_key(\n",
" \"data.image.pixels\", data=data, chunks=dataset_chunk)\n",
"\n",
" if pattern_classification:\n",
" # Save /data/image/pixels_classified in corrected file.\n",
" datasetc = ofile.create_dataset(\n",
" f\"{data_path}/pixels_classified\",\n",
" data=data_clu,\n",
" chunks=dataset_chunk,\n",
" dtype=np.float32)\n",
"\n",
" # Save /data/image/patterns in corrected file.\n",
" datasetp = ofile.create_dataset(\n",
" f\"{data_path}/patterns\",\n",
" data=data_patterns,\n",
" chunks=dataset_chunk,\n",
" dtype=np.int32)\n",
"\n",
" step_timer.done_step('Storing data.')"
" # Add main corrected `data.image.pixels` dataset and store corrected data.\n",
" outp_source.create_key(\n",
" \"data.image.pixels_classified\", data=data_clu, chunks=dataset_chunk)\n",
" outp_source.create_key(\n",
" \"data.image.patterns\", data=data_clu, chunks=dataset_chunk)\n",
"\n",
" # Create METDATA datasets\n",
" ofile.create_metadata(like=seq_dc)\n",
"\n",
" step_timer.done_step('Storing data.')\n",
"if empty_seq == len(seq_files):\n",
" warning(\"No valid trains for RAW data to correct.\")\n",
" exit(0)"
]
},
{