
Fix/remove unused influx db

Merged Karim Ahmed requested to merge fix/remove_unused_influx_db into master
1 unresolved thread
1 file changed: +0 −44
%% Cell type:markdown id: tags:
# LPD Offline Correction #
Author: European XFEL Detector Group, Version: 1.0
%% Cell type:code id: tags:
``` python
cluster_profile = "noDB" # cluster profile to use
in_folder = "/gpfs/exfel/exp/FXE/201931/p900088/raw/" # the folder to read data from, required
out_folder = "/gpfs/exfel/data/scratch/karnem/test_1/lpd_correct_006" # the folder to output to, required
sequences = [-1] # sequences to correct, set to -1 for all, range allowed
modules = [-1] # modules to correct, set to -1 for all, range allowed
run = 270 # runs to process, required
karabo_id = "FXE_DET_LPD1M-1" # karabo id of the detector
karabo_da = ['-1'] # a list of data aggregator names; default ['-1'] selects all data aggregators
receiver_id = "{}CH0" # inset for receiver devices
path_template = 'RAW-R{:04d}-{}-S{:05d}.h5' # the template to use to access data
h5path = '/INSTRUMENT/{}/DET/{}:xtdf/' # path in the HDF5 file to images
h5path_idx = '/INDEX/{}/DET/{}:xtdf/' # path in the HDF5 file to the index information
use_dir_creation_date = True # use the creation date of the directory for database time derivation
cal_db_interface = "tcp://max-exfl016:8015#8020" # the database interface to use
cal_db_timeout = 30000 # timeout for calibration db requests in milliseconds
calfile = "/gpfs/exfel/data/scratch/xcal/lpd_store_0519.h5" # path to constants extracted from the db into a file
mem_cells = 512 # memory cells in data
overwrite = True # set to True if existing data should be overwritten
no_relative_gain = False # do not do relative gain correction
no_flat_fields = False # do not do flat field correction
max_pulses = 512 # maximum number of pulses per train
no_non_linear_corrections = False # do not apply non-linear corrections
max_cells_db = 512 # maximum cells for data from the database
rawversion = 2 # raw format version
capacitor = '5pF' # capacitor setting: 5pF or 50pF
photon_energy = 9.2 # the photon energy in keV
nodb = False # set to true if db input is to be avoided
bias_voltage = 250 # detector bias voltage
geometry_file = "/gpfs/exfel/d/cal/exchange/lpdMF_00.h5" # the geometry file to use, MAR 2018
beam_center_offset = [1.5, 1] # offset from the beam center, MAR 2018
sequences_per_node = 1 # sequence files to process per node
dont_mark_non_lin_region = False # do not mark non-linear regions in BP map
linear_between_high_gain = [-5000, 2500] # region in which high gain is considered linear, in ADU
linear_between_med_gain = [300, 3000] # region in which medium gain is considered linear, in ADU
linear_between_low_gain = [300, 3000] # region in which low gain is considered linear, in ADU
nlc_version = 2 # version of NLC to use
def balance_sequences(in_folder, run, sequences, sequences_per_node, karabo_da):
    from xfel_calibrate.calibrate import balance_sequences as bs
    return bs(in_folder, run, sequences, sequences_per_node, karabo_da)
```
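%% Cell type:markdown id: tags:
The `balance_sequences` helper above is not called inside the notebook itself; it delegates to `xfel_calibrate.calibrate.balance_sequences`, which the xfel_calibrate launcher uses to split the requested sequence files across cluster nodes according to `sequences_per_node`. A minimal sketch of such a call, using the default parameters above (the return value shown is only indicative and depends on the framework):
%% Cell type:code id: tags:
``` python
# Hypothetical usage sketch, not executed by the notebook itself.
node_sequences = balance_sequences(in_folder="/gpfs/exfel/exp/FXE/201931/p900088/raw/",
                                   run=270,
                                   sequences=[-1],          # -1 selects all sequences
                                   sequences_per_node=1,
                                   karabo_da=['-1'])        # '-1' selects all data aggregators
print(node_sequences)
```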
%% Cell type:code id: tags:
``` python
import sys
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')
max_cells = mem_cells
if sequences[0] == -1:
    sequences = None
if karabo_da[0] == '-1':
    if modules[0] == -1:
        modules = list(range(16))
    karabo_da = ['LPD{:02d}'.format(i) for i in modules]
else:
    modules = [int(x[-2:]) for x in karabo_da]
print("Process modules: ",
      ', '.join([f"Q{x // 4 + 1}M{x % 4 + 1}" for x in modules]))
do_rel_gain = not no_relative_gain
do_ff = not no_flat_fields
index_v = rawversion
#do_ff = False
#relgain_store = "/gpfs/exfel/d/proc/FXE/201830/p900020/calibration/lpd_ci_store_{}_16_5pf.h5"
print("Applying FF corrections: {}".format(do_ff))
# make sure a cluster is running with ipcluster start --n=32, give it a while to start
import os
import h5py
import numpy as np
import matplotlib
matplotlib.use("agg")
import matplotlib.pyplot as plt
from ipyparallel import Client
from iCalibrationDB import ConstantMetaData, Constants, Conditions, Detectors, Versions
from cal_tools.tools import (gain_map_files, parse_runs, run_prop_seq_from_path,
                             get_notebook_name, get_dir_creation_date,
                             get_constant_from_db, map_modules_from_folder)
from cal_tools.influx import InfluxLogger
from cal_tools.enums import BadPixels
from cal_tools.plotting import show_overview, plot_badpix_3d, create_constant_overview
from cal_tools.lpdlib import LpdCorrections
from collections import OrderedDict
from influxdb import InfluxDBClient
print("Connecting to profile {}".format(cluster_profile))
view = Client(profile=cluster_profile)[:]
view.use_dill()
gains = np.arange(3)
cells = np.arange(max_cells)
CHUNK_SIZE = 512
MAX_PAR = 32
if not os.path.exists(out_folder):
    os.makedirs(out_folder)
elif not overwrite:
    raise AttributeError("Output path exists! Exiting")
creation_time = None
if use_dir_creation_date:
    creation_time = get_dir_creation_date(in_folder, run)
else:
    creation_time = datetime.now()
print("Using {} as creation time".format(creation_time.isoformat()))
_, proposal, seq = run_prop_seq_from_path(in_folder)
instrument = karabo_id.split("_")[0]
logger = InfluxLogger(detector="LPD", instrument=instrument, mem_cells=mem_cells,
                      notebook=get_notebook_name(), proposal=proposal)
client = InfluxDBClient('exflqr18318', 8086, 'root', 'root', 'calstats')
mark_non_lin_region = not dont_mark_non_lin_region
linear_between = [linear_between_high_gain, linear_between_med_gain, linear_between_low_gain]
h5path = h5path.format(karabo_id, receiver_id)
h5path_idx = h5path_idx.format(karabo_id, receiver_id)
```
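%% Cell type:markdown id: tags:
Before running the correction it can help to confirm that engines are actually registered on the ipyparallel profile mentioned above (started with `ipcluster start --n=32`). A minimal check, assuming the same profile name:
%% Cell type:code id: tags:
``` python
# Sketch: list the engines registered on the cluster profile used by this notebook.
from ipyparallel import Client
c = Client(profile=cluster_profile)
print("{} engines available on profile '{}'".format(len(c.ids), cluster_profile))
```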
%% Cell type:code id: tags:
``` python
# set everything up filewise
mmf = map_modules_from_folder(in_folder, run, path_template, karabo_da, sequences)
mapped_files, mod_ids, total_sequences, sequences_qm, _ = mmf
MAX_PAR = min(MAX_PAR, total_sequences)
```
%% Cell type:markdown id: tags:
## Processed Files ##
%% Cell type:code id: tags:
``` python
import copy
from IPython.display import HTML, display, Markdown, Latex
import tabulate
print("Processing a total of {} sequence files in chunks of {}".format(total_sequences, MAX_PAR))
table = []
mfc = copy.copy(mapped_files)
ti = 0
for k, files in mfc.items():
    i = 0
    while not files.empty():
        f = files.get()
        if i == 0:
            table.append((ti, k, i, f))
        else:
            table.append((ti, "", i, f))
        i += 1
        ti += 1
md = display(Latex(tabulate.tabulate(table, tablefmt='latex', headers=["#", "module", "# module", "file"])))
# restore the queue
mmf = map_modules_from_folder(in_folder, run, path_template, karabo_da, sequences)
mapped_files, mod_ids, total_sequences, sequences_qm, _ = mmf
```
%% Cell type:code id: tags:
``` python
import copy
from functools import partial
def correct_module(max_cells, do_ff, index_v, CHUNK_SIZE, total_sequences, sequences_qm,
                   bins_gain_vs_signal, bins_signal_low_range, bins_signal_high_range, max_pulses,
                   dbparms, fileparms, nodb, no_non_linear_corrections, mark_non_lin_region, linear_between,
                   nlc_version, h5path, h5path_idx, inp):
    import numpy as np
    import copy
    import h5py
    import socket
    from datetime import datetime
    import re
    import os
    from influxdb import InfluxDBClient
    from cal_tools.enums import BadPixels
    from cal_tools.lpdlib import LpdCorrections
    client = InfluxDBClient('exflqr18318', 8086, 'root', 'root', 'calstats')
    def create_influx_entry(run, proposal, qm, sequence, filesize, chunksize,
                            total_sequences, success, runtime, reason=""):
        return {
            "measurement": "run_correction",
            "tags": {
                "host": socket.gethostname(),
                "run": run,
                "proposal": proposal,
                "mem_cells": max_cells,
                "sequence": sequence,
                "module": qm,
                "filesize": filesize,
                "chunksize": chunksize,
                "total_sequences": total_sequences,
                "sequences_module": sequences_qm[qm],
                "detector": "LPD",
                "instrument": "FXE",
            },
            "time": datetime.utcnow().isoformat(),
            "fields": {
                "success": success,
                "reason": reason,
                "runtime": runtime,
            }
        }
    hists_signal_low = None
    hists_signal_high = None
    hists_gain_vs_signal = None
    low_edges = None
    high_edges = None
    signal_edges = None
    when = None
    qm = None
    err = None
    try:
        start = datetime.now()
        success = True
        reason = ""
        filename, filename_out, channel, qm = inp
        infile = h5py.File(filename, "r", driver="core")
        outfile = h5py.File(filename_out, "w")
        # LPD correction requires the path without the leading "/"
        if h5path[0] == '/':
            h5path = h5path[1:]
        if h5path_idx[0] == '/':
            h5path_idx = h5path_idx[1:]
        try:
            lpd_corr = LpdCorrections(infile, outfile, max_cells, channel, max_pulses,
                                      bins_gain_vs_signal, bins_signal_low_range,
                                      bins_signal_high_range, do_ff=do_ff, raw_fmt_version=index_v,
                                      correct_non_linear=(not no_non_linear_corrections),
                                      mark_non_lin_region=mark_non_lin_region, linear_between=linear_between,
                                      nlc_version=nlc_version,
                                      h5_data_path=h5path, h5_index_path=h5path_idx)
            try:
                lpd_corr.get_valid_image_idx()
            except IOError:
                return
            if not nodb:
                when = lpd_corr.initialize_from_db(dbparms, qm, only_dark=(fileparms != ""))
            if fileparms != "":
                lpd_corr.initialize_from_file(fileparms, qm, with_dark=nodb)
            print("Initialized constants")
            for irange in lpd_corr.get_iteration_range():
                lpd_corr.correct_lpd(irange)
            print("All iterations finished")
            hists, edges = lpd_corr.get_histograms()
            hists_signal_low, hists_signal_high, hists_gain_vs_signal = hists
            low_edges, high_edges, signal_edges = edges
            outfile.close()
            infile.close()
            print("Closed files")
        except Exception as e1:
            err = e1
            outfile.close()
            infile.close()
    except Exception as e:
        print(e)
        success = False
        reason = "Error"
        err = e
    finally:
        run = re.findall(r'.*r([0-9]{4}).*', filename)[0]
        proposal = re.findall(r'.*p([0-9]{6}).*', filename)[0]
        sequence = re.findall(r'.*S([0-9]{5}).*', filename)[0]
        filesize = os.path.getsize(filename)
        duration = (datetime.now()-start).total_seconds()
        influx = create_influx_entry(run, proposal, qm, sequence, filesize, CHUNK_SIZE, total_sequences, success, duration, reason)
        #client.write_points([influx])
    return hists_signal_low, hists_signal_high, hists_gain_vs_signal, low_edges, high_edges, signal_edges, when, qm, err
done = False
first_files = []
inp = []
left = total_sequences
bins_gain_vs_signal = (100, 4)
bins_signal_low_range = 100
bins_signal_high_range = 100
hists_signal_low = np.zeros((bins_signal_low_range, max_pulses), np.float64)
hists_signal_high = np.zeros((bins_signal_high_range, max_pulses), np.float64)
hists_gain_vs_signal = np.zeros((bins_gain_vs_signal), np.float64)
low_edges, high_edges, signal_edges = None, None, None
dbparms = cal_db_interface, creation_time, max_cells_db, capacitor, bias_voltage, photon_energy, cal_db_timeout
fileparms = calfile
whens = {}
while not done:
    dones = []
    first = True
    for i in range(16):
        qm = "Q{}M{}".format(i//4 + 1, i % 4 + 1)
        if qm in mapped_files and not mapped_files[qm].empty():
            fname_in = str(mapped_files[qm].get())
            dones.append(mapped_files[qm].empty())
        else:
            print("Skipping {}".format(qm))
            first_files.append((None, None))
            continue
        fout = os.path.abspath("{}/{}".format(out_folder, (os.path.split(fname_in)[-1]).replace("RAW", "CORR")))
        if first:
            first_files.append((fname_in, fout))
        inp.append((fname_in, fout, i, qm))
    first = False
    if len(inp) >= min(MAX_PAR, left):
        print("Running {} tasks parallel".format(len(inp)))
        p = partial(correct_module, max_cells, do_ff, index_v, CHUNK_SIZE, total_sequences, sequences_qm,
                    bins_gain_vs_signal, bins_signal_low_range, bins_signal_high_range, max_pulses, dbparms,
                    fileparms, nodb, no_non_linear_corrections, mark_non_lin_region, linear_between, nlc_version,
                    h5path, h5path_idx)
        r = view.map_sync(p, inp)
        #r = list(map(p, inp))
        inp = []
        left -= MAX_PAR
        for rr in r:
            if rr is not None:
                hl, hh, hg, low_edges, high_edges, signal_edges, when, qm, err = rr
                whens[qm] = {}
                whens[qm]['when'] = when
                whens[qm]['err'] = err
                if hl is not None:  # any one being None will also make the others None
                    hists_signal_low += hl.astype(np.float64)
                    hists_signal_high += hh.astype(np.float64)
                    hists_gain_vs_signal += hg.astype(np.float64)
    done = all(dones)
```
%% Cell type:code id: tags:
``` python
print("Offset was injected on: ")
for k, v in whens.items():
    if v['err'] is None:
        print("{}: {}".format(k, v['when']))
    else:
        print("{}: {}: {}".format(k, v['when'], v['err']))
```
%% Cell type:code id: tags:
``` python
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
from matplotlib import cm
from matplotlib.ticker import LinearLocator, FormatStrFormatter
import numpy as np
%matplotlib inline
def do_3d_plot(data, edges, x_axis, y_axis):
    fig = plt.figure(figsize=(10,10))
    ax = fig.gca(projection='3d')
    # Make data.
    X = edges[0][:-1]
    Y = edges[1][:-1]
    X, Y = np.meshgrid(X, Y)
    Z = data.T
    # Plot the surface.
    surf = ax.plot_surface(X, Y, Z, cmap=cm.coolwarm,
                           linewidth=0, antialiased=False)
    ax.set_xlabel(x_axis)
    ax.set_ylabel(y_axis)
    ax.set_zlabel("Counts")
```
%% Cell type:markdown id: tags:
## Signal vs. Analogue Gain ##
The following plot shows signal vs. analogue gain for the first 1280 images.
%% Cell type:code id: tags:
``` python
do_3d_plot(hists_gain_vs_signal, signal_edges, "Signal (ADU)", "Gain Bit Value")
```
%% Cell type:code id: tags:
``` python
def do_2d_plot(data, edges, y_axis, x_axis):
    from matplotlib.colors import LogNorm
    fig = plt.figure(figsize=(10,10))
    ax = fig.add_subplot(111)
    extent = [np.min(edges[1]), np.max(edges[1]), np.min(edges[0]), np.max(edges[0])]
    im = ax.imshow(data[::-1,:], extent=extent, aspect="auto", norm=LogNorm(vmin=1, vmax=np.max(data)))
    ax.set_xlabel(x_axis)
    ax.set_ylabel(y_axis)
    cb = fig.colorbar(im)
    cb.set_label("Counts")
do_2d_plot(hists_gain_vs_signal, signal_edges, "Signal (ADU)", "Gain Bit Value")
```
%% Cell type:markdown id: tags:
## Mean Intensity per Pulse ##
The following plots show the mean signal for each pulse, in both a detailed (low-intensity) and an expanded (high-intensity) region.
%% Cell type:code id: tags:
``` python
do_3d_plot(hists_signal_low, low_edges, "Signal (ADU)", "Pulse id")
do_2d_plot(hists_signal_low, low_edges, "Signal (ADU)", "Pulse id")
do_3d_plot(hists_signal_high, high_edges, "Signal (ADU)", "Pulse id")
do_2d_plot(hists_signal_high, high_edges, "Signal (ADU)", "Pulse id")
```
%% Cell type:markdown id: tags:
## Data Preview ##
In the following, geometry information from the LPD geometry file is applied, positioning the quadrants at their last known positions. No bad-pixel masking has been performed.
%% Cell type:code id: tags:
``` python
# geometry information
dc = beam_center_offset
#d_quads = [(-14+dc[0],-300+dc[1]),(10+dc[0],-9+dc[1]),(-256+dc[0],15+dc[1]),(-280+dc[0],-276+dc[1])] # MAR 2018
d_quads = [(-19+dc[0],-300+dc[1]),(10+dc[0],-9+dc[1]),(-256+dc[0],19+dc[1]),(-285+dc[0],-271+dc[1])] # MAY 2019
import cal_tools.metrology as metro
in_files = "{}/CORR*LPD*S{:05d}*.h5".format(out_folder, sequences[0] if sequences else 0)
datapath = "{}/data".format(h5path)
print("Preview is from {}".format(in_files))
```
%% Cell type:code id: tags:
``` python
posarr = metro.positionFileList(in_files, datapath, geometry_file, d_quads, nImages = 10)
maskpath = "{}/mask".format(h5path)
maskedarr = metro.positionFileList(in_files, maskpath, geometry_file, d_quads, nImages = 10)
```
%% Cell type:code id: tags:
``` python
# convert the Cartesian coordinates of the detector to polar coordinates
def mod_cart_to_pol(d, dx, dy, filter_by_val=True):
    """ Convert Cartesian coords to polar coords
    """
    cx, cy = d.shape
    x = np.arange(cx)+dx
    y = np.arange(cy)+dy
    x = np.repeat(x[:,None], cy, axis=1)
    y = np.repeat(y[None,:], cx, axis=0)
    rho = np.sqrt(x**2 + y**2).flatten()
    phi = np.arctan2(y, x).flatten()
    flat = d.flatten()
    # we also perform a bit of filtering here
    if filter_by_val:
        good = np.isfinite(flat) & (flat > 0) & (flat < 1e5)
        return rho[good], phi[good], flat[good], good
    return rho, phi, flat, None
```
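%% Cell type:markdown id: tags:
A minimal usage sketch of `mod_cart_to_pol` on synthetic data (the array and offsets below are invented for illustration only):
%% Cell type:code id: tags:
``` python
# Hypothetical example: a small uniform "module" with its centre shifted to the origin.
demo = np.full((8, 8), 100.0)   # uniform intensity
demo[0, 0] = -1.0               # one negative pixel that the value filter should drop
rho, phi, flat, good = mod_cart_to_pol(demo, dx=-4, dy=-4, filter_by_val=True)
print(rho.shape, flat.shape)    # 63 of the 64 pixels survive the filter
print(rho.min(), rho.max())     # radial range covered by this toy module
```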
%% Cell type:markdown id: tags:
### Single Shot Preview ###
A single-shot image from cell 5 of the first train.
%% Cell type:code id: tags:
``` python
fig = plt.figure(figsize=(15,15))
ax = fig.add_subplot(111)
parr = posarr[5,...]
im=ax.imshow((parr), vmin=0, vmax=max(10*np.median(parr[parr > 0]), 100))
cb = fig.colorbar(im)
cb.set_label("Intensity (ADU)")
```
%% Cell type:markdown id: tags:
### Pixel Mean Preview ###
The per-pixel mean value of the preview images loaded above.
%% Cell type:code id: tags:
``` python
fig = plt.figure(figsize=(15,15))
ax = fig.add_subplot(111)
parr = np.mean(posarr, axis=0)
im=ax.imshow((parr), vmin=0, vmax=max(10*np.median(parr[parr > 0]), 100))
cb = fig.colorbar(im)
cb.set_label("Intensity (ADU)")
```
%% Cell type:markdown id: tags:
### Radial Profile ###
The simple azimuthally integrated profile shown here assumes the beam is centred in the detector hole; it is therefore not always fully accurate.
%% Cell type:code id: tags:
``` python
# Here we create histograms of the data in a polar coordinate system.
# We use scipy's binned_statistic_2d function, giving it the polar coordinates
# of each pixel and weighting each coordinate with the pixel's value.
# The result is a 2D histogram (azimuth vs. radius) of the assembled mean image.
from scipy.stats import binned_statistic_2d
hs = []
bins_nums = []
edges = []
goods = []
bins = 5000
dx, dy = -750, -750
rho, phi, weights, good = mod_cart_to_pol(np.mean(posarr, axis=0), dy, dx, False)
#h, phi_edges, rho_edges = np.histogram2d(phi, rho, bins=(1000,1000),
#                                         range=((-np.pi, np.pi), (0, 1000)),
#                                         weights=weights)
h, phi_edges, rho_edges, bns = binned_statistic_2d(phi, rho, weights, bins=(bins, bins),
                                                   range=((-np.pi, np.pi), (0, 1000)),
                                                   statistic="sum")
bins_nums.append(bns)
hs.append(h)
edges.append((phi_edges, rho_edges))
goods.append(good)
```
%% Cell type:code id: tags:
``` python
# radial bin centres converted from pixels to metres, assuming the 500 µm LPD pixel pitch
x = np.arange(bins)/bins*1000*500e-6
# azimuthal bin coordinate (not used in the plots below)
y = np.arange(bins)/bins*2.
ds = np.array(hs).sum(axis=0)
```
%% Cell type:code id: tags:
``` python
# With appropriate coordinates given, plotting a profile along the
# vertical axis should give us the positions of the diffraction peaks,
# Here still as distances on the detector plane. With knowledge of the
# detector to sample distance, these could then be converted in
# reciprocal coordinates.
ds[ds == 0] = np.nan
profile = np.nanmedian(ds, axis=0)
fig = plt.figure(figsize=(15,5))
ax = fig.add_subplot(111)
p = ax.plot(x, profile)
l =ax.set_ylabel("Median intensity (arb. units)")
l = ax.set_xlabel("Radial distance (arb. units)")
```
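%% Cell type:markdown id: tags:
As a sketch of the reciprocal-space conversion mentioned above, assuming a hypothetical sample-to-detector distance (the value below is a placeholder, not a measured geometry) and the `photon_energy` parameter from the first cell:
%% Cell type:code id: tags:
``` python
# Hypothetical conversion of radial distance to momentum transfer q = (4*pi/lambda) * sin(theta/2).
import scipy.constants as const
distance = 0.3                                                      # assumed sample-detector distance in metres
wavelength = const.h * const.c / (photon_energy * 1e3 * const.e)    # photon_energy is given in keV
theta = np.arctan2(x, distance)                                     # scattering angle per radial bin
q = 4 * np.pi / wavelength * np.sin(theta / 2)                      # in 1/m
fig = plt.figure(figsize=(15, 5))
ax = fig.add_subplot(111)
ax.plot(q * 1e-10, profile)
ax.set_xlabel("q (1/Å)")
ax.set_ylabel("Median intensity (arb. units)")
```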
%% Cell type:markdown id: tags:
## Maximum Gain Value Reached ##
The following plot shows the maximum gain value reached. It can be used as an indication of whether the detector went into saturation.
%% Cell type:code id: tags:
``` python
gainpath = "{}/gain".format(h5path)
posarr = metro.positionFileList(in_files, gainpath, geometry_file, d_quads, nImages = 100)
```
%% Cell type:code id: tags:
``` python
fig = plt.figure(figsize=(15,15))
ax = fig.add_subplot(111)
parr = np.max(posarr, axis=0)
im=ax.imshow((parr), vmin=0, vmax=3)
cb = fig.colorbar(im)
cb.set_label("Intensity (ADU)")
```
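%% Cell type:markdown id: tags:
As a possible numerical summary (not part of the original notebook), the fraction of pixels that reach medium or low gain in the preview images can serve as a rough saturation indicator:
%% Cell type:code id: tags:
``` python
# Sketch: fraction of assembled pixel values at medium (1) or low (2) gain; NaNs from the assembly are ignored.
valid = np.isfinite(posarr)
frac_above_high = np.sum((posarr >= 1) & valid) / max(np.sum(valid), 1)
print("Fraction of pixel values above high gain: {:.2%}".format(frac_above_high))
```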