Commit d5d81e9d authored by Philipp Schmidt

Add actual CCV file creation and injection to DynamicFF characterization

parent b4f8ecd2
Part of 2 merge requests: !995 [Generic] Injection code for DynamicFF corrections and !939 [Generic][Shimadzu] Dynamic flat-field characterization and correction for MHz microscopy.
%% Cell type:markdown id: tags:
# Characterization of dark and flat field for Dynamic Flat Field correction
Author: Egor Sobolev
Computation of dark offsets and flat-field principal components
%% Cell type:code id: tags:
``` python
in_folder = "/gpfs/exfel/exp/SPB/202430/p900425/raw" # input folder, required
out_folder = '/gpfs/exfel/data/scratch/esobolev/test/shimadzu' # output folder, required
metadata_folder = "" # Directory containing calibration_metadata.yml when run by xfel-calibrate
run_high = 1 # run number in which dark data was recorded, required
run_low = 2 # run number in which flat-field data was recorded, required
operation_mode = "PCA_DynamicFF" # Detector operation mode, optional (defaults to "PCA_DynamicFF")
# Data files parameters.
karabo_da = ['-1'] # data aggregators
karabo_id = "SPB_MIC_HPVX2" # karabo prefix of Shimadzu HPV-X2 devices
# Database access parameters.
cal_db_interface = "tcp://max-exfl-cal001:8021" # Unused, calibration DB interface to use
cal_db_timeout = 30000 # Unused, calibration DB timeout
db_output = False # if True, the notebook sends dark constants to the calibration database
local_output = True # if True, the notebook saves dark constants locally
# Calibration constants parameters
n_components = 50 # Number of principal components of flat-field to compute (default: 50)
```
%% Cell type:code id: tags:
``` python
import datetime
import os
import warnings
from logging import warning
from shutil import copyfile
from tempfile import NamedTemporaryFile
warnings.filterwarnings('ignore')
import time
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display, Markdown
from extra_data import RunDirectory
%matplotlib inline
from cal_tools.step_timing import StepTimer
from cal_tools.tools import (
    get_dir_creation_date,
    run_prop_seq_from_path,
    save_dict_to_hdf5
)
from cal_tools.restful_config import calibration_client, extra_calibration_client
from cal_tools.shimadzu import ShimadzuHPVX2
from cal_tools.constants import write_ccv, inject_ccv
import dynflatfield as dffc
from dynflatfield.draw import plot_images, plot_camera_image
```
%% Cell type:code id: tags:
``` python
extra_calibration_client() # Configure CalibrationData.
cc = calibration_client()
pdus = cc.get_all_phy_det_units_from_detector(
    {"detector_identifier": karabo_id}) # TODO: Use creation_time for snapshot_at
if not pdus["success"]:
    raise ValueError("Failed to retrieve PDUs")
detector_info = pdus['data'][0]['detector']
detector = ShimadzuHPVX2(detector_info["source_name_pattern"])
print(f"Instrument {detector.instrument}")
print(f"Detector in use is {karabo_id}")
modules = {}
for pdu_no, pdu in enumerate(pdus["data"]):
    db_module = pdu["physical_name"]
    module = pdu["module_number"]
    da = pdu["karabo_da"]
    if karabo_da[0] != "-1" and da not in karabo_da:
        continue
    instrument_source_name = detector.instrument_source(module)
    print('-', da, db_module, module, instrument_source_name)
    modules[da] = dict(
        db_module=db_module,
        module=module,
        raw_source_name=instrument_source_name,
        pdu_no=pdu_no,
    )
constants = {}
step_timer = StepTimer()
```
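For orientation, each entry of `pdus["data"]` is a CalCat record; the fields below are the only ones this notebook and the injection code added by this commit rely on. The values are purely illustrative, not a real record:

``` python
# Hypothetical pdus["data"] entry, restricted to the fields used here.
example_pdu = {
    "physical_name": "HPVX2_M1",       # -> db_module
    "module_number": 1,                # -> module
    "karabo_da": "HPVX2/1",            # data aggregator name
    "uuid": 1234567890,                # 64-bit PDU identifier
    "detector_type": {"name": "ShimadzuHPVX2-Type"},
    "detector": {"source_name_pattern": "SPB_EHD_MIC/CAM/HPVX2_{}"},  # illustrative
}
```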
%% Cell type:markdown id: tags:
# Offset map
%% Cell type:code id: tags:
``` python
dark_run = run_high
dark_creation_time = get_dir_creation_date(in_folder, dark_run)
print(f"Using {dark_creation_time} as creation time of Offset constant.")
for da, meta in modules.items():
    source_name = detector.instrument_source(meta["module"])
    image_key = detector.image_key
    display(Markdown(f"## {source_name}"))
    # read
    step_timer.start()
    file_da, _, _ = da.partition('/')
    dark_dc = RunDirectory(f"{in_folder}/r{dark_run:04d}",
                           include=f"RAW-R{dark_run:04d}-{file_da}-S*.h5")
    if source_name not in dark_dc.all_sources:
        raise ValueError(f"Could not find source {source_name} for module {da} in dark data")
    dark_dc = dark_dc.select([(source_name, image_key)])
    conditions = detector.conditions(dark_dc, meta["module"])
    key_data = dark_dc[source_name, image_key]
    images_dark = key_data.ndarray()
    ntrain, npulse, ny, nx = images_dark.shape
    print(f"N image: {ntrain * npulse} (ntrain: {ntrain}, npulse: {npulse})")
    print(f"Image size: {ny} x {nx} px")
    step_timer.done_step("Read dark images")
    # process
    step_timer.start()
    dark = dffc.process_dark(images_dark) # Amounts to a per-pixel mean right now.
    # put results in the dict
    module_constants = constants.setdefault(meta["db_module"], {})
    module_constants["Offset"] = dict(
        conditions=conditions, data=dark, pdu_no=meta["pdu_no"],
        creation_time=dark_creation_time, dims=['ss', 'fs']
    )
    step_timer.done_step("Process dark images")
    display()
    # draw plots
    step_timer.start()
    plot_camera_image(dark)
    plt.show()
    step_timer.done_step("Draw offsets")
```
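As the inline comment notes, the dark processing currently amounts to a per-pixel mean. A minimal numpy sketch of that step (illustrative only, not the dynflatfield implementation):

``` python
# Sketch: per-pixel mean over all trains and pulses.
# images_dark: (ntrain, npulse, ny, nx) -> dark: (ny, nx)
def process_dark_sketch(images_dark):
    return images_dark.reshape(-1, *images_dark.shape[-2:]).mean(axis=0)
```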
%% Cell type:markdown id: tags:
# Flat-field PCA decomposition
%% Cell type:code id: tags:
``` python
flat_run = run_low
flat_creation_time = get_dir_creation_date(in_folder, flat_run)
print(f"Using {flat_creation_time} as creation time of DynamicFF constant.")
for da, meta in modules.items():
    source_name = detector.instrument_source(meta["module"])
    image_key = detector.image_key
    display(Markdown(f"## {source_name}"))
    # read
    step_timer.start()
    file_da, _, _ = da.partition('/')
    flat_dc = RunDirectory(f"{in_folder}/r{flat_run:04d}",
                           include=f"RAW-R{flat_run:04d}-{file_da}-S*.h5")
    if source_name not in flat_dc.all_sources:
        raise ValueError(f"Could not find source {source_name} for module {da} in flat-field data")
    flat_dc = flat_dc.select([(source_name, image_key)])
    conditions = detector.conditions(flat_dc, meta["module"])
    dark = constants[meta["db_module"]]["Offset"]["data"]
    dark_conditions = constants[meta["db_module"]]["Offset"]["conditions"]
    if conditions != dark_conditions:
        raise ValueError(f"The conditions for flat-field run ({conditions}) do not match "
                         f"the dark run conditions ({dark_conditions}). Skipping flat-field characterization.")
    key_data = flat_dc[source_name][image_key]
    images_flat = key_data.ndarray()
    ntrain, npulse, ny, nx = images_flat.shape
    print(f"N image: {ntrain * npulse} (ntrain: {ntrain}, npulse: {npulse})")
    print(f"Image size: {ny} x {nx} px")
    step_timer.done_step("Read flat-field images")
    # process
    step_timer.start()
    flat, components, explained_variance_ratio = dffc.process_flat(
        images_flat, dark, n_components)
    flat_data = np.concatenate([flat[None, ...], components])
    # put results in the dict
    conditions = detector.conditions(flat_dc, meta["module"])
    module_constants = constants.setdefault(meta["db_module"], {})
    module_constants["DynamicFF"] = dict(
        conditions=conditions, data=flat_data, pdu_no=meta["pdu_no"],
        creation_time=flat_creation_time, dims=['component', 'ss', 'fs']
    )
    step_timer.done_step("Process flat-field images")
    # draw plots
    step_timer.start()
    display(Markdown("### Average flat-field"))
    plot_camera_image(flat)
    plt.show()
    display(Markdown("### Explained variance ratio"))
    fig, ax = plt.subplots(1, 1, figsize=(10, 4), tight_layout=True)
    ax.semilogy(explained_variance_ratio, 'o')
    ax.set_xticks(np.arange(len(explained_variance_ratio)))
    ax.set_xlabel("Component no.")
    ax.set_ylabel("Variance fraction")
    plt.show()
    display(Markdown("### The first principal components (up to 20)"))
    plot_images(components[:20], figsize=(13, 8))
    plt.show()
    step_timer.done_step("Draw flat-field")
```
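For intuition, `process_flat` can be approximated with a plain SVD-based PCA: subtract the dark offset, take the mean as the static flat field, and keep the leading right singular vectors of the centered image stack as components. A sketch under that assumption (the library's normalization and solver details may differ):

``` python
import numpy as np

def process_flat_sketch(images_flat, dark, n_components):
    # Flatten (ntrain, npulse, ny, nx) -> (nimage, ny*nx), remove dark.
    ny, nx = images_flat.shape[-2:]
    x = images_flat.reshape(-1, ny * nx).astype(np.float64) - dark.ravel()
    flat = x.mean(axis=0)                # mean flat field
    xc = x - flat                        # center the stack for PCA
    # Economy SVD; rows of vt are principal components in image space.
    _, s, vt = np.linalg.svd(xc, full_matrices=False)
    explained_variance_ratio = (s ** 2) / np.sum(s ** 2)
    components = vt[:n_components].reshape(-1, ny, nx)
    return flat.reshape(ny, nx), components, explained_variance_ratio
```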
%% Cell type:markdown id: tags:
## Calibration constants
%% Cell type:code id: tags:
``` python
step_timer.start()
_, proposal, _ = run_prop_seq_from_path(in_folder)
# Output Folder Creation:
if local_output:
    os.makedirs(out_folder, exist_ok=True)
for db_module, module_constants in constants.items():
    for constant_name, constant in module_constants.items():
        conditions = constant["conditions"]
        pdu = pdus["data"][constant["pdu_no"]]
        with NamedTemporaryFile() as tempf:
            ccv_root = write_ccv(
                tempf.name,
                pdu['physical_name'], pdu['uuid'], pdu['detector_type']['name'],
                constant_name, conditions, constant['creation_time'],
                proposal, [dark_run, flat_run],
                constant["data"], constant['dims'])
            if db_output:
                inject_ccv(tempf.name, ccv_root, metadata_folder)
            if local_output:
                ofile = f"{out_folder}/const_{constant_name}_{db_module}.h5"
                if os.path.isfile(ofile):
                    print(f'File {ofile} already exists and will be overwritten')
                copyfile(tempf.name, ofile)
```
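The temporary file written by `write_ccv` (defined later in this commit) lays the data out as `/<pdu_name>/<calibration>/<key>`, with an `operating_condition` subgroup next to the `data` dataset. A quick way to sanity-check one of the locally saved copies (path assembled from the notebook variables above):

``` python
import h5py

# Print the HDF5 tree of a saved constant file, with dataset shapes.
with h5py.File(f"{out_folder}/const_Offset_{db_module}.h5", 'r') as f:
    f.visititems(lambda name, obj: print(name, getattr(obj, 'shape', '')))
```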
%% Cell type:code id: tags:
``` python
print(f"Total processing time {step_timer.timespan():.01f} s")
step_timer.print_summary()
```
...
%% Cell type:markdown id: tags:
# Dynamic Flat-field Offline Correction
Author: Egor Sobolev
Offline dynamic flat-field correction
%% Cell type:code id: tags:
``` python
in_folder = "/gpfs/exfel/exp/SPB/202430/p900425/raw" # input folder, required
out_folder = "/gpfs/exfel/exp/SPB/202430/p900425/scratch/proc/r0003" # output folder, required
metadata_folder = "" # Directory containing calibration_metadata.yml when run by xfel-calibrate
run = 3 # which run to read data from, required
# Data files parameters.
karabo_da = ['-1'] # data aggregators
karabo_id = "SPB_MIC_HPVX2" # karabo prefix of Shimadzu HPV-X2 devices
# Database access parameters.
cal_db_interface = "tcp://max-exfl-cal001:8021" # Unused, calibration DB interface to use
cal_db_timeout = 30000 # Unused, calibration DB timeout
# Correction parameters
n_components = 20 # number of principal components of flat-field to use in correction
downsample_factors = [1, 1] # list of downsample factors for each image dimension (y, x)
num_proc = 32 # number of processes running correction in parallel
```
%% Cell type:code id: tags:
``` python
import os
import h5py
import warnings
from logging import warning
warnings.filterwarnings('ignore')
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display, Markdown
from datetime import datetime
from extra_data import RunDirectory, by_id
%matplotlib inline
from cal_tools.step_timing import StepTimer
from cal_tools.files import sequence_trains, DataFile
from cal_tools.tools import get_dir_creation_date
from cal_tools.restful_config import calibration_client, extra_calibration_client
from cal_tools.calcat_interface2 import CalibrationData
from cal_tools.shimadzu import ShimadzuHPVX2
from dynflatfield import (
    DynamicFlatFieldCorrectionCython as DynamicFlatFieldCorrection,
    FlatFieldCorrectionFileProcessor
)
from dynflatfield.draw import plot_images, plot_camera_image
```
%% Cell type:code id: tags:
``` python
creation_time = get_dir_creation_date(in_folder, run)
print(f"Creation time is {creation_time}")
extra_calibration_client() # Configure CalibrationData API.
cc = calibration_client()
pdus = cc.get_all_phy_det_units_from_detector(
    {"detector_identifier": karabo_id}) # TODO: Use creation_time for snapshot_at
if not pdus["success"]:
    raise ValueError("Failed to retrieve PDUs")
detector_info = pdus['data'][0]['detector']
detector = ShimadzuHPVX2(detector_info["source_name_pattern"])
index_group = detector.image_index_group
image_key = detector.image_key
print(f"Instrument {detector.instrument}")
print(f"Detector in use is {karabo_id}")
modules = {}
for pdu in pdus["data"]:
    db_module = pdu["physical_name"]
    module = pdu["module_number"]
    da = pdu["karabo_da"]
    if karabo_da[0] != "-1" and da not in karabo_da:
        continue
    instrument_source_name = detector.instrument_source(module)
    corrected_source_name = detector.corrected_source(module)
    print('-', da, db_module, module, instrument_source_name)
    modules[da] = dict(
        db_module=db_module,
        module=module,
        raw_source_name=instrument_source_name,
        corrected_source_name=corrected_source_name,
    )
step_timer = StepTimer()
```
%% Cell type:markdown id: tags:
# Calibration constants
%% Cell type:code id: tags:
``` python
step_timer.start()
dc = RunDirectory(f"{in_folder}/r{run:04d}")
conditions = detector.conditions(dc)
caldata = CalibrationData.from_condition(
    conditions, 'SPB_MIC_HPVX2', event_at=creation_time)
aggregators = {}
corrections = {}
for da in modules:
    try:
        dark = caldata["Offset", da].ndarray()
        flat = caldata["DynamicFF", da].ndarray()
        components = flat[1:][:n_components]
        flat = flat[0]
        dffc = DynamicFlatFieldCorrection.from_constants(
            dark, flat, components, downsample_factors)
        corrections[da] = dffc
        file_da, _, _ = da.partition('/')
        aggregators.setdefault(file_da, []).append(da)
    except (KeyError, FileNotFoundError):
        warning(f"Constants are not found for module {da}. "
                "The module will not be calibrated")
step_timer.done_step("Load calibration constants")
```
%% Cell type:markdown id: tags:
# Correction
%% Cell type:code id: tags:
``` python
# Output Folder Creation:
os.makedirs(out_folder, exist_ok=True)
report = []
for file_da, file_modules in aggregators.items():
    dc = RunDirectory(f"{in_folder}/r{run:04d}", f"RAW-R{run:04d}-{file_da}-S*.h5")
    # build train IDs
    train_ids = set()
    process_modules = []
    for da in file_modules:
        instrument_source = modules[da]["raw_source_name"]
        if instrument_source in dc.all_sources:
            keydata = dc[instrument_source][image_key].drop_empty_trains()
            train_ids.update(keydata.train_ids)
            process_modules.append(da)
        else:
            print(f"Source {instrument_source} for module {da} is missing")
    train_ids = np.array(sorted(train_ids))
    ts = dc.select_trains(by_id[train_ids]).train_timestamps().astype(np.uint64)
    # correct and write sequence files
    for seq_id, train_mask in sequence_trains(train_ids, 200):
        step_timer.start()
        print('* sequence', seq_id)
        seq_train_ids = train_ids[train_mask]
        seq_timestamps = ts[train_mask]
        dc_seq = dc.select_trains(by_id[seq_train_ids])
        ntrains = len(seq_train_ids)
        # create output file
        channels = [f"{modules[da]['corrected_source_name']}/{index_group}"
                    for da in process_modules]
        f = DataFile.from_details(out_folder, file_da, run, seq_id)
        f.create_metadata(like=dc, instrument_channels=channels)
        f.create_index(seq_train_ids, timestamps=seq_timestamps)
        # create file structure
        seq_report = {}
        file_datasets = {}
        for da in process_modules:
            instrument_source = modules[da]["raw_source_name"]
            keydata = dc_seq[instrument_source][image_key].drop_empty_trains()
            count = keydata.data_counts(labelled=False)
            i = np.flatnonzero(count)
            raw_images = keydata.select_trains(np.s_[i]).ndarray()
            # not pulse resolved
            shape = keydata.shape
            count = np.in1d(seq_train_ids, keydata.train_ids).astype(int)
            corrected_source = modules[da]["corrected_source_name"]
            src = f.create_instrument_source(corrected_source)
            src.create_index(index_group=count)
            # create key for images
            ds_data = src.create_key(image_key, shape=shape, dtype=np.float32)
            module_datasets = {image_key: ds_data}
            # create keys for image parameters
            for key in detector.copy_keys:
                keydata = dc_seq[instrument_source][key].drop_empty_trains()
                module_datasets[key] = (keydata, src.create_key(
                    key, shape=keydata.shape, dtype=keydata.dtype))
            file_datasets[da] = module_datasets
        step_timer.done_step("Create output file")
        # correct and write data to file
        for da in process_modules:
            step_timer.start()
            dc_seq = dc.select_trains(by_id[seq_train_ids])
            dffc = corrections[da]
            instrument_source = modules[da]["raw_source_name"]
            proc = FlatFieldCorrectionFileProcessor(dffc, num_proc, instrument_source, image_key)
            proc.start_workers()
            proc.run(dc_seq)
            proc.join_workers()
            # not pulse resolved
            corrected_images = np.stack(proc.rdr.results, 0)
            file_datasets[da][image_key][:] = corrected_images
            # copy image parameters
            for key in detector.copy_keys:
                keydata, ds = file_datasets[da][key]
                ds[:] = keydata.ndarray()
            seq_report[da] = (raw_images[0, 0], corrected_images[:20, 0])
            step_timer.done_step("Correct flat-field")
        f.close()
        report.append(seq_report)
```
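The heavy lifting above happens inside `DynamicFlatFieldCorrection`. Conceptually, dynamic flat-field correction fits per-image weights for the principal components and divides the dark-subtracted image by the resulting dynamic flat field. A per-image numpy sketch under that assumption (the Cython implementation used here additionally supports downsampling via `downsample_factors` and may differ in solver details):

``` python
import numpy as np

def correct_image_sketch(raw, dark, flat, components):
    """Dynamic flat-field correction of a single (ny, nx) image."""
    # Residual after removing dark and the mean flat field.
    y = (raw.astype(np.float64) - dark - flat).ravel()
    a = components.reshape(len(components), -1).T  # (npix, ncomp)
    # Least-squares fit of per-image component weights.
    w, *_ = np.linalg.lstsq(a, y, rcond=None)
    dyn_flat = flat + np.tensordot(w, components, axes=1)
    return (raw - dark) / dyn_flat
```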
%% Cell type:code id: tags:
``` python
step_timer.start()
if report:
    for da, (raw_image, corrected_images) in report[0].items():
        source = modules[da]["raw_source_name"]
        display(Markdown(f"## {source}"))
        display(Markdown("### The first raw image"))
        plot_camera_image(raw_image)
        plt.show()
        display(Markdown("### The first corrected image"))
        plot_camera_image(corrected_images[0])
        plt.show()
        display(Markdown("### The first corrected images in the trains (up to 20)"))
        plot_images(corrected_images, figsize=(13, 8))
        plt.show()
step_timer.done_step("Draw images")
```
%% Cell type:code id: tags:
``` python
print(f"Total processing time {step_timer.timespan():.01f} s")
step_timer.print_summary()
```
...
from datetime import datetime, timezone
from struct import pack, unpack
from pathlib import Path
from shutil import copyfile
from hashlib import md5
import binascii
import time
import numpy as np
import h5py
from calibration_client import CalibrationClient
from cal_tools.calcat_interface2 import _get_default_caldb_root, get_client
from cal_tools.tools import run_prop_seq_from_path
from cal_tools.restful_config import calibration_client
def write_ccv(
    const_path,
    pdu_name, pdu_uuid, detector_type,
    calibration, conditions, created_at, proposal, runs,
    data, dims, key='0'
):
    """Write CCV data file.

    Args:
        const_path (os.PathLike): Path to CCV file to write
        pdu_name (str): Physical detector unit name
        pdu_uuid (int): Physical detector unit UUID
        detector_type (str): Detector type name
        calibration (str): Calibration name
        conditions (ConditionsBase): Detector operating conditions
        created_at (datetime): Validity start for calibration
        proposal (int): Raw data proposal the calibration data is
            generated from
        runs (Iterable of int): Raw data runs the calibration data is
            generated from
        data (ndarray): Calibration constant data
        dims (Iterable of str): Dimension names for the data array,
            one per data axis
        key (str, optional): CCV key within the calibration group,
            auto-numbered if None (defaults to '0')

    Returns:
        (str) CCV HDF group name.
    """
    if data.ndim != len(dims):
        raise ValueError('data.ndim does not match number of dims')

    with h5py.File(const_path, 'a') as const_file:
        const_file.attrs['version'] = 0

        pdu_group = const_file.require_group(pdu_name)
        pdu_group.attrs['uuid'] = pdu_uuid
        pdu_group.attrs['detector_type'] = detector_type

        calibration_group = pdu_group.require_group(calibration)

        if key is None:
            key = str(len(calibration_group))

        ccv_group = calibration_group.create_group(key)
        ccv_group.attrs['begin_at'] = created_at.isoformat()
        ccv_group.attrs['proposal'] = proposal
        ccv_group.attrs['runs'] = np.array(runs, dtype=np.int32)
        ccv_group_name = ccv_group.name

        opcond_group = ccv_group.create_group('operating_condition')
        opcond_dict = conditions.make_dict(
            conditions.calibration_types[calibration])
        for db_name, value in opcond_dict.items():
            dset_name = db_name.lower().replace(' ', '_')
            dset = opcond_group.create_dataset(dset_name, data=value,
                                               dtype=np.float64)
            dset.attrs['lower_deviation'] = 0.0
            dset.attrs['upper_deviation'] = 0.0
            dset.attrs['database_name'] = db_name

        dset = ccv_group.create_dataset('data', data=data)
        dset.attrs['dims'] = dims

    return ccv_group_name
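
# Hypothetical usage sketch (not part of this module): `conditions` is
# whatever detector.conditions(...) returns, i.e. any object exposing
# make_dict() and calibration_types as used above; the names and values
# below are illustrative only.
#
#     ccv_root = write_ccv(
#         '/tmp/const.h5',
#         'HPVX2_M1', 1234567890, 'ShimadzuHPVX2-Type',
#         'Offset', conditions, datetime.now(timezone.utc),
#         900425, [1, 2],
#         dark_data, ['ss', 'fs'])
#     # ccv_root == '/HPVX2_M1/Offset/0'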
def inject_ccv(const_src, ccv_root, report_to=None):
    """Inject new CCV into CalCat.

    Args:
        const_src (str or Path): Path to CCV data file.
        ccv_root (str): CCV HDF group name.
        report_to (str): Metadata location.

    Returns:
        None

    Raises:
        RuntimeError: If CalCat POST request fails.
    """
    pdu_name, calibration, key = ccv_root.lstrip('/').split('/')

    with h5py.File(const_src, 'r') as const_file:
        pdu_group = const_file[pdu_name]
        pdu_uuid = pdu_group.attrs['uuid']
        detector_type = pdu_group.attrs['detector_type']

        ccv_group = const_file[ccv_root]
        proposal, runs = ccv_group.attrs['proposal'], ccv_group.attrs['runs']
        begin_at_str = ccv_group.attrs['begin_at']

        condition_group = ccv_group['operating_condition']
        cond_params = []

        # It's really not ideal we're mixing conditionS and condition now.
        for parameter in condition_group:
            param_dset = condition_group[parameter]
            cond_params.append({
                'parameter_name': param_dset.attrs['database_name'],
                'value': float(param_dset[()]),
                'lower_deviation_value': param_dset.attrs['lower_deviation'],
                'upper_deviation_value': param_dset.attrs['upper_deviation'],
                'flg_available': True
            })

    const_rel_path = f'xfel/cal/{detector_type.lower()}/{pdu_name.lower()}'
    const_filename = f'cal.{time.time()}.h5'

    if proposal and len(runs) > 0:
        raw_data_location = 'proposal:{} runs: {}'.format(
            proposal, ' '.join([str(x) for x in runs]))
    else:
        # Fallback for non-run based constants.
        raw_data_location = ''

    # Generate condition name.
    unique_name = detector_type[:detector_type.index('-Type')] + ' Def'
    cond_hash = md5(pdu_name.encode())
    cond_hash.update(int(pdu_uuid).to_bytes(
        length=8, byteorder='little', signed=False))

    for param_dict in cond_params:
        cond_hash.update(str(param_dict['parameter_name']).encode())
        cond_hash.update(str(param_dict['value']).encode())

    unique_name += binascii.b2a_base64(cond_hash.digest()).decode()
    unique_name = unique_name[:60]

    # Add PDU "UUID" to parameters.
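    # Condition parameter values are sent as floats, so the 64-bit
    # integer UUID is bit-reinterpreted as a float64 (pack as int64
    # 'q', unpack as double 'd') rather than converted by value; this
    # keeps all 64 bits intact, presumably reversed on the reading side.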
    cond_params.append({
        'parameter_name': 'Detector UUID',
        'value': unpack('d', pack('q', pdu_uuid))[0],
        'lower_deviation_value': 0.0,
        'upper_deviation_value': 0.0,
        'flg_available': True
    })

    inject_h = {
        'detector_condition': {
            'name': unique_name,
            'parameters': cond_params
        },
        'calibration_constant': {
            'calibration_name': calibration,
            'detector_type_name': detector_type,
            'flg_auto_approve': True
        },
        'calibration_constant_version': {
            'raw_data_location': raw_data_location,
            'file_name': const_filename,
            'path_to_file': const_rel_path,
            'data_set_name': f'{pdu_name}/{calibration}/{key}',
            'start_idx': '0',
            'end_idx': '0',
            'begin_validity_at': begin_at_str,
            'end_validity_at': '',
            'begin_at': begin_at_str,
            'pdu_physical_name': pdu_name,
            'flg_good_quality': True
        }
    }

    if report_to:
        report_path = Path(report_to).absolute().with_suffix('.pdf')
        inject_h['report'] = {
            'name': report_path.stem,
            'file_path': str(report_path)
        }

    const_dest = _get_default_caldb_root() / const_rel_path / const_filename
    const_dest.parent.mkdir(parents=True, exist_ok=True)
    copyfile(const_src, const_dest)

    resp = CalibrationClient.inject_new_calibration_constant_version(
        calibration_client(), inject_h)

    if not resp['success']:
        const_dest.unlink()  # Delete already copied CCV file.
        raise RuntimeError(resp)
def inject_ccv_legacy(in_folder, metadata_folder, runs, calibration, cond,
                      pdu, const_input, begin_at):
    """Inject new CCV into CalCat.

    LEGACY VERSION - DO NOT USE
    (from before most details were saved in the CCV file itself)

    metadata_folder
    calibration -> in file, but pass anyway as there could be multiple
    const_input -> doh!

    Recommended way of running this from pycalibration notebooks:

        inject_ccv_legacy(
            in_folder,        # Same as notebook parameter
            metadata_folder,  # Same as notebook parameter
            [run_high, run_mid, run_low],
            'Offset',
            current_pdu,      # Element from calibration_client()
                              # .get_all_phy_det_units_from_detector(...)
            out_folder / 'my-constant-file.h5',
            get_dir_creation_date(in_folder, run_high)
        )

    Args:
        in_folder (str or Path): Root for input data, i.e. *not*
            containing the r{run:04d} folder
        metadata_folder (str or Path): Metadata location
        runs (Iterable of str or int): Run number(s) used for characterization.
        calibration (str): Calibration name, e.g. 'Offset', must exist in CalCat.
        cond (ConditionsBase): Operating conditions for injected CCV.
        pdu (dict): PDU information, should be passed as given by CalCat.
        const_input (str or Path): Path to local calibration constant HDF file.
        begin_at (datetime): Begin of calibration constant validity.

    Returns:
        None

    Raises:
        RuntimeError: If CalCat POST request fails.
    """
    cond_dict = cond.make_dict(cond.calibration_types[calibration])

    pdu_name = pdu['physical_name']
    detector_type = pdu['detector_type']['name']

    const_root = f'xfel/cal/{detector_type.lower()}/{pdu_name.lower()}'
    const_filename = f'cal.{time.time()}.h5'

    report_path = Path(metadata_folder).with_suffix('.pdf')

    _, proposal, _ = run_prop_seq_from_path(in_folder)
    raw_data_location = 'proposal:{} runs: {}'.format(
        proposal, ' '.join([str(x) for x in runs]))

    # Generate condition name.
    unique_name = detector_type[:detector_type.index('-Type')] + ' Def'
    cond_hash = md5(pdu_name.encode())
    cond_hash.update(pdu['uuid'].to_bytes(
        length=8, byteorder='little', signed=False))

    for key, value in cond_dict.items():
        cond_hash.update(str(key).encode())
        cond_hash.update(str(value).encode())

    unique_name += binascii.b2a_base64(cond_hash.digest()).decode()
    unique_name = unique_name[:60]

    # Build condition dict for CalCat.
    cond_params = [
        {
            'parameter_name': key,
            'value': value,
            'lower_deviation_value': 0.0,
            'upper_deviation_value': 0.0,
            'flg_available': True
        }
        for key, value in cond_dict.items()
    ]

    # Add PDU "UUID" to parameters.
    cond_params.append({
        'parameter_name': 'Detector UUID',
        'value': unpack('d', pack('q', pdu['uuid']))[0],
        'lower_deviation_value': 0.0,
        'upper_deviation_value': 0.0,
        'flg_available': True
    })

    inject_h = {
        'report': {
            'name': report_path.stem,
            'file_path': str(report_path)
        },
        'detector_condition': {
            'name': unique_name,
            'parameters': cond_params
        },
        'calibration_constant': {
            'calibration_name': calibration,
            'detector_type_name': detector_type,
            'flg_auto_approve': True
        },
        'calibration_constant_version': {
            'raw_data_location': raw_data_location,
            'file_name': const_filename,
            'path_to_file': const_root,
            'data_set_name': f'{pdu_name}/{calibration}/0',
            'start_idx': '0',
            'end_idx': '0',
            'begin_validity_at': begin_at.isoformat(),
            'end_validity_at': '',
            'begin_at': begin_at.isoformat(),
            'pdu_physical_name': pdu_name,
            'flg_good_quality': True
        }
    }

    resp = CalibrationClient.inject_new_calibration_constant_version(
        calibration_client(), inject_h)

    if not resp['success']:
        raise RuntimeError(resp)

    const_path = _get_default_caldb_root() / const_root / const_filename
    const_path.parent.mkdir(parents=True, exist_ok=True)
    copyfile(const_input, const_path)