From 79cf9be1c79ef25a3c30f2da99847358e5b972f8 Mon Sep 17 00:00:00 2001 From: ahmedk <karim.ahmed@xfel.eu> Date: Tue, 10 Oct 2023 10:39:15 +0200 Subject: [PATCH] Change AgipdCtrlsRuns arguments to raw_folder and runs for usability --- .../Characterize_AGIPD_Gain_Darks_NBC.ipynb | 14 +-- src/cal_tools/agipdlib.py | 34 +++--- tests/test_agipdlib.py | 114 ++++++++---------- 3 files changed, 69 insertions(+), 93 deletions(-) diff --git a/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb b/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb index cdf00a228..9b770097d 100644 --- a/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb +++ b/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb @@ -262,18 +262,8 @@ "# TODO: what if first module is not available. Maybe only channel 2 available\n", "instrument_src_mod = instrument_src.format(modules[0])\n", "\n", - "run_ctrls = []\n", - "for run, run_dict in runs_dict.items():\n", - " run_ctrls.append(\n", - " AgipdCtrl(\n", - " run_dc=run_dict[\"dc\"],\n", - " image_src=instrument_src_mod,\n", - " ctrl_src=ctrl_src,\n", - " run=run_dict[\"number\"],\n", - " )\n", - " )\n", - "\n", - "agipd_ctrl_dark = AgipdCtrlRuns(run_ctrls)\n", + "agipd_ctrl_dark = AgipdCtrlRuns(\n", + " raw_folder=in_folder, runs=list(runs_dict.keys))\n", "if mem_cells == 0:\n", " mem_cells = agipd_ctrl_dark.get_memory_cells()\n", "\n", diff --git a/src/cal_tools/agipdlib.py b/src/cal_tools/agipdlib.py index d8b78d524..03ea731d8 100644 --- a/src/cal_tools/agipdlib.py +++ b/src/cal_tools/agipdlib.py @@ -45,14 +45,7 @@ class AgipdCtrl: run_dc: DataCollection image_src: str ctrl_src: str - karabo_id_control: str = None raise_error: bool = False - run: int = None - - def __post_init__(self): - if self.run is None: - # TODO: check for old runs with Version < 1.0 - self.run = self.run_dc.run_metadata()["runNumber"] def _get_num_cells_ctrl(self) -> Optional[int]: """Get number of cells from CONTROL source.""" @@ -227,7 +220,7 @@ class AgipdCtrl: def get_bias_voltage( self, - karabo_id_control: Optional[str] = None, + karabo_id_control: str, module: Optional[int] = 0 ) -> int: """Read the voltage information from the RUN source of module 0. @@ -244,12 +237,6 @@ class AgipdCtrl: :param module: defaults to module 0 :return: bias voltage """ - if karabo_id_control is None: - karabo_id_control = self.karabo_id_control - if not karabo_id_control: - raise ValueError( - "`karabo_id_control` value is not given. " - "Please pass the Karabo device ID for the control device.") # TODO: Add a breaking fix by passing the source and key through # get_bias_voltage arguments. if "AGIPD1M" in karabo_id_control: @@ -305,10 +292,13 @@ class AgipdCtrl: return 12 - +from extra_data import RunDirectory @dataclass class AgipdCtrlRuns: - run_ctrls: List[AgipdCtrl] + raw_folder: str + runs: List[int] + image_src: str + ctrl_src: str adaptive_gain_modes: List[AgipdGainMode] = field( default_factory=lambda: [AgipdGainMode.ADAPTIVE_GAIN] * 3) fixed_gain_modes: List[AgipdGainMode] = field( @@ -319,7 +309,13 @@ class AgipdCtrlRuns: ]) def __post_init__(self): - self.runs = [c.run for c in self.run_ctrls] + # validate that all runs belong to the same + self.run_ctrls = [ + AgipdCtrl( + run_dc=RunDirectory(f"{self.raw_folder}/r{r:04d}"), + image_src=self.image_src, + ctrl_src=self.ctrl_src, + ) for r in self.runs] self.gain_modes = self.get_gain_modes() def _validate_same_value(self, name, values): @@ -384,10 +380,10 @@ class AgipdCtrlRuns: self._validate_same_value("Integration Time", integration_times) return integration_times[0] - def get_bias_voltage(self, karabo_id_control: Optional[str] = None): + def get_bias_voltage(self, karabo_id_control: str = None): """ Args: - karabo_id_control (Optional[str], optional): + karabo_id_control (str): Karabo ID for control device. Returns: diff --git a/tests/test_agipdlib.py b/tests/test_agipdlib.py index 7a2f12f3c..886ab41e6 100644 --- a/tests/test_agipdlib.py +++ b/tests/test_agipdlib.py @@ -209,112 +209,102 @@ Runs used: FIXED: /gpfs/exfel/exp/CALLAB/202130/p900203/raw/[9011,9012,9013] ADAPTIVE: /gpfs/exfel/exp/CALLAB/202130/p900203/raw/[9015,9016,9017] """ -def _initialize_agipd_ctrls( - runs, - raw_folder="/gpfs/exfel/exp/CALLAB/202130/p900203/raw/" -): - return [AgipdCtrl( - run_dc=RunDirectory(f"{raw_folder}/r{r:04d}"), - ctrl_src=CTRL_SRC, - image_src=SPB_AGIPD_INST_SRC, - karabo_id_control=SPB_AGIPD_KARABO_CTRL_ID, - ) for r in runs] - +TEST_RAW_FOLDER = "/gpfs/exfel/exp/CALLAB/202130/p900203/raw/" +SPB_FIXED_RUNS = [9011, 9012, 9013] +SPB_ADAPTIVE_RUNS = [9015, 9016, 9017] +FIXED_CTRL_RUNS = AgipdCtrlRuns( + raw_folder=TEST_RAW_FOLDER, + runs=SPB_FIXED_RUNS, + image_src=SPB_AGIPD_INST_SRC, + ctrl_src=CTRL_SRC, +) +ADAPTIVE_CTRL_RUNS = AgipdCtrlRuns( + raw_folder=TEST_RAW_FOLDER, + runs=SPB_ADAPTIVE_RUNS, + image_src=SPB_AGIPD_INST_SRC, + ctrl_src=CTRL_SRC, +) @pytest.mark.requires_gpfs def test_get_memory_cells_runs(): - fixed_ctrls = _initialize_agipd_ctrls([9011, 9012, 9013]) - ctrl_runs = AgipdCtrlRuns(run_ctrls=fixed_ctrls) - assert 352 == ctrl_runs.get_memory_cells() + assert 352 == FIXED_CTRL_RUNS.get_memory_cells() - adaptive_ctrls = _initialize_agipd_ctrls([9015, 9016, 9017]) - ctrl_runs = AgipdCtrlRuns(run_ctrls=adaptive_ctrls) - assert 352 == ctrl_runs.get_memory_cells() + assert 352 == ADAPTIVE_CTRL_RUNS.get_memory_cells() @pytest.mark.requires_gpfs def test_get_bias_voltage_runs(): - fixed_ctrls = _initialize_agipd_ctrls([9011, 9012, 9013]) - ctrl_runs = AgipdCtrlRuns(run_ctrls=fixed_ctrls) - assert 300 == ctrl_runs.get_bias_voltage() + assert 300 == FIXED_CTRL_RUNS.get_bias_voltage(SPB_AGIPD_KARABO_CTRL_ID) - adaptive_ctrls = _initialize_agipd_ctrls([9015, 9016, 9017]) - ctrl_runs = AgipdCtrlRuns(run_ctrls=adaptive_ctrls) - assert 300 == ctrl_runs.get_bias_voltage() + assert 300 == ADAPTIVE_CTRL_RUNS.get_bias_voltage(SPB_AGIPD_KARABO_CTRL_ID) @pytest.mark.requires_gpfs def test_get_integration_time_runs(): - fixed_ctrls = _initialize_agipd_ctrls([9011, 9012, 9013]) - ctrl_runs = AgipdCtrlRuns(run_ctrls=fixed_ctrls) - assert 12 == ctrl_runs.get_integration_time() + assert 12 == FIXED_CTRL_RUNS.get_integration_time() - adaptive_ctrls = _initialize_agipd_ctrls([9015, 9016, 9017]) - ctrl_runs = AgipdCtrlRuns(run_ctrls=adaptive_ctrls) - assert 20 == ctrl_runs.get_integration_time() + assert 20 == ADAPTIVE_CTRL_RUNS.get_integration_time() @pytest.mark.requires_gpfs def test_get_acquisition_rate_runs(): - fixed_ctrls = _initialize_agipd_ctrls([9011, 9012, 9013]) - ctrl_runs = AgipdCtrlRuns(run_ctrls=fixed_ctrls) - assert 1.1 == ctrl_runs.get_acq_rate() + assert 1.1 == FIXED_CTRL_RUNS.get_acq_rate() - adaptive_ctrls = _initialize_agipd_ctrls([9015, 9016, 9017]) - ctrl_runs = AgipdCtrlRuns(run_ctrls=adaptive_ctrls) - assert 1.1 == ctrl_runs.get_acq_rate() + assert 1.1 == ADAPTIVE_CTRL_RUNS.get_acq_rate() @pytest.mark.requires_gpfs def test_get_gain_setting_runs(): - fixed_ctrls = _initialize_agipd_ctrls([9011, 9012, 9013]) - ctrl_runs = AgipdCtrlRuns(run_ctrls=fixed_ctrls) - assert 0 == ctrl_runs.get_gain_setting() + assert 0 == FIXED_CTRL_RUNS.get_gain_setting() - adaptive_ctrls = _initialize_agipd_ctrls([9015, 9016, 9017]) - ctrl_runs = AgipdCtrlRuns(run_ctrls=adaptive_ctrls) - assert 0 == ctrl_runs.get_gain_setting() + assert 0 == ADAPTIVE_CTRL_RUNS.get_gain_setting() @pytest.mark.requires_gpfs def test_get_gain_mode_runs(): - fixed_ctrls = _initialize_agipd_ctrls([9011, 9012, 9013]) - ctrl_runs = AgipdCtrlRuns(run_ctrls=fixed_ctrls) assert [ AgipdGainMode.FIXED_HIGH_GAIN, AgipdGainMode.FIXED_MEDIUM_GAIN, AgipdGainMode.FIXED_LOW_GAIN - ] == ctrl_runs.get_gain_modes() + ] == FIXED_CTRL_RUNS.get_gain_modes() - adaptive_ctrls = _initialize_agipd_ctrls([9015, 9016, 9017]) - ctrl_runs = AgipdCtrlRuns(run_ctrls=adaptive_ctrls) - assert [AgipdGainMode.ADAPTIVE_GAIN]*3 == ctrl_runs.get_gain_modes() + assert [ + AgipdGainMode.ADAPTIVE_GAIN]*3 == ADAPTIVE_CTRL_RUNS.get_gain_modes() @pytest.mark.requires_gpfs def test_fixed_gain_mode(): - fixed_ctrls = _initialize_agipd_ctrls([9011, 9012, 9013]) - ctrl_runs = AgipdCtrlRuns(run_ctrls=fixed_ctrls) - assert True == ctrl_runs.fixed_gain_mode() + assert True == FIXED_CTRL_RUNS.fixed_gain_mode() - adaptive_ctrls = _initialize_agipd_ctrls([9015, 9016, 9017]) - ctrl_runs = AgipdCtrlRuns(run_ctrls=adaptive_ctrls) - assert False == ctrl_runs.fixed_gain_mode() + assert False == ADAPTIVE_CTRL_RUNS.fixed_gain_mode() @pytest.mark.requires_gpfs def test_raise_fixed_gain_mode(): - adaptive_fixed = _initialize_agipd_ctrls([9011, 9016, 9017]) + adaptive_fixed_ctrls = AgipdCtrlRuns( + raw_folder=TEST_RAW_FOLDER, + runs=[9011, 9016, 9017], + image_src=SPB_AGIPD_INST_SRC, + ctrl_src=CTRL_SRC, + ) with pytest.raises(ValueError) as e: - ctrl_runs = AgipdCtrlRuns(run_ctrls=adaptive_fixed) - ctrl_runs.fixed_gain_mode() + adaptive_fixed_ctrls.fixed_gain_mode() @pytest.mark.requires_gpfs def test_raise_validate_gain_modes(): - adaptive_fixed = _initialize_agipd_ctrls([9011, 9016, 9017]) - ctrl_runs = AgipdCtrlRuns(run_ctrls=adaptive_fixed) + adaptive_fixed_ctrls = AgipdCtrlRuns( + raw_folder=TEST_RAW_FOLDER, + runs=[9011, 9016, 9017], + image_src=SPB_AGIPD_INST_SRC, + ctrl_src=CTRL_SRC, + ) with pytest.raises(ValueError) as e: - ctrl_runs.validate_gain_modes() - unsorted_fixed_gais = _initialize_agipd_ctrls([9013, 9011, 9012]) - ctrl_runs = AgipdCtrlRuns(run_ctrls=unsorted_fixed_gais) + adaptive_fixed_ctrls.validate_gain_modes() + + unsorted_fixed_gain_ctrls = AgipdCtrlRuns( + raw_folder=TEST_RAW_FOLDER, + runs=[9013, 9011, 9012], + image_src=SPB_AGIPD_INST_SRC, + ctrl_src=CTRL_SRC, + ) with pytest.raises(ValueError) as e: - ctrl_runs.validate_gain_modes() + unsorted_fixed_gain_ctrls.validate_gain_modes() -- GitLab