Commit ae582d37 authored by Thomas Kluyver

Test loading data & add parallel option

parent a75e9ee1
Merge request !885: Revised CalCat API
@@ -282,28 +282,39 @@ class ModulesConstantVersions:
             if m["karabo_da"] in self.constants
         ]
 
-    def ndarray(self, caldb_root=None):
+    def ndarray(self, caldb_root=None, *, parallel=0):
         eg_dset = self.constants[self.aggregator_names[0]].dataset_obj(caldb_root)
         shape = (len(self.constants),) + eg_dset.shape
-        arr = np.zeros(shape, eg_dset.dtype)
-        for i, agg in enumerate(self.aggregator_names):
-            dset = self.constants[agg].dataset_obj(caldb_root)
-            dset.read_direct(arr[i])
+
+        if parallel > 0:
+            load_ctx = psh.ProcessContext(num_workers=parallel)
+        else:
+            load_ctx = psh.SerialContext()
+
+        arr = psh.alloc(shape, eg_dset.dtype, fill=0)
+
+        def _load_constant_dataset(wid, index, mod):
+            dset = self.constants[mod].dataset_obj(caldb_root)
+            dset.read_direct(arr[index])
+
+        load_ctx.map(_load_constant_dataset, self.aggregator_names)
         return arr
 
-    def xarray(self, module_naming="da", caldb_root=None):
+    def xarray(self, module_naming="modnum", caldb_root=None, *, parallel=0):
         import xarray
 
-        if module_naming == "da":
+        if module_naming == "aggregator":
             modules = self.aggregator_names
-        elif module_naming == "modno":
+        elif module_naming == "modnum":
             modules = self.module_nums
         elif module_naming == "qm":
             modules = self.qm_names
         else:
-            raise ValueError(f"{module_naming=} (must be 'da', 'modno' or 'qm'")
+            raise ValueError(
+                f"{module_naming=} (must be 'aggregator', 'modnum' or 'qm'"
+            )
 
-        ndarr = self.ndarray(caldb_root)
+        ndarr = self.ndarray(caldb_root, parallel=parallel)
 
         # Dimension labels
         dims = ["module"] + ["dim_%d" % i for i in range(ndarr.ndim - 1)]
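The new loading loop follows pasha's kernel-and-map pattern (the library imported as psh): allocate the output array up front, define a kernel taking (worker_id, index, value), and let the chosen context run it once per item, either in worker processes (ProcessContext) or in-process (SerialContext). Below is a minimal, self-contained sketch of that pattern using toy arrays instead of CalCat datasets; load_stack and sources are illustrative names, not part of this API, and it assumes pasha's alloc/map behave as used in the diff above.

import numpy as np
import pasha as psh  # imported as "psh", matching the diff

def load_stack(sources, parallel=0):
    """Toy version of the loading loop in ModulesConstantVersions.ndarray()."""
    # Pick a parallel or serial execution context, as in the diff.
    if parallel > 0:
        ctx = psh.ProcessContext(num_workers=parallel)
    else:
        ctx = psh.SerialContext()

    # Allocate the output once; pasha arrays stay writable from worker processes.
    out = psh.alloc((len(sources),) + sources[0].shape, sources[0].dtype, fill=0)

    def kernel(wid, index, src):
        # The real kernel calls dset.read_direct(arr[index]); here we just copy.
        out[index] = src

    ctx.map(kernel, sources)  # one kernel call per source, possibly in parallel
    return out

mods = [np.full((2, 2), i, dtype=np.float32) for i in range(4)]
stacked = load_stack(mods, parallel=2)  # shape (4, 2, 2), one slot per source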
@@ ... @@
+import numpy as np
 import pytest
+import xarray as xr
 
 from cal_tools.calcat_interface2 import (
     CalibrationData,
@@ -59,6 +61,33 @@ def test_AGIPD_CalibrationData_metadata_SPB():
     assert isinstance(agipd_cd["Offset"].constants["AGIPD00"], SingleConstantVersion)
 
 
+@pytest.mark.requires_gpfs
+def test_AGIPD_load_data():
+    cond = AGIPDConditions(
+        sensor_bias_voltage=300,
+        memory_cells=352,
+        acquisition_rate=1.1,
+        integration_time=12,
+        source_energy=9.2,
+        gain_mode=0,
+        gain_setting=0,
+    )
+    agipd_cd = CalibrationData.from_condition(
+        cond,
+        "SPB_DET_AGIPD1M-1",
+        event_at="2020-01-07 13:26:48.00",
+    )
+    arr = agipd_cd["Offset"].select_modules(list(range(4))).xarray()
+    assert arr.shape == (4, 128, 512, 352, 3)
+    assert arr.dims[0] == 'module'
+    np.testing.assert_array_equal(arr.coords['module'], np.arange(0, 4))
+    assert arr.dtype == np.float64
+
+    # Load parallel
+    arr_p = agipd_cd["Offset"].select_modules(list(range(4))).xarray(parallel=4)
+    xr.testing.assert_identical(arr_p, arr)
+
+
 @pytest.mark.requires_gpfs
 def test_DSSC_modules_missing():
     dssc_cd = CalibrationData.from_condition(
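Read alongside the API hunk above, the new test also doubles as a usage example. A hypothetical sketch of how the new keyword is meant to be called from user code (agipd_cd and the module selection as in the test; the module_naming values come from the xarray() branches above):

# Assumes agipd_cd was built via CalibrationData.from_condition(...) as in the test.
offset = agipd_cd["Offset"].select_modules(list(range(4)))

raw = offset.ndarray()              # serial load, plain numpy array
raw_p = offset.ndarray(parallel=4)  # same data, read by 4 worker processes
labelled = offset.xarray(module_naming="qm", parallel=4)  # modules labelled by quadrant/module names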