Skip to content
Snippets Groups Projects
Commit 0c2f1d8a authored by Egor Sobolev's avatar Egor Sobolev Committed by Philipp Schmidt
Browse files

Add retrieving calibration constants from DB in Correct_DynamicFF_NBC.ipynb

parent b074c2ea
No related branches found
No related tags found
1 merge request!939[Generic][Shimadzu] Dynamic flat-field characterization and correction for MHz microscopy
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
# Dynamic Flat-field Offline Correction # Dynamic Flat-field Offline Correction
Author: Egor Sobolev Author: Egor Sobolev
Offline dynamic flat-field correction Offline dynamic flat-field correction
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
in_folder = "/gpfs/exfel/exp/SPB/202430/p900425/raw" # input folder, required in_folder = "/gpfs/exfel/exp/SPB/202430/p900425/raw" # input folder, required
out_folder = '/gpfs/exfel/data/scratch/esobolev/test/shimadzu' # output folder, required out_folder ="/gpfs/exfel/exp/SPB/202430/p900425/scratch/proc/r0003" # output folder, required
metadata_folder = "" # Directory containing calibration_metadata.yml when run by xfel-calibrate metadata_folder = "" # Directory containing calibration_metadata.yml when run by xfel-calibrate
run = 3 # which run to read data from, required run = 3 # which run to read data from, required
# Data files parameters. # Data files parameters.
karabo_da = ['HPVX01/1', 'HPVX01/2'] # data aggregators karabo_da = ['-1'] # data aggregators
karabo_id = "SPB_EHD_MIC" # karabo prefix of Shimadzu HPV-X2 devices karabo_id = "SPB_MIC_HPVX2" # karabo prefix of Shimadzu HPV-X2 devices
#receiver_id = "PNCCD_FMT-0" # inset for receiver devices
#path_template = 'RAW-R{:04d}-{}-S{{:05d}}.h5' # the template to use to access data
instrument_source_template = 'SPB_EHD_MIC/CAM/HPVX2_{module}:daqOutput' # data source path in h5file.
image_key = "data.image.pixels" # image data key in Karabo or exdf notation
# Database access parameters. # Database access parameters.
use_dir_creation_date = True # use dir creation date as data production reference date
cal_db_interface = "tcp://max-exfl-cal001:8021" # calibration DB interface to use cal_db_interface = "tcp://max-exfl-cal001:8021" # calibration DB interface to use
cal_db_timeout = 300000 # timeout on caldb requests
db_output = False # if True, the notebook sends dark constants to the calibration database
local_output = True # if True, the notebook saves dark constants locally
creation_time = "" # To overwrite the measured creation_time. Required Format: YYYY-MM-DD HR:MN:SC.00 e.g. 2019-07-04 11:02:41.00
# Correction parameters
n_components = 20 # number of principal components of flat-field to use in correction n_components = 20 # number of principal components of flat-field to use in correction
downsample_factors = [1, 1] # list of downsample factors for each image dimension (y, x) downsample_factors = [1, 1] # list of downsample factors for each image dimension (y, x)
constants_folder = "/gpfs/exfel/data/scratch/esobolev/test/shimadzu"
db_module_template = "Shimadzu_HPVX2_{}"
num_proc = 32 # number of processes running correction in parallel num_proc = 32 # number of processes running correction in parallel
corrected_source_template = 'SPB_EHD_MIC/CORR/HPVX2_{module}:output' # data source path in h5file.
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
import os import os
import h5py import h5py
import warnings
from logging import warning
warnings.filterwarnings('ignore')
import numpy as np import numpy as np
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from IPython.display import display, Markdown from IPython.display import display, Markdown
from datetime import datetime
from extra_data import RunDirectory, by_id from extra_data import RunDirectory, by_id
%matplotlib inline %matplotlib inline
from cal_tools.step_timing import StepTimer from cal_tools.step_timing import StepTimer
from cal_tools.files import sequence_trains, DataFile from cal_tools.files import sequence_trains, DataFile
from cal_tools.tools import get_dir_creation_date
from cal_tools.restful_config import calibration_client, restful_config
from cal_tools.calcat_interface2 import CalibrationData, setup_client
from cal_tools.shimadzu import ShimadzuHPVX2
from dynflatfield import ( from dynflatfield import (
DynamicFlatFieldCorrectionCython as DynamicFlatFieldCorrection, DynamicFlatFieldCorrectionCython as DynamicFlatFieldCorrection,
FlatFieldCorrectionFileProcessor FlatFieldCorrectionFileProcessor
) )
from dynflatfield.draw import plot_images, plot_camera_image from dynflatfield.draw import plot_images, plot_camera_image
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
index_group = image_key.partition('.')[0] creation_time = get_dir_creation_date(in_folder, run)
instrument, part, component = karabo_id.split('_') print(f"Creation time is {creation_time}")
aggregators = {} cc = calibration_client()
sources = {} pdus = cc.get_all_phy_det_units_from_detector(
source_to_db = {} {"detector_identifier": karabo_id})
print("Sources:")
for da in karabo_da: if not pdus["success"]:
aggr, _, module = da.partition('/') raise ValueError("Failed to retrieve PDUs")
instrument_source_name = instrument_source_template.format(
instrument=instrument, part=part, component=component, detector_info = pdus['data'][0]['detector']
module=module detector = ShimadzuHPVX2(detector_info["source_name_pattern"])
) index_group = detector.image_index_group
corrected_source_name = corrected_source_template.format( image_key = detector.image_key
instrument=instrument, part=part, component=component,
module=module
)
aggregators.setdefault(aggr, []).append(
(instrument_source_name, corrected_source_name))
sources[instrument_source_name] = aggr
source_to_db[instrument_source_name] = db_module_template.format(module)
print('-', instrument_source_name)
print()
print(f"Instrument {detector.instrument}")
print(f"Detector in use is {karabo_id}") print(f"Detector in use is {karabo_id}")
print(f"Instrument {instrument}")
modules = {}
for pdu in pdus["data"]:
db_module = pdu["physical_name"]
module = pdu["module_number"]
da = pdu["karabo_da"]
if karabo_da[0] != "-1" and da not in karabo_da:
continue
instrument_source_name = detector.instrument_source(module)
corrected_source_name = detector.corrected_source(module)
print('-', da, db_module, module, instrument_source_name)
modules[da] = dict(
db_module=db_module,
module=module,
raw_source_name=instrument_source_name,
corrected_source_name=corrected_source_name,
)
step_timer = StepTimer() step_timer = StepTimer()
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
# Calibration constants # Calibration constants
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
requested_conditions = { # !!! REMOVE IT for production
"frame_size": 1.0, # ---------------------------------------------------
} from cal_tools.restful_config import restful_config
from cal_tools.calcat_interface2 import setup_client
calcat_config = restful_config.get('calcat')
setup_client( # won't be needed in production
#base_url=calcat_config['base-api-url'].rpartition('/')[0],
base_url='https://in.xfel.eu/test_calibration',
client_id=calcat_config['user-id'],
client_secret=calcat_config['user-secret'],
user_email=calcat_config['user-email'],
)
caldb_root = "/gpfs/exfel/d/cal_tst/caldb_store"
creation_time = datetime.now()
# ===================================================
step_timer.start() step_timer.start()
corrections = {} dc = RunDirectory(f"{in_folder}/r{run:04d}")
constant_types = ["Offset", "DynamicFF"] conditions = detector.conditions(dc)
for source, db_module in source_to_db.items():
constants = {}
for constant_name in constant_types:
const_file = f"{constants_folder}/const_{constant_name}_{db_module}.h5"
if not os.path.isfile(const_file):
raise FileNotFoundError(f"{constant_name} constants are not found for {karabo_id}.")
with h5py.File(const_file, 'r') as f:
conditions = dict(
frame_size=int(f["condition/Frame Size/value"][()])
)
data = f["data"][:]
data_creation_time = f["creation_time"][()].decode()
if not all(conditions[key] == value for key, value in requested_conditions.items()):
raise ValueError("Conditions for {constant_name} are not match")
print(f"{source} {db_module} {constant_name}: {data_creation_time}")
constants[constant_name] = data
dark = constants["Offset"]
flat = constants["DynamicFF"][0]
components = constants["DynamicFF"][1:][:n_components]
dffc = DynamicFlatFieldCorrection.from_constants( caldata = CalibrationData.from_condition(
dark, flat, components, downsample_factors) conditions, 'SPB_MIC_HPVX2', event_at=creation_time)
corrections[source] = dffc aggregators = {}
corrections = {}
for da in modules:
try:
# !!! REMOVE caldb_root for production
dark = caldata["Offset", da].ndarray(caldb_root=caldb_root)
flat = caldata["DynamicFF", da].ndarray(caldb_root=caldb_root)
components = flat[1:][:n_components]
flat = flat[0]
dffc = DynamicFlatFieldCorrection.from_constants(
dark, flat, components, downsample_factors)
corrections[da] = dffc
file_da, _, _ = da.partition('/')
aggregators.setdefault(file_da, []).append(da)
except (KeyError, FileNotFoundError):
warning(f"Constants are not found for module {da}. "
"The module will not be calibrated")
step_timer.done_step("Load calibration constants") step_timer.done_step("Load calibration constants")
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
# Correction # Correction
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
# Output Folder Creation:
os.makedirs(out_folder, exist_ok=True)
report = [] report = []
for aggr, sources in aggregators.items(): for file_da, file_modules in aggregators.items():
dc = RunDirectory(f"{in_folder}/r{run:04d}", f"RAW-R{run:04d}-{aggr}-S*.h5") dc = RunDirectory(f"{in_folder}/r{run:04d}", f"RAW-R{run:04d}-{file_da}-S*.h5")
# build train IDs
train_ids = set() train_ids = set()
keydata_cache = {} process_modules = []
for instrument_source, corrected_source in sources: for da in file_modules:
keydata = dc[instrument_source][image_key].drop_empty_trains() instrument_source = modules[da]["raw_source_name"]
train_ids.update(keydata.train_ids) if instrument_source in dc.all_sources:
keydata_cache[instrument_source] = keydata keydata = dc[instrument_source][image_key].drop_empty_trains()
train_ids.update(keydata.train_ids)
process_modules.append(da)
else:
print(f"Source {instrument_source} for module {da} is missing")
train_ids = np.array(sorted(train_ids)) train_ids = np.array(sorted(train_ids))
ts = dc.select_trains(by_id[train_ids]).train_timestamps().astype(np.uint64) ts = dc.select_trains(by_id[train_ids]).train_timestamps().astype(np.uint64)
# correct and write sequence files
for seq_id, train_mask in sequence_trains(train_ids, 200): for seq_id, train_mask in sequence_trains(train_ids, 200):
step_timer.start() step_timer.start()
print('* sequence', seq_id) print('* sequence', seq_id)
seq_train_ids = train_ids[train_mask] seq_train_ids = train_ids[train_mask]
seq_timestamps = ts[train_mask] seq_timestamps = ts[train_mask]
dc_seq = dc.select_trains(by_id[seq_train_ids]) dc_seq = dc.select_trains(by_id[seq_train_ids])
ntrains = len(seq_train_ids) ntrains = len(seq_train_ids)
# create output file # create output file
channels = [f"{s[1]}/{index_group}" for s in sources] channels = [f"{modules[da]['corrected_source_name']}/{index_group}"
for da in process_modules]
f = DataFile.from_details(out_folder, aggr, run, seq_id) f = DataFile.from_details(out_folder, file_da, run, seq_id)
f.create_metadata(like=dc, instrument_channels=channels) f.create_metadata(like=dc, instrument_channels=channels)
f.create_index(seq_train_ids, timestamps=seq_timestamps) f.create_index(seq_train_ids, timestamps=seq_timestamps)
# create file structure
seq_report = {} seq_report = {}
image_datasets = {} file_datasets = {}
for instrument_source, corrected_source in sources: for da in process_modules:
instrument_source = modules[da]["raw_source_name"]
keydata = dc_seq[instrument_source][image_key].drop_empty_trains() keydata = dc_seq[instrument_source][image_key].drop_empty_trains()
count = keydata.data_counts() count = keydata.data_counts()
i = np.flatnonzero(count.values) i = np.flatnonzero(count.values)
raw_images = keydata.select_trains(np.s_[i]).ndarray() raw_images = keydata.select_trains(np.s_[i]).ndarray()
# not pulse resolved # not pulse resolved
shape = keydata.shape shape = keydata.shape
count = np.in1d(seq_train_ids, keydata.train_ids).astype(int) count = np.in1d(seq_train_ids, keydata.train_ids).astype(int)
corrected_source = modules[da]["corrected_source_name"]
src = f.create_instrument_source(corrected_source) src = f.create_instrument_source(corrected_source)
src.create_index(index_group=count) src.create_index(index_group=count)
# create key for images
ds_data = src.create_key(image_key, shape=shape, dtype=np.float32) ds_data = src.create_key(image_key, shape=shape, dtype=np.float32)
image_datasets[corrected_source] = ds_data module_datasets = {image_key: ds_data}
# create keys for image parameters
for key in detector.copy_keys:
keydata = dc_seq[instrument_source][key].drop_empty_trains()
module_datasets[key] = (keydata, src.create_key(
key, shape=keydata.shape, dtype=keydata.dtype))
file_datasets[da] = module_datasets
step_timer.done_step("Create output file") step_timer.done_step("Create output file")
for instrument_source, corrected_source in sources: # correct and write data to file
for da in process_modules:
step_timer.start() step_timer.start()
dc_seq = dc.select_trains(by_id[seq_train_ids]) dc_seq = dc.select_trains(by_id[seq_train_ids])
dffc = corrections[instrument_source] dffc = corrections[da]
instrument_source = modules[da]["raw_source_name"]
proc = FlatFieldCorrectionFileProcessor(dffc, num_proc, instrument_source, image_key) proc = FlatFieldCorrectionFileProcessor(dffc, num_proc, instrument_source, image_key)
proc.start_workers() proc.start_workers()
proc.run(dc_seq) proc.run(dc_seq)
proc.join_workers() proc.join_workers()
# not pulse resolved # not pulse resolved
corrected_images = np.stack(proc.rdr.results, 0) corrected_images = np.stack(proc.rdr.results, 0)
image_datasets[corrected_source][:] = corrected_images file_datasets[da][image_key][:] = corrected_images
# copy image parameters
for key in detector.copy_keys:
keydata, ds = file_datasets[da][key]
ds[:] = keydata.ndarray()
seq_report[instrument_source] = (raw_images[0, 0], corrected_images[:20, 0]) seq_report[da] = (raw_images[0, 0], corrected_images[:20, 0])
step_timer.done_step("Correct flat-field") step_timer.done_step("Correct flat-field")
f.close() f.close()
report.append(seq_report) report.append(seq_report)
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
step_timer.start() step_timer.start()
if report:
for source, (raw_image, corrected_images) in report[0].items(): for da, (raw_image, corrected_images) in report[0].items():
display(Markdown(f"# {source}")) source = modules[da]["raw_source_name"]
display(Markdown(f"## {source}"))
display(Markdown("## The first raw image"))
plot_camera_image(raw_images[0, 0]) display(Markdown("### The first raw image"))
plt.show() plot_camera_image(raw_images[0, 0])
plt.show()
display(Markdown("## The first corrected image"))
plot_camera_image(corrected_images[0]) display(Markdown("### The first corrected image"))
plt.show() plot_camera_image(corrected_images[0])
plt.show()
display(Markdown("## The first corrected images in the trains (up to 20)"))
plot_images(corrected_images, figsize=(13, 8)) display(Markdown("### The first corrected images in the trains (up to 20)"))
plt.show() plot_images(corrected_images, figsize=(13, 8))
plt.show()
step_timer.done_step("Draw images") step_timer.done_step("Draw images")
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
print(f"Total processing time {step_timer.timespan():.01f} s") print(f"Total processing time {step_timer.timespan():.01f} s")
step_timer.print_summary() step_timer.print_summary()
``` ```
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment