diff --git a/src/cal_tools/agipdlib.py b/src/cal_tools/agipdlib.py
index 7dd310093db6aacd4c4d5cc19fb74855d30cd640..262defdcbdfd1b0b3297527babf5b45900030c77 100644
--- a/src/cal_tools/agipdlib.py
+++ b/src/cal_tools/agipdlib.py
@@ -766,11 +766,25 @@ class AgipdCorrections:
         seqno = int(tokens[3][1:])
 
         agipd_base = self.h5_data_path.format(modno)
-        karabo_id, _, channel = agipd_base.split('/')
-        channel = channel.partition(":")[0] + ":output"
-        agipd_corr_source = f"{karabo_id}/CORR/{channel}"
-
-        instrument_channels = [f"{agipd_corr_source}/image"]
+        karabo_id, _, det_channel = agipd_base.split('/')
+        det_channel = det_channel.partition(":")[0] + ":output"
+        agipd_corr_source = f"{karabo_id}/CORR/{det_channel}"
+
+        schema = {
+            agipd_corr_source: ["image"]
+        }
+        addons = {
+            "litpx_counter": f"{karabo_id}/LITPX/{det_channel}"
+        }
+        for addon_name, src_name in addons.items():
+            addon = data_dict.get(addon_name)
+            if addon is not None:
+                src_channels = schema.setdefault(src_name, [])
+                src_channels.append(addon.channel)
+
+        instrument_channels = []
+        for src_name, src_channels in schema.items():
+            instrument_channels += [f"{src_name}/{ch}" for ch in src_channels]
 
         # backward compatibility BEGIN
         instrument_channels.append(f"{agipd_base}/image")
@@ -815,6 +829,18 @@ class AgipdCorrections:
                     field, shape=arr.shape, dtype=arr.dtype, **kw
                 )
 
+            # create addon sources
+            required_addon_data = []
+            for addon_name, src_name in addons.items():
+                addon = data_dict.get(addon_name)
+                if addon is None:
+                    continue
+
+                src = outfile.create_instrument_source(src_name)
+                addon.set_num_images(n_img)
+                addon.create_schema(src, trains, count)
+                required_addon_data.append((addon, src))
+
             # Write the corrected data
             for field in image_fields:
                 if field in self.compress_fields:
@@ -824,6 +850,10 @@ class AgipdCorrections:
                 else:
                     image_grp[field][:] = data_dict[field][:n_img]
 
+            # write addon data
+            for addon, src in required_addon_data:
+                addon.write(src)
+
     def _write_compressed_frames(self, dataset, arr):
         """Compress gain/mask frames in multiple threads, and save their data
 
@@ -1513,8 +1543,12 @@ class AgipdCorrections:
 
 
 class LitPixelCounter:
+    channel = "litpx"
+    output_fields = [
+        "cellId", "pulseId", "trainId", "litPixels", "goodPixels"]
+
     def __init__(self, data, threshold=0.8):
-        self.data = data
+        self.data = data.copy()
         for name in ["data", "mask", "cellId", "pulseId", "trainId"]:
             assert name in data
 
@@ -1523,11 +1557,13 @@ class LitPixelCounter:
 
         self.threshold = threshold
         self.max_images = data["data"].shape[0]
+        self.num_images = self.max_images
 
         self.num_good_px = sharedmem.full(self.max_images, 0, int)
         self.num_lit_px = sharedmem.full(self.max_images, 0, int)
 
-        self.num_images = self.max_images
+        self.data["litPixels"] = self.num_lit_px
+        self.data["goodPixels"] = self.num_good_px
 
     def set_num_images(self, num_images):
         self.num_images = num_images
@@ -1540,6 +1576,36 @@ class LitPixelCounter:
                 self.image[i] > self.threshold, initial=0, where=mask)
             self.num_good_px[i] = np.sum(mask)
 
+    def create_schema(self, source, file_trains=None, count=None):
+        if file_trains is None:
+            file_trains = source.file["INDEX/trainId"][:]
+
+        if count is None:
+            tid = self.data["trainId"][:self.num_images]
+            trains, count = np.unique(tid, return_counts=True)
+            count = count[np.in1d(trains, file_trains)]
+
+        if len(file_trains) != len(count):
+            raise ValueError(
+                "The length of data count does not match the number of trains")
+        if np.sum(count) != self.num_images:
+            raise ValueError(
+                "The sum of data count does not match "
+                "the total number of data entries")
+
+        source.create_index(**{self.channel: count})
+        for key in self.output_fields:
+            source.create_dataset(
+                f"{self.channel}/{key}",
+                shape=(self.num_images,),
+                dtype=self.data[key].dtype
+            )
+
+    def write(self, source):
+        channel = source[self.channel]
+        for key in self.output_fields:
+            channel[key][:] = self.data[key][:self.num_images]
+
 
 def validate_selected_pulses(
     max_pulses: List[int], max_cells: int
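
A minimal, self-contained sketch of the bookkeeping the LitPixelCounter addon performs, using plain numpy stand-ins instead of the shared-memory buffers and the DataFile source API; all array contents, sizes and variable names below are illustrative and not taken from the calibration pipeline:

    import numpy as np

    # Six fake frames of four pixels each, spread over three trains.
    rng = np.random.default_rng(0)
    image = rng.uniform(-1.0, 2.0, size=(6, 4)).astype(np.float32)
    mask = np.zeros((6, 4), dtype=np.uint32)   # 0 means "good" pixel
    mask[:, 0] = 1                             # flag one pixel bad in every frame
    train_id = np.array([100, 100, 101, 101, 101, 102], dtype=np.uint64)

    threshold = 0.8                            # same default as LitPixelCounter
    good = mask == 0

    # Per-frame reductions, mirroring the counting loop in the addon:
    # lit pixels are those above threshold, counted only where the mask
    # marks the pixel as good.
    lit_px = np.array([
        np.sum(image[i] > threshold, initial=0, where=good[i])
        for i in range(len(image))
    ])
    good_px = good.sum(axis=1)

    # Per-train entry count, as create_schema() derives it with np.unique
    # when no explicit `count` is given; it must sum to the number of frames.
    trains, count = np.unique(train_id, return_counts=True)
    assert count.sum() == len(image)

    print(lit_px, good_px, count, sep="\n")

In the pipeline itself the per-frame counters live in sharedmem arrays that self.data["litPixels"] and self.data["goodPixels"] alias, so write() can dump them alongside cellId, pulseId and trainId under the "litpx" channel of the new {karabo_id}/LITPX/{det_channel} source created in write_file().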