Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • calibration/pycalibration
1 result
Show changes
Commits on Source (9)
%% Cell type:markdown id: tags:
# Characterization of dark and flat field for Dynamic Flat Field correction
Author: Egor Sobolev
Computation of dark offsets and flat-field principal components
%% Cell type:code id: tags:
``` python
# Characterization parameters (first cell: consumed by xfel-calibrate).
# NOTE: the revision diff left both old and new values of operation_mode,
# cal_db_interface and db_output in this cell; the newer values are kept.
in_folder = "/gpfs/exfel/exp/SPB/202430/p900425/raw"  # input folder, required
out_folder = '/gpfs/exfel/data/scratch/esobolev/test/shimadzu'  # output folder, required
metadata_folder = ""  # Directory containing calibration_metadata.yml when run by xfel-calibrate

run_high = 1  # run number in which dark data was recorded, required
run_low = 2  # run number in which flat-field data was recorded, required
operation_mode = "DynamicFF"  # Detector operation mode, optional (defaults to "TI_DynamicFF")

# Data files parameters.
karabo_da = ['-1']  # data aggregators
karabo_id = "SPB_MIC_HPVX2"  # karabo prefix of Shimadzu HPV-X2 devices

# Database access parameters.
cal_db_interface = "tcp://max-exfl-cal001:8021"  # Unused, calibration DB interface to use
db_output = False  # if True, the notebook sends dark constants to the calibration database
local_output = True  # if True, the notebook saves dark constants locally

# Calibration constants parameters
n_components = 50  # Number of principal components of flat-field to compute (default: 50)
```
%% Cell type:code id: tags:
``` python
import datetime
import os
import warnings
from logging import warning
from shutil import copyfile
from tempfile import NamedTemporaryFile
warnings.filterwarnings('ignore')
import time
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display, Markdown
from extra_data import RunDirectory
%matplotlib inline
from cal_tools.step_timing import StepTimer
from cal_tools.tools import (
get_dir_creation_date,
get_random_db_interface,
get_report,
save_dict_to_hdf5,
run_prop_seq_from_path,
save_dict_to_hdf5
)
from cal_tools.restful_config import calibration_client
from cal_tools.shimadzu import ShimadzuHPVX2
import dynflatfield as dffc
from dynflatfield.draw import plot_images, plot_camera_image
```
%% Cell type:code id: tags:
``` python
# Resolve the DB interface and discover the physical detector units (PDUs)
# for the configured detector from CalCat.
cal_db_interface = get_random_db_interface(cal_db_interface)
print(f'Calibration database interface: {cal_db_interface}')
print()

cc = calibration_client()
pdus = cc.get_all_phy_det_units_from_detector(
    {"detector_identifier": karabo_id})
if not pdus["success"]:
    # BUG FIX (diff residue): the old revision raised the non-existent
    # name `ValueException`; the corrected `ValueError` is kept.
    raise ValueError("Failed to retrieve PDUs")

detector_info = pdus['data'][0]['detector']
detector = ShimadzuHPVX2(detector_info["source_name_pattern"])
print(f"Instrument {detector.instrument}")
print(f"Detector in use is {karabo_id}")

# Build the module map: karabo_da -> metadata needed for reading raw data
# and for registering constants (pdu_no indexes back into pdus["data"]).
modules = {}
for pdu_no, pdu in enumerate(pdus["data"]):
    db_module = pdu["physical_name"]
    module = pdu["module_number"]
    da = pdu["karabo_da"]
    # '-1' means "all aggregators"; otherwise restrict to the requested ones.
    if karabo_da[0] != "-1" and da not in karabo_da:
        continue

    instrument_source_name = detector.instrument_source(module)
    print('-', da, db_module, module, instrument_source_name)
    modules[da] = dict(
        db_module=db_module,
        module=module,
        raw_source_name=instrument_source_name,
        pdu_no=pdu_no,
    )

constants = {}
step_timer = StepTimer()
```
%% Cell type:markdown id: tags:
# Offset map
%% Cell type:code id: tags:
``` python
# Compute the Offset (dark) constant for each selected module.
dark_run = run_high
dark_creation_time = get_dir_creation_date(in_folder, dark_run)
print(f"Using {dark_creation_time} as creation time of Offset constant.")

for da, meta in modules.items():
    source_name = detector.instrument_source(meta["module"])
    image_key = detector.image_key
    display(Markdown(f"## {source_name}"))

    # read
    step_timer.start()
    file_da, _, _ = da.partition('/')
    dark_dc = RunDirectory(f"{in_folder}/r{dark_run:04d}",
                           include=f"RAW-R{dark_run:04d}-{file_da}-S*.h5")
    # Diff residue resolved: the old revision printed a message and skipped
    # the module; the new revision treats a missing source as a hard error.
    if source_name not in dark_dc.all_sources:
        raise ValueError(f"Could not find source {source_name} for module {da} in dark data")
    dark_dc = dark_dc.select([(source_name, image_key)])
    conditions = detector.conditions(dark_dc, meta["module"])

    key_data = dark_dc[source_name, image_key]
    images_dark = key_data.ndarray()
    ntrain, npulse, ny, nx = images_dark.shape
    print(f"N image: {ntrain * npulse} (ntrain: {ntrain}, npulse: {npulse})")
    print(f"Image size: {ny} x {nx} px")
    step_timer.done_step("Read dark images")

    # process
    step_timer.start()
    dark = dffc.process_dark(images_dark)  # Amounts to a per-pixel mean right now.

    # put results in the dict
    module_constants = constants.setdefault(meta["db_module"], {})
    module_constants["Offset"] = dict(
        conditions=conditions, data=dark, pdu_no=meta["pdu_no"],
        creation_time=dark_creation_time
    )
    step_timer.done_step("Process dark images")

    # draw plots
    step_timer.start()
    plot_camera_image(dark)
    plt.show()
    step_timer.done_step("Draw offsets")
```
%% Cell type:markdown id: tags:
# Flat-field PCA decomposition
%% Cell type:code id: tags:
``` python
# Compute the DynamicFF constant (mean flat-field + PCA components)
# for each selected module.
flat_run = run_low
flat_creation_time = get_dir_creation_date(in_folder, flat_run)
print(f"Using {flat_creation_time} as creation time of DynamicFF constant.")

for da, meta in modules.items():
    source_name = detector.instrument_source(meta["module"])
    image_key = detector.image_key
    display(Markdown(f"## {source_name}"))

    # read
    step_timer.start()
    file_da, _, _ = da.partition('/')
    flat_dc = RunDirectory(f"{in_folder}/r{flat_run:04d}",
                           include=f"RAW-R{flat_run:04d}-{file_da}-S*.h5")
    # Diff residue resolved: a missing source is a hard error (the old
    # revision only printed and skipped the module).
    if source_name not in flat_dc.all_sources:
        raise ValueError(f"Could not find source {source_name} for module {da} in flatfield data")
    flat_dc = flat_dc.select([(source_name, image_key)])
    conditions = detector.conditions(flat_dc, meta["module"])

    # The flat-field run must match the conditions of the dark run
    # characterized above, otherwise the correction would be invalid.
    # BUG FIX (diff residue): the old revision built a ValueError without
    # raising it; the message also had an unbalanced parenthesis.
    dark = constants[meta["db_module"]]["Offset"]["data"]
    dark_conditions = constants[meta["db_module"]]["Offset"]["conditions"]
    if conditions != dark_conditions:
        raise ValueError(f"The conditions for flat-field run ({conditions}) do not match "
                         f"the dark run conditions ({dark_conditions}). Skip flat-field characterization.")

    key_data = flat_dc[source_name][image_key]
    images_flat = key_data.ndarray()
    ntrain, npulse, ny, nx = images_flat.shape
    print(f"N image: {ntrain * npulse} (ntrain: {ntrain}, npulse: {npulse})")
    print(f"Image size: {ny} x {nx} px")
    step_timer.done_step("Read flat-field images")

    # process
    step_timer.start()
    flat, components, explained_variance_ratio = dffc.process_flat(
        images_flat, dark, n_components)
    # Store the mean flat as component 0, followed by the PCA components.
    flat_data = np.concatenate([flat[None, ...], components])

    # put results in the dict (the redundant recomputation of `conditions`
    # present in the original cell was removed).
    module_constants = constants.setdefault(meta["db_module"], {})
    module_constants["DynamicFF"] = dict(
        conditions=conditions, data=flat_data, pdu_no=meta["pdu_no"],
        creation_time=flat_creation_time
    )
    step_timer.done_step("Process flat-field images")

    # draw plots
    step_timer.start()
    display(Markdown("### Average flat-field"))
    plot_camera_image(flat)
    plt.show()

    display(Markdown("### Explained variance ratio"))
    fig, ax = plt.subplots(1, 1, figsize=(10, 4), tight_layout=True)
    ax.semilogy(explained_variance_ratio, 'o')
    ax.set_xticks(np.arange(len(explained_variance_ratio)))
    ax.set_xlabel("Component no.")
    ax.set_ylabel("Variance fraction")
    plt.show()

    display(Markdown("### The first principal components (up to 20)"))
    plot_images(components[:20], figsize=(13, 8))
    plt.show()
    step_timer.done_step("Draw flat-field")
```
%% Cell type:markdown id: tags:
## Calibration constants
%% Cell type:code id: tags:
``` python
# NOTE(review): this cell is revision-diff residue — statements from BOTH the
# old and the new revision are interleaved below (indentation was also lost in
# the paste). It needs a manual merge before it can run. The apparent intent
# of the new revision: write each constant to a NamedTemporaryFile, optionally
# inject it into the calibration DB, and copy it to out_folder only when
# local_output is set. Ambiguities (e.g. whether inject_ccv should receive
# `ofile` or `tempf.name`, and the `os.unlink(ofile)` under
# `if not local_output`) could not be resolved from this view — TODO confirm
# against the repository before merging.
step_timer.start()
# Output Folder Creation:
# NOTE(review): old revision created the folder unconditionally (next line);
# the new revision guards it with `if local_output:` (two lines below).
os.makedirs(out_folder, exist_ok=True)
if local_output:
os.makedirs(out_folder, exist_ok=True)
# Placeholder: only logs what would be sent to the calibration database.
def inject_ccv(in_folder, metadata_folder, runs, calibration, cond, pdu, const_input, begin_at):
print("* Send to db:", const_input)
print(" - in folder:", in_folder)
print(" - metadata folder:", metadata_folder)
print(" - runs:", runs)
print(" -", calibration)
print(" -", cond)
print(" -", begin_at)
# Iterate over all characterized constants (Offset, DynamicFF) per module.
for db_module, module_constants in constants.items():
for constant_name, constant in module_constants.items():
conditions = constant["conditions"]
conditions_dict = conditions.make_dict(
conditions.calibration_types[constant_name])
# HDF5 layout expected by save_dict_to_hdf5: module/constant/version.
data_to_store = {db_module: {constant_name: {'0': {
'conditions': conditions_dict,
'data': constant["data"],
}}}}
# NOTE(review): old-revision path — save directly to the output file.
ofile = f"{out_folder}/const_{constant_name}_{db_module}.h5"
if os.path.isfile(ofile):
print(f'File {ofile} already exists and will be overwritten')
save_dict_to_hdf5(data_to_store, ofile)
if db_output:
inject_ccv(
in_folder, metadata_folder, [dark_run, flat_run],
constant_name, conditions, pdus["data"][constant["pdu_no"]],
ofile, constant["creation_time"]
)
# NOTE(review): new-revision path — save to a temporary file first, then
# inject and/or copy to the output folder.
with NamedTemporaryFile() as tempf:
save_dict_to_hdf5(data_to_store, tempf)
if db_output:
inject_ccv(
in_folder, metadata_folder, [dark_run, flat_run],
constant_name, conditions, pdus["data"][constant["pdu_no"]],
ofile, constant["creation_time"]
)
if local_output:
ofile = f"{out_folder}/const_{constant_name}_{db_module}.h5"
if os.path.isfile(ofile):
print(f'File {ofile} already exists and will be overwritten')
if not local_output:
os.unlink(ofile)
copyfile(tempf.name, ofile)
```
%% Cell type:code id: tags:
``` python
# Report overall wall-clock time, then the per-step breakdown.
total_seconds = step_timer.timespan()
print(f"Total processing time {total_seconds:.01f} s")
step_timer.print_summary()
```
......
%% Cell type:markdown id: tags:
# Dynamic Flat-field Offline Correction
Author: Egor Sobolev
Offline dynamic flat-field correction
%% Cell type:code id: tags:
``` python
# Correction parameters (first cell: consumed by xfel-calibrate).
# NOTE: the revision diff left both old and new cal_db_interface lines in
# this cell; the newer (marked "Unused") comment is kept.
in_folder = "/gpfs/exfel/exp/SPB/202430/p900425/raw"  # input folder, required
out_folder = "/gpfs/exfel/exp/SPB/202430/p900425/scratch/proc/r0003"  # output folder, required
metadata_folder = ""  # Directory containing calibration_metadata.yml when run by xfel-calibrate
run = 3  # which run to read data from, required

# Data files parameters.
karabo_da = ['-1']  # data aggregators
karabo_id = "SPB_MIC_HPVX2"  # karabo prefix of Shimadzu HPV-X2 devices

# Database access parameters.
cal_db_interface = "tcp://max-exfl-cal001:8021"  # Unused, calibration DB interface to use

# Correction parameters
n_components = 20  # number of principal components of flat-field to use in correction
downsample_factors = [1, 1]  # list of downsample factors for each image dimention (y, x)
num_proc = 32  # number of processes running correction in parallel
```
%% Cell type:code id: tags:
``` python
import os
import h5py
import warnings
from logging import warning
warnings.filterwarnings('ignore')
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display, Markdown
from datetime import datetime
from extra_data import RunDirectory, by_id
%matplotlib inline
from cal_tools.step_timing import StepTimer
from cal_tools.files import sequence_trains, DataFile
from cal_tools.tools import get_dir_creation_date
from cal_tools.restful_config import calibration_client, restful_config
from cal_tools.calcat_interface2 import CalibrationData, setup_client
from cal_tools.shimadzu import ShimadzuHPVX2
from dynflatfield import (
DynamicFlatFieldCorrectionCython as DynamicFlatFieldCorrection,
FlatFieldCorrectionFileProcessor
)
from dynflatfield.draw import plot_images, plot_camera_image
```
%% Cell type:code id: tags:
``` python
# Discover the physical detector units (PDUs) and build the module map
# for the correction, including the corrected output source names.
creation_time = get_dir_creation_date(in_folder, run)
print(f"Creation time is {creation_time}")

cc = calibration_client()
pdus = cc.get_all_phy_det_units_from_detector(
    {"detector_identifier": karabo_id})
if not pdus["success"]:
    # BUG FIX: `ValueException` is not a builtin and would raise a
    # NameError here; raise ValueError as in the companion notebook.
    raise ValueError("Failed to retrieve PDUs")

detector_info = pdus['data'][0]['detector']
detector = ShimadzuHPVX2(detector_info["source_name_pattern"])
index_group = detector.image_index_group
image_key = detector.image_key
print(f"Instrument {detector.instrument}")
print(f"Detector in use is {karabo_id}")

modules = {}
for pdu in pdus["data"]:
    db_module = pdu["physical_name"]
    module = pdu["module_number"]
    da = pdu["karabo_da"]
    # '-1' means "all aggregators"; otherwise restrict to the requested ones.
    if karabo_da[0] != "-1" and da not in karabo_da:
        continue

    instrument_source_name = detector.instrument_source(module)
    corrected_source_name = detector.corrected_source(module)
    print('-', da, db_module, module, instrument_source_name)
    modules[da] = dict(
        db_module=db_module,
        module=module,
        raw_source_name=instrument_source_name,
        corrected_source_name=corrected_source_name,
    )

step_timer = StepTimer()
```
%% Cell type:markdown id: tags:
# Calibration constants
%% Cell type:code id: tags:
``` python
# !!! REMOVE IT for production
# ---------------------------------------------------
from cal_tools.restful_config import restful_config
from cal_tools.calcat_interface2 import setup_client
calcat_config = restful_config.get('calcat')
setup_client( # won't be needed in production
#base_url=calcat_config['base-api-url'].rpartition('/')[0],
base_url='https://in.xfel.eu/test_calibration',
client_id=calcat_config['user-id'],
client_secret=calcat_config['user-secret'],
user_email=calcat_config['user-email'],
)
caldb_root = "/gpfs/exfel/d/cal_tst/caldb_store"
creation_time = datetime.now()
# ===================================================
step_timer.start()
dc = RunDirectory(f"{in_folder}/r{run:04d}")
conditions = detector.conditions(dc)
caldata = CalibrationData.from_condition(
conditions, 'SPB_MIC_HPVX2', event_at=creation_time)
aggregators = {}
corrections = {}
for da in modules:
try:
# !!! REMOVE caldb_root for production
dark = caldata["Offset", da].ndarray(caldb_root=caldb_root)
flat = caldata["DynamicFF", da].ndarray(caldb_root=caldb_root)
components = flat[1:][:n_components]
flat = flat[0]
dffc = DynamicFlatFieldCorrection.from_constants(
dark, flat, components, downsample_factors)
corrections[da] = dffc
file_da, _, _ = da.partition('/')
aggregators.setdefault(file_da, []).append(da)
except (KeyError, FileNotFoundError):
warning(f"Constants are not found for module {da}. "
"The module will not calibrated")
step_timer.done_step("Load calibration constants")
```
%% Cell type:markdown id: tags:
# Correction
%% Cell type:code id: tags:
``` python
# Output Folder Creation:
os.makedirs(out_folder, exist_ok=True)

report = []
for file_da, file_modules in aggregators.items():
    dc = RunDirectory(f"{in_folder}/r{run:04d}", f"RAW-R{run:04d}-{file_da}-S*.h5")

    # build train IDs: union over all modules in this aggregator file.
    train_ids = set()
    process_modules = []
    for da in file_modules:
        instrument_source = modules[da]["raw_source_name"]
        if instrument_source in dc.all_sources:
            keydata = dc[instrument_source][image_key].drop_empty_trains()
            train_ids.update(keydata.train_ids)
            process_modules.append(da)
        else:
            print(f"Source {instrument_source} for module {da} is missed")

    train_ids = np.array(sorted(train_ids))
    ts = dc.select_trains(by_id[train_ids]).train_timestamps().astype(np.uint64)

    # correct and write sequence files (200 trains per sequence)
    for seq_id, train_mask in sequence_trains(train_ids, 200):
        step_timer.start()
        # Typo from the old revision ('sequience') fixed per the diff.
        print('* sequence', seq_id)
        seq_train_ids = train_ids[train_mask]
        seq_timestamps = ts[train_mask]
        dc_seq = dc.select_trains(by_id[seq_train_ids])
        ntrains = len(seq_train_ids)

        # create output file
        channels = [f"{modules[da]['corrected_source_name']}/{index_group}"
                    for da in process_modules]
        f = DataFile.from_details(out_folder, file_da, run, seq_id)
        f.create_metadata(like=dc, instrument_channels=channels)
        f.create_index(seq_train_ids, timestamps=seq_timestamps)

        # create file structure
        seq_report = {}
        file_datasets = {}
        for da in process_modules:
            instrument_source = modules[da]["raw_source_name"]
            keydata = dc_seq[instrument_source][image_key].drop_empty_trains()
            # Diff residue resolved: use the unlabelled counts (plain numpy
            # array) instead of the labelled series + `.values`.
            count = keydata.data_counts(labelled=False)
            i = np.flatnonzero(count)
            raw_images = keydata.select_trains(np.s_[i]).ndarray()

            # not pulse resolved
            shape = keydata.shape
            count = np.in1d(seq_train_ids, keydata.train_ids).astype(int)

            corrected_source = modules[da]["corrected_source_name"]
            src = f.create_instrument_source(corrected_source)
            src.create_index(index_group=count)

            # create key for images
            ds_data = src.create_key(image_key, shape=shape, dtype=np.float32)
            module_datasets = {image_key: ds_data}

            # create keys for image parameters
            for key in detector.copy_keys:
                keydata = dc_seq[instrument_source][key].drop_empty_trains()
                module_datasets[key] = (keydata, src.create_key(
                    key, shape=keydata.shape, dtype=keydata.dtype))

            file_datasets[da] = module_datasets
        step_timer.done_step("Create output file")

        # correct and write data to file
        for da in process_modules:
            step_timer.start()
            dc_seq = dc.select_trains(by_id[seq_train_ids])

            dffc = corrections[da]
            instrument_source = modules[da]["raw_source_name"]
            proc = FlatFieldCorrectionFileProcessor(dffc, num_proc, instrument_source, image_key)
            proc.start_workers()
            proc.run(dc_seq)
            proc.join_workers()

            # not pulse resolved
            corrected_images = np.stack(proc.rdr.results, 0)
            file_datasets[da][image_key][:] = corrected_images

            # copy image parameters
            for key in detector.copy_keys:
                keydata, ds = file_datasets[da][key]
                ds[:] = keydata.ndarray()

            # NOTE(review): `raw_images` here is whatever was last assigned
            # in the file-structure loop above (the LAST module), not this
            # module's raw data — looks like a bug; confirm intent upstream.
            seq_report[da] = (raw_images[0, 0], corrected_images[:20, 0])
            step_timer.done_step("Correct flat-field")

        f.close()
        report.append(seq_report)
%% Cell type:code id: tags:
``` python
# Plot the first raw and corrected images from the first sequence report.
step_timer.start()
if report:
    for da, (raw_image, corrected_images) in report[0].items():
        source = modules[da]["raw_source_name"]
        display(Markdown(f"## {source}"))

        display(Markdown("### The first raw image"))
        # BUG FIX: the loop unpacks `raw_image`, which is already a single
        # 2-D frame (stored as `raw_images[0, 0]` in the correction cell);
        # the original referenced the undefined/stale name `raw_images`.
        plot_camera_image(raw_image)
        plt.show()

        display(Markdown("### The first corrected image"))
        plot_camera_image(corrected_images[0])
        plt.show()

        display(Markdown("### The first corrected images in the trains (up to 20)"))
        plot_images(corrected_images, figsize=(13, 8))
        plt.show()
step_timer.done_step("Draw images")
```
%% Cell type:code id: tags:
``` python
# Report overall wall-clock time, then the per-step breakdown.
elapsed = step_timer.timespan()
print(f"Total processing time {elapsed:.01f} s")
step_timer.print_summary()
```
......
......@@ -837,3 +837,13 @@ class DSSCConditions(ConditionsBase):
"Offset": _params,
"Noise": _params,
}
@dataclass
class ShimadzuHPVX2Conditions(ConditionsBase):
    """Operating conditions for Shimadzu HPV-X2 calibration constants."""

    # Number of frames recorded per burst (stored as float for CalCat).
    burst_frame_count: float

    # Maps each calibration constant type to the names of the condition
    # parameters it is keyed on in CalCat.
    calibration_types = {
        'Offset': ['Burst Frame Count'],
        'DynamicFF': ['Burst Frame Count'],
    }
from dataclasses import dataclass
from cal_tools.calcat_interface2 import ConditionsBase
@dataclass
class ShimadzuHPVX2Conditions(ConditionsBase):
burst_frame_count: float
calibration_types = {
'Offset': ['Burst Frame Count'],
'DynamicFF': ['Burst Frame Count'],
}
from cal_tools.calcat_interface2 import ShimadzuHPVX2Conditions
class ShimadzuHPVX2:
channel = "daqOutput"
image_key = "data.image.pixels"
copy_keys = [
"data.image.binning",
......@@ -37,8 +26,7 @@ class ShimadzuHPVX2:
def conditions(self, dc: "DataCollection", module=None): # noqa: F821
if module is None:
source_pattern = self.source_name_pattern.format(
f"*:{self.channel}")
source_pattern = self.source_name_pattern.format('*')
det_dc = dc.select(source_pattern)
if not det_dc.instrument_sources:
raise ValueError("No detector sources are found")
......@@ -51,12 +39,17 @@ class ShimadzuHPVX2:
return ShimadzuHPVX2Conditions(burst_frame_count=float(num_frames))
def instrument_source(self, module: int):
    """Return the raw instrument source name for a module number.

    Diff residue resolved: the old revision formatted the pattern
    positionally and appended `:{self.channel}`; the new revision
    formats with the `modno` keyword (the channel is presumably part
    of the pattern itself — confirm against the pattern definition).
    """
    return self.source_name_pattern.format(modno=module)
def corrected_source(self, module: int):
    """Return the source name under which corrected data is written.

    Derived from the raw source name by replacing the second path
    component (the device type) with "CORR" and the channel suffix
    with ":output". Diff residue resolved: the `modno`-keyword format
    and the channel-replacement lines from the new revision are kept,
    consistent with `instrument_source`.
    """
    source_name = self.source_name_pattern.format(modno=module)
    # Replace type with CORR.
    parts = source_name.split('/')
    parts[1] = "CORR"
    source_name = '/'.join(parts)
    # Replace channel with output.
    source_name = source_name[:source_name.index(':')] + ':output'
    return source_name