diff --git a/notebooks/LPD/LPD_Correct_Fast.ipynb b/notebooks/LPD/LPD_Correct_Fast.ipynb index a2b4dcede623486b75b0589a291593eedbccbaf9..c8024a4211708a0c0001afeea955f0296ed4a6de 100644 --- a/notebooks/LPD/LPD_Correct_Fast.ipynb +++ b/notebooks/LPD/LPD_Correct_Fast.ipynb @@ -29,6 +29,7 @@ "karabo_da = [''] # Data aggregators names to correct, use [''] for all\n", "run = 10 # runs to process, required\n", "\n", + "# Source parameters\n", "karabo_id = \"FXE_DET_LPD1M-1\" # karabo karabo_id\n", "input_source = '{karabo_id}/DET/{module_index}CH0:xtdf' # Input fast data source.\n", "output_source = '' # Output fast data source, empty to use same as input\n", @@ -44,7 +45,7 @@ "photon_energy = 9.2 # Photon energy in kEv.\n", "category = 0 # Whom to blame.\n", "\n", - "# Correction parameters.\n", + "# Correction parameters\n", "offset_corr = True # Offset correction.\n", "rel_gain = True # Gain correction based on RelativeGain constant.\n", "ff_map = True # Gain correction based on FFMap constant.\n", @@ -53,7 +54,10 @@ "# Output options\n", "overwrite = True # set to True if existing data should be overwritten\n", "\n", - "sequences_per_node = 1 # sequence files to process per node\n", + "# Parallelization options\n", + "sequences_per_node = 1 # Sequence files to process per node\n", + "num_workers = 8 # Worker processes per node, 8 is safe on 768G nodes but won't work on 512G.\n", + "num_threads_per_worker = 32 # Number of threads per worker.\n", "\n", "def balance_sequences(in_folder, run, sequences, sequences_per_node, karabo_da):\n", " from xfel_calibrate.calibrate import balance_sequences as bs\n", @@ -94,10 +98,8 @@ "\n", "from extra_data.components import LPD1M\n", "\n", - "from cal_tools.enums import BadPixels\n", "from cal_tools.lpdalgs import correct_lpd_frames\n", "from cal_tools.tools import CalibrationMetadata, get_dir_creation_date, write_compressed_frames\n", - "from cal_tools.h5_copy_except import h5_copy_except_paths\n", "from cal_tools.files import DataFile\n", "from cal_tools.restful_config import restful_config" ] @@ -256,9 +258,9 @@ " (illuminated_calibrations, illuminated_condition)\n", "]:\n", " resp = CalibrationConstantVersion.get_closest_by_time_by_detector_conditions(\n", - " client, 'FXE_DET_LPD1M-1', list(calibrations.keys()),\n", + " client, karabo_id, list(calibrations.keys()),\n", " {'parameters_conditions_attributes': condition},\n", - " karabo_da='', event_at=None, snapshot_at=None)\n", + " karabo_da='', event_at=creation_time.isoformat(), snapshot_at=None)\n", "\n", " if not resp['success']:\n", " raise RuntimeError(resp)\n", @@ -435,17 +437,10 @@ " out_mask = None\n", " gc.collect()\n", "\n", - "# 8/18, 479s\n", - "# 10/18, 464s\n", - " \n", - "num_workers = 5\n", - "num_threads_per_worker = 24\n", "print('worker\\tDA\\topen\\tread\\tcorrect\\twrite\\ttotal\\tframes\\trate')\n", - " \n", "start = perf_counter()\n", "psh.ProcessContext(num_workers=num_workers).map(correct_file, data_to_process)\n", "total_time = perf_counter() - start\n", - "\n", "print(f'Total time: {total_time:.1f}s')" ] },