diff --git a/src/cal_tools/agipdlib.py b/src/cal_tools/agipdlib.py index a587a3cdde3b843848a0f9ec6a70a20c93f79472..bd9e08f768ae46820e43cff44b0ac8db3f56821d 100644 --- a/src/cal_tools/agipdlib.py +++ b/src/cal_tools/agipdlib.py @@ -988,58 +988,7 @@ class AgipdCorrections: data=cntsv, fletcher32=True) - def retrieve_constant_and_time(self, karabo_id: str, karabo_da: str, - const_dict: Dict[str, Any], - cal_db_interface: str, - creation_time: str - ) -> Tuple[Dict[str, Any], Dict[str, Any]]: - """A wrapper around get_constant_from_db_and_time to get - constant image and creation time using a dictionary of - the names of the required constant from the data base - and their parameter operating conditions. - - :param karabo_id: karabo identifier - :param karabo_da: karabo data aggregator - :param const_dict: (Dict) A dictionary with constants to retrieve - and their operating conditions. - e.g.{ "Offset": ["zeros", (128, 512, memory_cells, 3), "Dark"] - "Dark-cond": {Condition-name}: condition-value } - :param cal_db_interface: (Str) The port interface for - calibrationDBRemote - :param creation_time: (Str) Raw data creation time - :return: - cons_data: (np.array) Constant data - when: (Dict) A dictionary with all retrieved constants - and their creation-time. {"Constant-Name": "creation-time"} - """ - when = {} - cons_data = {} - for cname, cval in const_dict.items(): - condition = getattr( - Conditions, cval[2][0]).AGIPD(**cval[2][1]) - cdata, md = get_from_db( - karabo_id=karabo_id, - karabo_da=karabo_da, - constant=getattr(Constants.AGIPD, cname)(), - condition=condition, - empty_constant=getattr(np, cval[0])(cval[1]), - cal_db_interface=cal_db_interface, - creation_time=creation_time, - verbosity=0, - ) - cons_data[cname] = { - "data": cdata, - "variant": md.calibration_constant_version.variant - } - - when[cname] = None - # Read the CCV begin at if constant was retrieved successfully. - if md and md.comm_db_success: - when[cname] = md.calibration_constant_version.begin_at - - return cons_data, when - - def init_constants(self, cons_data, when, module_idx): + def init_constants(self, cons_data, when, module_idx, variant): """ For CI derived gain, a mean multiplication factor of 4.48 compared to medium gain is used, as no reliable CI data for all memory cells @@ -1081,6 +1030,7 @@ class AgipdCorrections: :param when: A dictionary for the creation time of each retrieved constant. :param module_idx: A module_idx index + :param variant: A dictionary for the variant of each retrieved CCV. :return: """ @@ -1088,32 +1038,29 @@ class AgipdCorrections: # assuming this method runs in parallel. calgs_opts = dict(num_threads=os.cpu_count() // len(self.offset)) - calgs.transpose_constant( - self.offset[module_idx], cons_data["Offset"]["data"], **calgs_opts) # noqa - calgs.transpose_constant( - self.noise[module_idx], cons_data["Noise"]["data"], **calgs_opts) + calgs.transpose_constant(self.offset[module_idx], cons_data['Offset'], **calgs_opts) + calgs.transpose_constant(self.noise[module_idx], cons_data['Noise'], **calgs_opts) if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN: - calgs.transpose_constant( - self.thresholds[module_idx], - cons_data["ThresholdsDark"]["data"][..., :3], - **calgs_opts - ) + calgs.transpose_constant(self.thresholds[module_idx], + cons_data['ThresholdsDark'][..., :3], + **calgs_opts) if self.corr_bools.get("low_medium_gap"): t0 = self.thresholds[module_idx][0] t1 = self.thresholds[module_idx][1] t1[t1 <= 1.05 * t0] = np.iinfo(np.uint16).max - bpixels = cons_data["BadPixelsDark"]["data"].astype(np.uint32) + bpixels = cons_data["BadPixelsDark"].astype(np.uint32) if self.corr_bools.get("xray_corr"): if when["BadPixelsFF"]: - bpixels |= cons_data["BadPixelsFF"]["data"].astype(np.uint32)[ - ..., :bpixels.shape[2], None] + bpixels |= cons_data["BadPixelsFF"].astype(np.uint32)[..., + :bpixels.shape[2], # noqa + None] if when["SlopesFF"]: # Checking if constant was retrieved - slopesFF = cons_data["SlopesFF"]["data"] + slopesFF = cons_data["SlopesFF"] # This could be used for backward compatibility # for very old SlopesFF constants if len(slopesFF.shape) == 4: @@ -1150,16 +1097,15 @@ class AgipdCorrections: # add additional bad pixel information if any(self.pc_bools): if when["BadPixelsPC"]: - bppc = np.moveaxis( - cons_data["BadPixelsPC"]["data"].astype(np.uint32), 0, 2) + bppc = np.moveaxis(cons_data["BadPixelsPC"].astype(np.uint32), + 0, 2) bpixels |= bppc[..., :bpixels.shape[2], None] # calculate relative gain from the constants rel_gain = np.ones((128, 512, self.max_cells, 3), np.float32) if when["SlopesPC"]: - slopesPC = cons_data["SlopesPC"]["data"].astype( - np.float32, copy=False) + slopesPC = cons_data["SlopesPC"].astype(np.float32, copy=False) # This will handle some historical data in a different format # constant dimension injected first @@ -1258,6 +1204,7 @@ class AgipdCorrections: # string of the device name. cons_data = dict() when = dict() + variant = dict() db_module = const_yaml[karabo_da]["physical-detector-unit"] for cname, mdata in const_yaml[karabo_da]["constants"].items(): cons_data[cname] = {} @@ -1265,16 +1212,15 @@ class AgipdCorrections: when[cname] = mdata["creation-time"] if when[cname]: with h5py.File(mdata["file-path"], "r") as cf: - cons_data[cname]["data"] = np.copy(cf[f"{base_key}/data"]) + cons_data[cname] = np.copy(cf[f"{base_key}/data"]) # The vairant attribute is missing for old constants. if "variant" in cf[base_key].attrs.keys(): - cons_data[cname]["variant"] = cf[base_key].attrs["variant"] # noqa + variant[cname] = cf[base_key].attrs["variant"] # noqa else: # Create empty constant using the list elements - cons_data[cname]["data"] = getattr( - np, mdata["file-path"][0])(mdata["file-path"][1]) + cons_data[cname] = getattr(np, mdata["file-path"][0])(mdata["file-path"][1]) # noqa - self.init_constants(cons_data, when, module_idx) + self.init_constants(cons_data, when, module_idx, variant) return when @@ -1353,11 +1299,32 @@ class AgipdCorrections: integration_time=integration_time ) - cons_data, when = self.retrieve_constant_and_time( - karabo_id, karabo_da, const_dict, cal_db_interface, creation_time - ) + when = {} + cons_data = {} + variant = {} + + for cname, cval in const_dict.items(): + condition = getattr( + Conditions, cval[2][0]).AGIPD(**cval[2][1]) + cdata, md = get_from_db( + karabo_id=karabo_id, + karabo_da=karabo_da, + constant=getattr(Constants.AGIPD, cname)(), + condition=condition, + empty_constant=getattr(np, cval[0])(cval[1]), + cal_db_interface=cal_db_interface, + creation_time=creation_time, + verbosity=0, + ) + cons_data[cname] = cdata + variant[cname] = md.calibration_constant_version.variant + + when[cname] = None + # Read the CCV begin at if constant was retrieved successfully. + if md and md.comm_db_success: + when[cname] = md.calibration_constant_version.begin_at - self.init_constants(cons_data, when, module_idx) + self.init_constants(cons_data, when, module_idx, variant) return when diff --git a/src/cal_tools/tools.py b/src/cal_tools/tools.py index 3f99b4560356e032373b6cf1f8bd4f4a1cbe701a..81c0d7467b401d75f0ffbb8c618d1f79416fb033 100644 --- a/src/cal_tools/tools.py +++ b/src/cal_tools/tools.py @@ -642,8 +642,8 @@ def send_to_db(db_module: str, karabo_id: str, constant, condition, :param creation_time: Latest time for constant to be created :param timeout: Timeout for zmq request :param ntries: number of tries to contact the database, - ntries is set to 7 so that if the timeout started - at 30s last timeout will be ~ 1h. + ntries is set to 7 so that if the timeout started + at 30s last timeout will be ~ 1h. :param doraise: if True raise errors during communication with DB :param variant: A calibration constant version variant attribute for the constant file. diff --git a/tests/test_cal_tools.py b/tests/test_cal_tools.py index cc82e4b17e70be49ab0e3644522d4bd86e2e93aa..102f613b7b9f3b9bdb032f5ebe5ab218a8d1e098 100644 --- a/tests/test_cal_tools.py +++ b/tests/test_cal_tools.py @@ -335,6 +335,7 @@ def test_get_pdu_from_db(_agipd_const_cond): "CAL_PHYSICAL_DETECTOR_UNIT-2_TEST"] +# TODO add a marker for accessing zmq end_point @pytest.mark.requires_gpfs @pytest.mark.requires_caldb def test_initialize_from_db():