Commit 2f2af63e authored by Karim Ahmed

avoid nb crash when error happens

parent 44cefdcc
1 merge request: !303 Fix: avoid nb crash when error happens
%% Cell type:markdown id: tags:
# Jungfrau Offline Correction #
Author: European XFEL Detector Group, Version: 0.1
Offline Calibration for the Jungfrau Detector
%% Cell type:code id: tags:
``` python
cluster_profile = "noDB" # cluster profile to use
in_folder = "/gpfs/exfel/exp/SPB/201922/p002566/raw/" # the folder to read data from, required
out_folder = "/gpfs/exfel/data/scratch/karnem/test/006" # the folder to output to, required
sequences = [-1] # sequences to correct, set to -1 for all, range allowed
run = 189 # runs to process, required
karabo_id = "SPB_IRDA_JNGFR" # karabo prefix of Jungfrau devices
karabo_da = ['JNGFR02', 'JNGFR03'] # data aggregators
receiver_id = "MODULE_{}" # inset for receiver devices
receiver_control_id = "CONTROL" # inset for control devices
path_template = 'RAW-R{:04d}-{}-S{:05d}.h5' # template to use for file name, double escape sequence number
h5path = '/INSTRUMENT/{}/DET/{}:daqOutput/data' # path in H5 file under which images are located
h5path_run = '/RUN/{}/DET/{}' # path to run data
h5path_cntrl = '/CONTROL/{}/DET/{}' # path to control data
karabo_id_control = "" # karabo_id of the control device; leave empty if it is the same as karabo_id
karabo_da_control = "JNGFR01" # file inset for control data
use_dir_creation_date = True # use the creation data of the input dir for database queries
cal_db_interface = "tcp://max-exfl017:8017#8025" # the database interface to use
cal_db_timeout = 180000 # timeout on caldb requests
overwrite = True # set to True if existing data should be overwritten
no_relative_gain = False # set to True to skip the relative gain correction
bias_voltage = 180 # will be overwritten by value in file
sequences_per_node = 5 # number of sequence files per cluster node if run as slurm job, set to 0 to not run SLURM parallel
photon_energy = 9.2 # photon energy in keV
chunk_size_idim = 1 # chunking size of imaging dimension, adjust if user software is sensitive to this.
integration_time = 4.96 # integration time in us, will be overwritten by value in file
gmapfile = "" # not used; kept for backward compatibility
db_module = ["Jungfrau_M035", "Jungfrau_M273"] # ID of module in calibration database
manual_slow_data = False # if true, use manually entered bias_voltage and integration_time values
chunk_size = 0 # number of images per processing chunk, 0 to auto-compute (about 12 chunks per file)


def balance_sequences(in_folder, run, sequences, sequences_per_node, karabo_da):
    from xfel_calibrate.calibrate import balance_sequences as bs
    return bs(in_folder, run, sequences, sequences_per_node, karabo_da)
```
%% Cell type:code id: tags:
``` python
import time
from ipyparallel import Client
from functools import partial
import tabulate
from IPython.display import display, Latex
import copy
import h5py
import os
from cal_tools.tools import (map_modules_from_folder, get_dir_creation_date,
get_constant_from_db_and_time)
from iCalibrationDB import (ConstantMetaData, Constants, Conditions, Detectors, Versions)
from cal_tools.enums import BadPixels
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
import warnings
warnings.filterwarnings('ignore')
matplotlib.use('agg')
%matplotlib inline
```
%% Cell type:code id: tags:
``` python
client = Client(profile=cluster_profile)
view = client[:]
view.use_dill()

h5path = h5path.format(karabo_id, receiver_id)
ped_dir = "{}/r{:04d}".format(in_folder, run)
if ped_dir[-1] == "/":
    ped_dir = ped_dir[:-1]

if not os.path.exists(out_folder):
    os.makedirs(out_folder)
elif not overwrite:
    raise AttributeError("Output path exists! Exiting")

fp_name_contr = path_template.format(run, karabo_da_control, 0)
fp_path_contr = '{}/{}'.format(ped_dir, fp_name_contr)

if sequences[0] == -1:
    sequences = None

print(f"Run is: {run}")
print(f"HDF5 path: {h5path}")
print(f"Process modules: {karabo_da}")

creation_time = None
if use_dir_creation_date:
    creation_time = get_dir_creation_date(in_folder, run)
    print(f"Using {creation_time} as creation time")

if karabo_id_control == "":
    karabo_id_control = karabo_id
```
%% Cell type:code id: tags:
``` python
def check_memoryCells(file_name, path):
    with h5py.File(file_name, 'r') as f:
        t_stamp = np.array(f[path + '/storageCells/timestamp'])
        st_cells = np.array(f[path + '/storageCells/value'])
        sc_start = np.array(f[path + '/storageCellStart/value'])

    valid_train = t_stamp > 0
    n_scs = st_cells[valid_train][0] + 1
    sc_s = sc_start[valid_train][0]

    return n_scs, sc_s
```
%% Cell type:code id: tags:
``` python
# Set everything up file-wise
mmf = map_modules_from_folder(in_folder, run, path_template, karabo_da, sequences)
mapped_files, mod_ids, total_sequences, sequences_qm, _ = mmf
print(f"Processing a total of {total_sequences} sequence files")

table = []
fi = 0
for i, key in enumerate(mapped_files):
    for k, f in enumerate(list(mapped_files[key].queue)):
        if k == 0:
            table.append((fi, karabo_da[i], k, f))
        else:
            table.append((fi, "", k, f))
        fi += 1

md = display(Latex(tabulate.tabulate(table, tablefmt='latex',
                                     headers=["#", "module", "# module", "file"])))

# Restore the queue
mmf = map_modules_from_folder(in_folder, run, path_template, karabo_da, sequences)
mapped_files, mod_ids, total_sequences, sequences_qm, _ = mmf
```
%% Cell type:code id: tags:
``` python
if not manual_slow_data:
    with h5py.File(fp_path_contr.format(0), 'r') as f:
        run_path = h5path_run.format(karabo_id_control, receiver_control_id)
        integration_time = float(f[f'{run_path}/exposureTime/value'][()] * 1e6)
        bias_voltage = int(np.squeeze(f[f'{run_path}/vHighVoltage/value'])[0])

control_path = h5path_cntrl.format(karabo_id_control, receiver_control_id)
this_run_mcells, sc_start = check_memoryCells(fp_path_contr.format(0), control_path)

if this_run_mcells == 1:
    memoryCells = 1
    print(f'Run in single cell mode\n storage cell start: {sc_start:02d}')
else:
    memoryCells = 16
    print(f'Run in burst mode\n storage cell start: {sc_start:02d}')

print(f"Integration time is {integration_time} us")
print(f"Bias voltage is {bias_voltage} V")
print(f"Number of memory cells is {memoryCells}")
```
%% Cell type:code id: tags:
``` python
condition = Conditions.Dark.jungfrau(memory_cells=memoryCells,
                                     bias_voltage=bias_voltage,
                                     integration_time=integration_time)


def get_constant_for_module(condition, cal_db_interface, creation_time, cal_db_timeout,
                            memoryCells, db_module):
    """ Get calibration constants for a given Jungfrau module.

    The function contains all imports it needs, so it can be used with ipCluster.

    :param condition: Calibration condition
    :param cal_db_interface: Interface string, e.g. "tcp://max-exfl016:8015"
    :param creation_time: Latest time for constant to be created
    :param cal_db_timeout: Timeout for zmq request
    :param memoryCells: Number of used memory cells
    :param db_module: Module of Jungfrau, e.g. "Jungfrau_M035"
    :return: offset_map (offset map), mask (mask of bad pixels),
             gain_map (map of relative gain factors), db_module (name of DB module),
             when (dictionary: constant name -> creation time)
    """
    from iCalibrationDB import (ConstantMetaData, Constants, Conditions, Detectors, Versions)
    from cal_tools.tools import get_constant_from_db_and_time
    import numpy as np

    when = {}

    offset_map, time = \
        get_constant_from_db_and_time(getattr(Detectors, db_module),
                                      Constants.jungfrau.Offset(),
                                      condition,
                                      np.zeros((1024, 512, 1, 3)),
                                      cal_db_interface,
                                      creation_time=creation_time,
                                      timeout=cal_db_timeout)
    when['Offset'] = time

    mask, time = \
        get_constant_from_db_and_time(getattr(Detectors, db_module),
                                      Constants.jungfrau.BadPixelsDark(),
                                      condition,
                                      np.zeros((1024, 512, 1, 3)),
                                      cal_db_interface,
                                      creation_time=creation_time,
                                      timeout=cal_db_timeout)
    when['BadPixels'] = time

    gain_map, time = \
        get_constant_from_db_and_time(getattr(Detectors, db_module),
                                      Constants.jungfrau.RelativeGain(),
                                      condition,
                                      None,
                                      cal_db_interface,
                                      creation_time=creation_time,
                                      timeout=cal_db_timeout)
    when['Gain'] = time

    # Move from x, y, cell, gain to cell, x, y, gain
    offset_map = np.squeeze(offset_map)
    mask = np.squeeze(mask)
    if memoryCells > 1:
        offset_map = np.moveaxis(np.moveaxis(offset_map, 0, 2), 0, 2)
        mask = np.moveaxis(np.moveaxis(mask, 0, 2), 0, 2)

    if gain_map is not None:
        if memoryCells > 1:
            gain_map = np.moveaxis(np.moveaxis(gain_map, 0, 2), 0, 1)
        else:
            gain_map = np.squeeze(gain_map)
            gain_map = np.moveaxis(gain_map, 1, 0)

    return offset_map, mask, gain_map, db_module, when


# Retrieve Offset, BadPixels and gain constants for each Jungfrau module,
# parallelized over modules with the ipCluster view
p = partial(get_constant_for_module, condition, cal_db_interface,
            creation_time, cal_db_timeout, memoryCells)
r = view.map_sync(p, db_module)
#r = list(map(p, db_module))

constants = {}
for rr in r:
    offset_map, mask, gain_map, db_mod, when = rr
    print(f'Constants for module {db_mod}:')
    for const in when:
        print(f'{const} injected at {when[const]}')
    if gain_map is None:
        print("No gain map found")
        no_relative_gain = True
    constants[db_mod] = (offset_map, mask, gain_map)
```
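%% Cell type:markdown id: tags:
As a quick sanity check (a sketch, not part of the original processing chain), the shapes of the retrieved constants can be inspected; after the axis reordering above, burst-mode maps are expected as (cell, x, y, gain):
%% Cell type:code id: tags:
``` python
# Print the shape of each retrieved constant per module; the names off_m, bp_m
# and g_m are local to this check and are not used by the correction below.
for mod, (off_m, bp_m, g_m) in constants.items():
    print(f'{mod}: offset {off_m.shape}, mask {bp_m.shape}, '
          f'gain {None if g_m is None else g_m.shape}')
```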
%% Cell type:code id: tags:
``` python
def copy_and_sanitize_non_cal_data(infile, outfile, h5base):
    """ Copy and sanitize data in `infile` that is not touched by the correction """
    if h5base.startswith("/"):
        h5base = h5base[1:]

    dont_copy = ["adc", ]
    dont_copy = [h5base + "/{}".format(do) for do in dont_copy]

    def visitor(k, item):
        if k not in dont_copy:
            if isinstance(item, h5py.Group):
                outfile.create_group(k)
            elif isinstance(item, h5py.Dataset):
                group = str(k).split("/")
                group = "/".join(group[:-1])
                infile.copy(k, outfile[group])

    infile.visititems(visitor)
```
%% Cell type:code id: tags:
``` python
# Correct a chunk of images for offset and gain
def correct_chunk(offset_map, mask, gain_map, memoryCells, no_relative_gain, inp):
    import numpy as np
    import copy
    import h5py

    fim_data = None
    gim_data = None
    rim_data = None
    msk_data = None
    err = ''

    try:
        d, g, m, ind, copy_sample = inp
        g[g == 3] = 2

        if copy_sample and ind == 0:
            if memoryCells == 1:
                rim_data = np.squeeze(copy.copy(d))
            else:
                rim_data = np.squeeze(copy.copy(d[:, 0, ...]))

        # Select memory cells
        if memoryCells > 1:
            m[m > 16] = 0
            offset_map_cell = offset_map[m, ...]
            mask_cell = mask[m, ...]
        else:
            offset_map_cell = offset_map
            mask_cell = mask

        # Offset correction
        offset = np.choose(g, (offset_map_cell[..., 0], offset_map_cell[..., 1], offset_map_cell[..., 2]))
        d -= offset

        # Gain correction
        if not no_relative_gain:
            if memoryCells > 1:
                gain_map_cell = gain_map[m, ...]
            else:
                gain_map_cell = gain_map
            cal = np.choose(g, (gain_map_cell[..., 0], gain_map_cell[..., 1], gain_map_cell[..., 2]))
            d /= cal

        msk = np.choose(g, (mask_cell[..., 0], mask_cell[..., 1], mask_cell[..., 2]))

        # Store sample of data for plotting
        if copy_sample and ind == 0:
            if memoryCells == 1:
                fim_data = np.squeeze(copy.copy(d))
                gim_data = np.squeeze(copy.copy(g))
                msk_data = np.squeeze(copy.copy(msk))
            else:
                fim_data = np.squeeze(copy.copy(d[:, 1, ...]))
                gim_data = np.squeeze(copy.copy(g[:, 1, ...]))
                msk_data = np.squeeze(copy.copy(msk[:, 1, ...]))
    except Exception as e:
        err = e

    return ind, d, msk, rim_data, fim_data, gim_data, msk_data, err
fim_data = {}
gim_data = {}
rim_data = {}
msk_data = {}

# Loop over modules
for i, key in enumerate(mapped_files):
    # Loop over sequences for a given module
    for k, f in enumerate(list(mapped_files[key].queue)):
        offset_map, mask, gain_map = constants[db_module[i]]
        h5path_f = h5path.format(int(karabo_da[i][-2:]))

        with h5py.File(f, 'r') as infile:
            # The processed files are saved here in a folder with the run name.
            out_file = "{}/{}".format(out_folder, f.split("/")[-1])
            out_file = out_file.replace("RAW", "CORR")
            print(f'Process file: {f}, with path {h5path_f}')
            try:
                with h5py.File(out_file, "w") as ofile:
                    copy_and_sanitize_non_cal_data(infile, ofile, h5path_f)

                    oshape = infile[h5path_f + "/adc"].shape
                    print(f'Data shape: {oshape}')
                    if not oshape[0]:
                        raise ValueError(f"No image data: shape {oshape}")

                    ddset = ofile.create_dataset(h5path_f + "/adc",
                                                 oshape,
                                                 chunks=(chunk_size_idim, 1, oshape[1], oshape[2]),
                                                 dtype=np.float32)
                    mskset = ofile.create_dataset(h5path_f + "/mask",
                                                  oshape,
                                                  chunks=(chunk_size_idim, 1, oshape[1], oshape[2]),
                                                  dtype=np.uint32,
                                                  compression="gzip", compression_opts=1, shuffle=True)

                    # Run ipCluster parallelization over chunks of images
                    inp = []
                    max_ind = oshape[0]
                    ind = 0

                    # If no chunk size is given, aim for at most 12+1 chunks per file
                    if chunk_size == 0:
                        chunk_size = max_ind // 12
                    print(f'Chunk size: {chunk_size}')

                    ts = time.time()
                    while ind < max_ind:
                        d = infile[h5path_f + "/adc"][ind:ind + chunk_size, ...].astype(np.float32)
                        g = infile[h5path_f + "/gain"][ind:ind + chunk_size, ...]
                        m = infile[h5path_f + "/memoryCell"][ind:ind + chunk_size, ...]
                        print(f'To process: {d.shape}')
                        inp.append((d, g, m, ind, k == 0))
                        ind += chunk_size
                    print('Preparation time: ', time.time() - ts)

                    ts = time.time()
                    print(f'Run {len(inp)} processes')
                    p = partial(correct_chunk, offset_map, mask, gain_map, memoryCells, no_relative_gain)
                    r = view.map_sync(p, inp)
                    #r = list(map(p, inp))

                    if k == 0:
                        (_, _, _,
                         rim_data[db_module[i]], fim_data[db_module[i]],
                         gim_data[db_module[i]], msk_data[db_module[i]], _) = r[0]
                    print('Correction time: ', time.time() - ts)

                    ts = time.time()
                    for rr in r:
                        ind, cdata, cmask, _, _, _, _, err = rr
                        data_size = cdata.shape[0]
                        ddset[ind:ind + data_size, ...] = cdata
                        mskset[ind:ind + data_size, ...] = cmask
                        if err != '':
                            print(f'Error: {err}')
                    print('Saving time: ', time.time() - ts)
            except Exception as e:
                print(f"Error: {e}")
```
%% Cell type:code id: tags:
``` python
def do_2d_plot(data, edges, y_axis, x_axis, title):
    fig = plt.figure(figsize=(10, 10))
    ax = fig.add_subplot(111)
    extent = [np.min(edges[1]), np.max(edges[1]), np.min(edges[0]), np.max(edges[0])]
    im = ax.imshow(data[::-1, :], extent=extent, aspect="auto",
                   norm=LogNorm(vmin=1, vmax=np.max(data)))
    ax.set_xlabel(x_axis)
    ax.set_ylabel(y_axis)
    ax.set_title(title)
    cb = fig.colorbar(im)
    cb.set_label("Counts")
```
%% Cell type:code id: tags:
``` python
for mod in rim_data:
    h, ex, ey = np.histogram2d(rim_data[mod].flatten(), gim_data[mod].flatten(),
                               bins=[100, 4], range=[[0, 10000], [0, 4]])
    do_2d_plot(h, (ex, ey), "Signal (ADU)", "Gain Bit Value", f'Module {mod}')
```
%% Cell type:markdown id: tags:
### Mean RAW Preview ###
The per-pixel mean of the RAW data in the first sequence file
%% Cell type:code id: tags:
``` python
for mod in rim_data:
    fig = plt.figure(figsize=(20, 10))
    ax = fig.add_subplot(111)
    im = ax.imshow(np.mean(rim_data[mod], axis=0),
                   vmin=min(0.75 * np.median(rim_data[mod][rim_data[mod] > 0]), 2000),
                   vmax=max(1.5 * np.median(rim_data[mod][rim_data[mod] > 0]), 16000),
                   cmap="jet")
    ax.set_title(f'Module {mod}')
    cb = fig.colorbar(im, ax=ax)
```
%% Cell type:markdown id: tags:
### Mean CORRECTED Preview ###
The per-pixel mean of the corrected (CORR) data in the first sequence file
%% Cell type:code id: tags:
``` python
for mod in rim_data:
    fig = plt.figure(figsize=(20, 10))
    ax = fig.add_subplot(111)
    im = ax.imshow(np.mean(fim_data[mod], axis=0),
                   vmin=min(0.75 * np.median(fim_data[mod][fim_data[mod] > 0]), -0.5),
                   vmax=max(2. * np.median(fim_data[mod][fim_data[mod] > 0]), 100),
                   cmap="jet")
    ax.set_title(f'Module {mod}', size=18)
    cb = fig.colorbar(im, ax=ax)
```
%% Cell type:markdown id: tags:
### Single Train Preview ###
A single image from the first train
%% Cell type:code id: tags:
``` python
for mod in rim_data:
    fig = plt.figure(figsize=(20, 10))
    ax = fig.add_subplot(111)
    im = ax.imshow(fim_data[mod][0, ...],
                   vmin=min(0.75 * np.median(fim_data[mod][0, ...]), -0.5),
                   vmax=max(2. * np.median(fim_data[mod][0, ...]), 100),
                   cmap="jet")
    ax.set_title(f'Module {mod}', size=18)
    cb = fig.colorbar(im, ax=ax)
```
%% Cell type:markdown id: tags:
## Signal Distribution ##
%% Cell type:code id: tags:
``` python
for mod in rim_data:
    fig = plt.figure(figsize=(20, 10))

    ax = fig.add_subplot(211)
    h = ax.hist(fim_data[mod].flatten(), bins=1000, range=(-100, 1000), log=True)
    l = ax.set_xlabel("Signal (keV)")
    l = ax.set_ylabel("Counts")
    _ = ax.set_title(f'Module {mod}')

    ax = fig.add_subplot(212)
    h = ax.hist(fim_data[mod].flatten(), bins=1000, range=(-1000, 10000), log=True)
    l = ax.set_xlabel("Signal (keV)")
    l = ax.set_ylabel("Counts")
    _ = ax.set_title(f'Module {mod}')
```
%% Cell type:markdown id: tags:
### Maximum GAIN Preview ###
The per-pixel maximum of the GAIN data for the first train
%% Cell type:code id: tags:
``` python
for mod in rim_data:
    fig = plt.figure(figsize=(20, 10))
    ax = fig.add_subplot(111)
    im = ax.imshow(np.max(gim_data[mod], axis=0), vmin=0, vmax=3, cmap="jet")
    ax.set_title(f'Module {mod}', size=18)
    cb = fig.colorbar(im, ax=ax)
```
%% Cell type:markdown id: tags:
## Bad Pixels ##
The mask contains dedicated entries for all pixels and memory cells, as well as for all three gain stages. Each mask entry is encoded in 32 bits as follows:
%% Cell type:code id: tags:
``` python
table = []
for item in BadPixels:
    table.append((item.name, f"{item.value:016b}"))

md = display(Latex(tabulate.tabulate(table, tablefmt='latex',
                                     headers=["Bad pixel type", "Bit mask"])))
```
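%% Cell type:markdown id: tags:
For illustration, a single 32-bit mask value can be decoded back into the bad pixel types it combines; a minimal sketch using the `BadPixels` enumeration imported above:
%% Cell type:code id: tags:
``` python
# Decode a mask value into the names of all BadPixels flags set in it.
def decode_mask(value):
    return [item.name for item in BadPixels if value & item.value]

# Example: OR the first two flag values together, then decode them again.
flags = list(BadPixels)
print(decode_mask(flags[0].value | flags[1].value))
```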
%% Cell type:markdown id: tags:
### Single Image Bad Pixels ###
The bad pixel map for the first image of the first train
%% Cell type:code id: tags:
``` python
for mod in rim_data:
    fig = plt.figure(figsize=(20, 10))
    ax = fig.add_subplot(111)
    im = ax.imshow(np.log2(msk_data[mod][0, ...]), vmin=0, vmax=32, cmap="jet")
    ax.set_title(f'Module {mod}', size=18)
    cb = fig.colorbar(im, ax=ax)
```