diff --git a/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb b/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb index 3af27abfdb61c295d54b37e8adeb138f506fb95f..53d2bea7d08c177e668852490a03a3dcecc8c1be 100644 --- a/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb +++ b/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb @@ -249,73 +249,6 @@ "## Read and validate the runs control data." ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "def read_run_conditions(runs_dict: dict):\n", - " agipd_cond = AgipdCtrl(\n", - " run_dc=runs_dict[\"dc\"],\n", - " image_src=instrument_src_mod,\n", - " ctrl_src=ctrl_src,\n", - " )\n", - " cond_dict[\"runs\"].append(runs_dict[\"number\"])\n", - " if acq_rate == 0:\n", - " cond_dict[\"acq_rate\"].append(agipd_cond.get_acq_rate())\n", - " if mem_cells == 0:\n", - " cond_dict[\"mem_cells\"].append(agipd_cond.get_num_cells())\n", - " if gain_setting == -1: \n", - " cond_dict[\"gain_setting\"].append(\n", - " agipd_cond.get_gain_setting(creation_time))\n", - " if bias_voltage == 0.:\n", - " cond_dict[\"bias_voltage\"].append(\n", - " agipd_cond.get_bias_voltage(karabo_id_control))\n", - " if integration_time == -1:\n", - " cond_dict[\"integration_time\"].append(\n", - " agipd_cond.get_integration_time())\n", - " if gain_mode == -1:\n", - " cond_dict[\"gain_mode\"].append(agipd_cond.get_gain_mode())\n", - " else:\n", - " cond_dict[\"gain_mode\"].append(AgipdGainMode(gain_mode))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "def validate_gain_modes(gain_modes: List[AgipdGainMode]):\n", - " # Validate that gain modes are not a mix of adaptive and fixed gain.\n", - " if all(\n", - " gm == AgipdGainMode.ADAPTIVE_GAIN for gm in gain_modes\n", - " ):\n", - " fixed_gain_mode = False\n", - " # Some runs are adaptive by mistake.\n", - " elif any(\n", - " gm == AgipdGainMode.ADAPTIVE_GAIN for gm in gain_modes\n", - " ):\n", - " raise ValueError(\n", - " f\"ERROR: Given runs {run_numbers}\"\n", - " \" have a mix of ADAPTIVE and FIXED gain modes: \"\n", - " f\"{gain_modes}.\"\n", - " )\n", - " elif list(gain_modes) == [\n", - " AgipdGainMode.FIXED_HIGH_GAIN,\n", - " AgipdGainMode.FIXED_MEDIUM_GAIN,\n", - " AgipdGainMode.FIXED_LOW_GAIN\n", - " ]:\n", - " fixed_gain_mode = True\n", - " else:\n", - " raise ValueError(\n", - " \"ERROR: Wrong arrangment of given dark runs. \"\n", - " f\"Given runs' gain_modes are {gain_modes} for runs: {run_numbers}.\"\n", - " )\n", - " return fixed_gain_mode" - ] - }, { "cell_type": "code", "execution_count": null, @@ -329,39 +262,37 @@ "# TODO: what if first module is not available. Maybe only channel 2 available\n", "instrument_src_mod = instrument_src.format(modules[0])\n", "\n", - "cond_dict = dict()\n", - "fixed_gain_mode = None\n", - "\n", - "with multiprocessing.Manager() as manager:\n", - " cond_dict[\"runs\"] = manager.list()\n", - " cond_dict[\"acq_rate\"] = manager.list()\n", - " cond_dict[\"mem_cells\"] = manager.list()\n", - " cond_dict[\"gain_setting\"] = manager.list()\n", - " cond_dict[\"gain_mode\"] = manager.list()\n", - " cond_dict[\"bias_voltage\"] = manager.list()\n", - " cond_dict[\"integration_time\"] = manager.list()\n", - "\n", - " with multiprocessing.Pool(processes=len(modules)) as pool:\n", - " pool.starmap(read_run_conditions, zip(runs_dict.values()))\n", - "\n", - " for cond, vlist in cond_dict.items():\n", - " if cond == \"runs\":\n", - " continue\n", - " elif cond == \"gain_mode\":\n", - " fixed_gain_mode = validate_gain_modes(cond_dict[\"gain_mode\"])\n", - " elif not all(x == vlist[0] for x in vlist):\n", - " # TODO: raise ERROR??\n", - " print(\n", - " f\"WARNING: {cond} is not the same for the runs \"\n", - " f\"{cond_dict['runs']} with values\"\n", - " f\" of {cond_dict[cond]}, respectively.\"\n", - " )\n", - " if cond_dict[\"acq_rate\"]: acq_rate = cond_dict[\"acq_rate\"][0]\n", - " if cond_dict[\"mem_cells\"]: mem_cells = cond_dict[\"mem_cells\"][0]\n", - " if cond_dict[\"gain_setting\"]: gain_setting = cond_dict[\"gain_setting\"][0]\n", - " if cond_dict[\"gain_mode\"]: gain_mode = list(cond_dict[\"gain_mode\"])\n", - " if cond_dict[\"bias_voltage\"]: bias_voltage = cond_dict[\"bias_voltage\"][0]\n", - " if cond_dict[\"integration_time\"]: integration_time = cond_dict[\"integration_time\"][0]" + "run_ctrls = []\n", + "for run, run_dict in runs_dict.items():\n", + " run_ctrls.append(\n", + " AgipdCtrl(\n", + " run_dc=run_dict[\"dc\"],\n", + " image_src=instrument_src_mod,\n", + " ctrl_src=ctrl_src,\n", + " run=run_dict[\"number\"],\n", + " )\n", + " )\n", + "\n", + "agipd_ctrl_dark = AgipdCtrlDark(run_ctrls)\n", + "if mem_cells == 0:\n", + " mem_cells = agipd_ctrl_dark.get_memory_cells()\n", + "\n", + "if acq_rate == 0:\n", + " acq_rate = agipd_ctrl_dark.get_acq_rate()\n", + "\n", + "if bias_voltage == 0:\n", + " bias_voltage = agipd_ctrl_dark.get_bias_voltage(karabo_id_control)\n", + "\n", + "fixed_gain_mode = False\n", + "if gain_mode == -1:\n", + " gain_mode = agipd_ctrl_dark.gain_modes\n", + " fixed_gain_mode = agipd_ctrl_dark.fixed_gain_mode()\n", + "\n", + "if gain_setting == -1:\n", + " gain_setting = agipd_ctrl_dark.get_gain_setting()\n", + "\n", + "if integration_time == -1:\n", + " integration_time = agipd_ctrl_dark.get_integration_time()" ] }, { diff --git a/src/cal_tools/agipdlib.py b/src/cal_tools/agipdlib.py index 60e245ab4527bee12e8e890512fb508b4330565d..0c8a60bd6b8a24af17f28d5d568f09ca9c3fd2ba 100644 --- a/src/cal_tools/agipdlib.py +++ b/src/cal_tools/agipdlib.py @@ -1,6 +1,7 @@ import os import posixpath import zlib +from dataclasses import dataclass, field from datetime import datetime from multiprocessing import Manager from multiprocessing.pool import ThreadPool @@ -11,44 +12,47 @@ import numpy as np import sharedmem from dateutil import parser from extra_data import DataCollection, H5File, by_id, components +from logging import warning from cal_tools import agipdalgs as calgs from cal_tools.agipdutils import ( baseline_correct_via_noise, baseline_correct_via_stripe, + cast_array_inplace, correct_baseline_via_hist, correct_baseline_via_hist_asic, make_noisy_adc_mask, match_asic_borders, melt_snowy_pixels, - cast_array_inplace ) from cal_tools.enums import AgipdGainMode, BadPixels, SnowResolution from cal_tools.h5_copy_except import h5_copy_except_paths +@dataclass class AgipdCtrl: - def __init__( - self, - run_dc: DataCollection, - image_src: str, - ctrl_src: str, - raise_error: bool = True, - ): - """ Initialize AgipdCondition class to read - all required AGIPD parameters. - - :param run_dc: Run data collection with expected sources - to read needed parameters. - :param image_src: H5 source for image data. - :param ctrl_src: H5 source for control (slow) data. - :param raise_error: Boolean to raise errors for missing - sources and keys. - """ - self.run_dc = run_dc - self.image_src = image_src - self.ctrl_src = ctrl_src - self.raise_error = raise_error + """Initialize AgipdCondition class to read all required AGIPD parameters. + + Args: + run_dc (DataCollection): Run data collection with expected sources + to read needed parameters. + image_src (str): H5 source for image data. + ctrl_src (str): H5 source for control (slow) data. + raise_error (bool): Boolean to raise errors for missing + sources and keys. + run: (int, optional): Run number. + """ + run_dc: DataCollection + image_src: str + ctrl_src: str + karabo_id_control: str = None + raise_error: bool = False + run: int = None + + def __post_init__(self): + if self.run is None: + # TODO: check for old runs with Version < 1.0 + self.run = self.run_dc.run_metadata()["runNumber"] def _get_num_cells_ctrl(self) -> Optional[int]: """Get number of cells from CONTROL source.""" @@ -223,7 +227,7 @@ class AgipdCtrl: def get_bias_voltage( self, - karabo_id_control: str, + karabo_id_control: Optional[str] = None, module: Optional[int] = 0 ) -> int: """Read the voltage information from the RUN source of module 0. @@ -240,6 +244,12 @@ class AgipdCtrl: :param module: defaults to module 0 :return: bias voltage """ + if karabo_id_control is None: + karabo_id_control = self.karabo_id_control + if not karabo_id_control: + raise ValueError( + "`karabo_id_control` value is not given. " + "Please pass the Karabo device ID for the control device.") # TODO: Add a breaking fix by passing the source and key through # get_bias_voltage arguments. if "AGIPD1M" in karabo_id_control: @@ -296,6 +306,116 @@ class AgipdCtrl: return 12 +@dataclass +class AgipdCtrlDark: + run_ctrls: List[AgipdCtrl] + runs: list = field(init=False) + + def __post_init__(self): + self.runs = [c.run for c in self.run_ctrls] + self.gain_modes = self.get_gain_modes() + + def _validate_same_value(self, name, values): + if len(set(values)) != 1: + # Should we raise an error and stop processing? + warning( + f"{name} is not the same for all runs {self.runs} with values" + f" of {values}, respectively.") + + def sort_dark_runs(self): + assert len(self.run_ctrls) == 3, f"AGIPD dark runs are expected to be 3. {len(self.run_ctrls)} runs are given." + # TODO: complete sorting + + def fixed_gain_mode(self): + """Check if runs are in fixed gain mode. + + Raises: + ValueError: Runs are a mix of adaptive and fixed gains + ValueError: Unexpected gain modes for the dark runs + + Returns: + bool: runs are in fixed gain mode. + """ + if all(gm == AgipdGainMode.ADAPTIVE_GAIN for gm in self.gain_modes): + return False + # Some runs are adaptive by mistake. + elif any(gm == AgipdGainMode.ADAPTIVE_GAIN for gm in self.gain_modes): + raise ValueError( + f"Given runs {self.runs} have a mix of ADAPTIVE and " + f"FIXED gain modes: {self.gain_modes}.") + elif list(self.gain_modes) == [ + AgipdGainMode.FIXED_HIGH_GAIN, + AgipdGainMode.FIXED_MEDIUM_GAIN, + AgipdGainMode.FIXED_LOW_GAIN + ]: + return True + else: + raise ValueError( + "Wrong arrangement of given dark runs. Given runs' " + f"gain_modes are {self.gain_modes} for runs: {self.runs}.") + + def get_gain_modes(self): + """Get runs' gain modes. + Returns: + list: `AgipdGainMode`s + """ + return [c.get_gain_mode() for c in self.run_ctrls] + + def get_integration_time(self): + """ + Returns: + float: Integration time + """ + integration_times = [c.get_integration_time() for c in self.run_ctrls] + self._validate_same_value("Integration Time", integration_times) + return integration_times[0] + + def get_bias_voltage(self, karabo_id_control: Optional[str] = None): + """ + Args: + karabo_id_control (Optional[str], optional): + Karabo ID for control device. + + Returns: + int: Bias voltage. + """ + bias_voltages = [c.get_bias_voltage(karabo_id_control) for c in self.run_ctrls] + self._validate_same_value("Bias Voltage", bias_voltages) + return bias_voltages[0] + + def get_memory_cells(self): + """ + Returns: + int: number of memory cells. + """ + memory_cells = [c.get_num_cells() for c in self.run_ctrls] + self._validate_same_value("Memory cells", memory_cells) + return memory_cells[0] + + def get_gain_setting(self, creation_time: Optional[datetime] = None): + """ + Args: + creation_time (Optional[datetime], optional): + Creation time for the runs. + + Returns: + float: Gain Setting + """ + gain_settings = [ + c.get_gain_setting(creation_time) for c in self.run_ctrls] + self._validate_same_value("Gain Setting", gain_settings) + return gain_settings[0] + + def get_acq_rate(self): + """ + Returns: + float: Acquisition rate + """ + acquisition_rates = [c.get_acq_rate() for c in self.run_ctrls] + self._validate_same_value("acquisition_rate", acquisition_rates) + return acquisition_rates[0] + + class CellSelection: """Selection of detector memory cells (abstract class)""" row_size = 32