diff --git a/notebooks/Jungfrau/Jungfrau_Gain_Correct_and_Verify_NBC.ipynb b/notebooks/Jungfrau/Jungfrau_Gain_Correct_and_Verify_NBC.ipynb
index 8dc0ff4d6f29b78e443b3d2702f134e7e90d07bf..1b5e972de4697d65fcd17d3b1389ae109fbb6357 100644
--- a/notebooks/Jungfrau/Jungfrau_Gain_Correct_and_Verify_NBC.ipynb
+++ b/notebooks/Jungfrau/Jungfrau_Gain_Correct_and_Verify_NBC.ipynb
@@ -100,7 +100,6 @@
     "    get_dir_creation_date,\n",
     "    get_pdu_from_db,\n",
     "    map_seq_files,\n",
-    "    write_compressed_frames,\n",
     "    CalibrationMetadata,\n",
     ")\n",
     "from iCalibrationDB import Conditions, Constants\n",
@@ -590,11 +589,10 @@
     "        outp_source.create_key(\n",
     "            \"data.adc\", data=data_corr,\n",
     "            chunks=(min(chunks_data, data_corr.shape[0]), *oshape[1:]))\n",
-    "\n",
-    "        write_compressed_frames(\n",
-    "            gain_corr, outp_file, f\"{outp_source.name}/data/gain\", comp_threads=8)\n",
-    "        write_compressed_frames(\n",
-    "            mask_corr, outp_file, f\"{outp_source.name}/data/mask\", comp_threads=8)\n",
+    "        outp_source.create_compressed_key(\n",
+    "            \"data.gain\", data=gain_corr)\n",
+    "        outp_source.create_compressed_key(\n",
+    "            \"data.mask\", data=mask_corr)\n",
     "\n",
     "        save_reduced_rois(outp_file, data_corr, mask_corr, local_karabo_da)\n",
     "\n",
diff --git a/notebooks/LPD/LPD_Correct_Fast.ipynb b/notebooks/LPD/LPD_Correct_Fast.ipynb
index fe517d8dc865178478537b64895580c5d3425142..5406cab07513eb3581330db9b171d254f68f2923 100644
--- a/notebooks/LPD/LPD_Correct_Fast.ipynb
+++ b/notebooks/LPD/LPD_Correct_Fast.ipynb
@@ -105,11 +105,7 @@
     "from extra_data.components import LPD1M\n",
     "\n",
     "from cal_tools.lpdalgs import correct_lpd_frames\n",
-    "from cal_tools.tools import (\n",
-    "    CalibrationMetadata,\n",
-    "    calcat_creation_time,\n",
-    "    write_compressed_frames,\n",
-    "    )\n",
+    "from cal_tools.tools import CalibrationMetadata, calcat_creation_time\n",
     "from cal_tools.files import DataFile\n",
     "from cal_tools.restful_config import restful_config"
    ]
@@ -457,10 +453,8 @@
     "            chunks=(min(chunks_ids, in_pulse.shape[0]),))\n",
     "        outp_source.create_key('image.data', data=out_data,\n",
     "            chunks=(min(chunks_data, out_data.shape[0]), 256, 256))\n",
-    "        write_compressed_frames(\n",
-    "            out_gain, outp_file, f'INSTRUMENT/{outp_source_name}/image/gain', comp_threads=8)\n",
-    "        write_compressed_frames(\n",
-    "            out_mask, outp_file, f'INSTRUMENT/{outp_source_name}/image/mask', comp_threads=8)\n",
+    "        outp_source.create_compressed_key('image.gain', data=out_gain)\n",
+    "        outp_source.create_compressed_key('image.mask', data=out_mask)\n",
     "        write_time = perf_counter() - start\n",
     "        \n",
     "        total_time = open_time + read_time + correct_time + write_time\n",
diff --git a/src/cal_tools/files.py b/src/cal_tools/files.py
index 5d19004413595fc7c86445d4770b24b366769b9e..8decff781ace9cb7bcdbddf590f9478de1eeb87f 100644
--- a/src/cal_tools/files.py
+++ b/src/cal_tools/files.py
@@ -537,6 +537,35 @@ class InstrumentSource(h5py.Group):
 
         return self.create_dataset(key, data=data, **kwargs)
 
+    def create_compressed_key(self, key, data, comp_threads=8):
+        """Create a compressed dataset for a key.
+
+        This method uses lower-level h5py access to compress the data
+        in multiple threads and write it directly to the file, rather
+        than going through HDF5's compression filters.
+
+        Args:
+            key (str): Source key, dots are automatically replaced by
+                slashes.
+            data (np.ndarray): Key data.
+            comp_threads (int, optional): Number of threads to use for
+                compression, 8 by default.
+
+        Returns:
+            (h5py.Dataset) Created dataset.
+        """
+
+        key = escape_key(key)
+
+        if not self.key_pattern.match(key):
+            raise ValueError(f'invalid key format, must satisfy '
+                             f'{self.key_pattern.pattern}')
+
+        # Imported locally to avoid a circular import with cal_tools.tools.
+        from cal_tools.tools import write_compressed_frames
+        return write_compressed_frames(data, self, key,
+                                       comp_threads=comp_threads)
+
     def create_index(self, *args, **channels):
         """Create source-specific INDEX datasets.
 
diff --git a/src/cal_tools/tools.py b/src/cal_tools/tools.py
index 615047957eea9b0f42b232902fbacfe74f0918af..2d15866ac2517bc0925e2b6ad5bad10560f5ab36 100644
--- a/src/cal_tools/tools.py
+++ b/src/cal_tools/tools.py
@@ -951,3 +951,5 @@ def write_compressed_frames(
         # Each frame is 1 complete chunk
         chunk_start = (i,) + (0,) * (dataset.ndim - 1)
         dataset.id.write_direct_chunk(chunk_start, compressed)
+
+    return dataset
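
Usage sketch (illustration only, not part of the patch): a minimal example of
the new InstrumentSource.create_compressed_key() API added above. The output
path and source name below are hypothetical, and it assumes DataFile can be
opened with an h5py.File-style constructor and exposes the
create_instrument_source() helper used by the corrected notebooks.

    import numpy as np
    from cal_tools.files import DataFile

    # Hypothetical per-frame gain-stage data: 100 frames of 512x1024 pixels.
    gain = np.zeros((100, 512, 1024), dtype=np.uint8)

    with DataFile('CORR-R0001-DA01-S00000.h5', 'w') as outp_file:
        outp_source = outp_file.create_instrument_source(
            'HYP_DET/DET/RECEIVER:daqOutput')  # made-up source name
        # Dots in the key become slashes, so this writes <source>/data/gain.
        # Each frame is compressed in a pool of comp_threads workers and
        # written directly to the file as one complete HDF5 chunk.
        outp_source.create_compressed_key('data.gain', data=gain,
                                          comp_threads=4)

Compared to the removed write_compressed_frames() calls in the notebooks, the
caller no longer spells out the full HDF5 path or passes the file object; the
source group resolves both.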