# Merging of Current Source Data
Author: Detector Group, Version 1.0

This notebook is used to create for each AGIPD module a single .h5 file containing current source injected data. Current source (CS) data for each module consists of four runs as at a time the current source can injected charge only in every 4th column. The merged .h5 file is then used by notebook "CS_Characterization_unequalClockStep_NBC" to create CS constants.

In [None]:
# cluster_profile = "noDB" # The ipcluster profile to use
in_folder = "/gpfs/exfel/exp/SPB/202331/p900376/raw/" # path to input data, required
out_folder = "/gpfs/exfel/exp/SPB/202331/p900376/scratch/CSmergedFiles/" # path to output to, required
metadata_folder = ""

first_run = 269 # first taken run, it has to be a run with the smallest ITESTC, otherwise define runs manually
runs1 = [-1] # list of runs to use, range allowed ITESTC 65, use -1 for auto-completion
runs2 = [-1] # list of runs to use, ITESTC 80, use -1 for auto-completion
runs3 = [-1] # list of runs to use, ITESTC 120, use -1 for auto-completion
runs4 = [-1] # list of runs to use, ITESTC 170, use -1 for auto-completion
itestc_order = "descending" # order in which ITESTC values were configured, e.g. first run taken with the highest ITESTC value --> "descending"
n_itestc = 5 # number of different ITESTC configurations taken. IMPORTANT, if you use ITESTC configurations which do not have consecutive run numbers, do not use runs auto-completion! Provide run numbers manually!

modules = [-1] # modules to work on, required, range allowed
karabo_da = ["all"]
karabo_id_control = "SPB_IRU_AGIPD1M1"  # karabo-id for the control device e.g. "MID_EXP_AGIPD1M1", or "SPB_IRU_AGIPD1M1"
karabo_id = "SPB_DET_AGIPD1M-1"
ctrl_source_template = '{}/MDL/FPGA_COMP' # path to control information
instrument_source_template = '{}/DET/{}:xtdf'  # path in the HDF5 file to images
receiver_template = "{}CH0" # inset for receiver devices

creation_time = ""  # To overwrite the measured creation_time. Required Format: YYYY-MM-DD HR:MN:SC e.g. "2022-06-28 13:00:00"
creation_date_offset = "00:00:00" # add an offset to creation date, e.g. to get different constants
cal_db_interface = "tcp://max-exfl-cal001:8015#8045"  # the database interface to use

bias_voltage = -1 # detector bias voltage
mem_cells = -1  # number of memory cells used, use -1 to use value stored in slow data.
acq_rate = -1. # the detector acquisition rate, use -1. to use value stored in slow data.
gain_setting = -1 # gain setting can have value 0 or 1, use -1 to use value stored in slow data.
integration_time = -1 # integration time, use -1 to use value stored in slow data.

steps = [1, 10, 75] # spacing between integration time steps for each loop
increments = [300, 400, 200] # number of steps within a loop

In [None]:
# Determine the run numbers
if (runs1[0] == -1) & (itestc_order == "ascending"):
    runs1 = range(first_run, first_run+4*4, n_itestc)
    runs2 = range(first_run+1, first_run+1+4*4, n_itestc)
    runs3 = range(first_run+2, first_run+2+4*4, n_itestc)
    runs4 = range(first_run+3, first_run+3+4*4, n_itestc)
    
if (runs1[0] == -1) & (itestc_order == "descending"):
    runs1 = range(first_run, first_run+4*4, n_itestc)
    runs2 = range(first_run-1, first_run-1+4*4, n_itestc)
    runs3 = range(first_run-2, first_run-2+4*4, n_itestc)
    runs4 = range(first_run-3, first_run-3+4*4, n_itestc)
    
print('Runs to be evaluated: ', list(runs1), list(runs2), list(runs3), list(runs4))

In [None]:
import warnings
from datetime import datetime, timedelta
from functools import partial
from dateutil import parser

warnings.filterwarnings('ignore')

import h5py
import matplotlib
import numpy as np

from collections import OrderedDict
import xarray
import matplotlib.pyplot as plt

from extra_data import RunDirectory
import pasha as psh
from concurrent.futures import ProcessPoolExecutor

from cal_tools.agipdlib import AgipdCtrl
from cal_tools.tools import calcat_creation_time
from cal_tools.step_timing import StepTimer

%matplotlib inline

In [None]:
raw_dc = RunDirectory(in_folder+f'/r{first_run:04d}')
if karabo_da == ["all"]:
    if modules[0] == -1:
        strings = [s.split("/")[-1] for s in list(raw_dc.detector_sources)]
        modules = [int(s[:s.index("C")]) for s in strings]
        modules.sort()
    karabo_da = ["AGIPD{:02d}".format(i) for i in modules]
else:
    modules = [int(x[-2:]) for x in karabo_da]


print("Modules: {}".format(modules))
print(f"Detector in use is {karabo_id}")

In [None]:
for module in modules:
    first_run = runs2[0]
    channel = module
    ctrl_src = ctrl_source_template.format(karabo_id_control)
    instrument_src = instrument_source_template.format(karabo_id, receiver_template).format(channel)

    agipd_cond = AgipdCtrl(
        run_dc=RunDirectory(f'{in_folder}/r{first_run:04d}'),
        image_src=instrument_src,
        ctrl_src=ctrl_src,
        raise_error=False,  # to be able to process very old data without gain_setting value
    )

In [None]:
# Evaluate creation time
creation_time = calcat_creation_time(in_folder, first_run, creation_time)
offset = parser.parse(creation_date_offset)
delta = timedelta(hours=offset.hour, minutes=offset.minute, seconds=offset.second)
creation_time += delta
print(f"Creation time: {creation_time}")

# Read AGIPD parameter conditions.
if acq_rate == -1.:
    acq_rate = agipd_cond.get_acq_rate()
if mem_cells == -1:
    mem_cells = agipd_cond.get_num_cells()
# TODO: look for alternative for passing creation_time
if gain_setting == -1:
    gain_setting = agipd_cond.get_gain_setting(creation_time)
if bias_voltage == -1:
    bias_voltage = agipd_cond.get_bias_voltage(karabo_id_control)
if integration_time == -1:
    integration_time = agipd_cond.get_integration_time()

In [None]:
print("Operating conditions are:")
print(f"Bias voltage: {bias_voltage} V")
print(f"Memory cells: {mem_cells}")
print(f"Acquisition rate: {acq_rate} MHz")
print(f"Gain setting: {gain_setting}")
print(f"Integration time: {integration_time}")

In [None]:
def count_min_bursts(in_folder, channel, run):
    """Calculate maximum number of trains in the provided runs."""
    
    bursts = []
    run_folder_path = in_folder.format(run)
    r = RunDirectory(run_folder_path)
    instrument_source = karabo_id+'/DET/{}CH0:xtdf'.format(channel)
    c = r.get_array(instrument_source, 'image.length')
    total_frame = np.count_nonzero(c)
    cells = r.detector_info(instrument_source)['frames_per_train']
    bursts = total_frame // cells

    return bursts

trains = []

for module in modules:
    partial_check = partial(count_min_bursts, in_folder+"/r{:04d}/", module)

    with ProcessPoolExecutor(max_workers=4) as pool:
        bursts1 = list(pool.map(partial_check, runs1))
        bursts2 = list(pool.map(partial_check, runs2))
        bursts3 = list(pool.map(partial_check, runs3))    
    trains.append(np.min(bursts1 + bursts2 + bursts3))
trains = np.min(np.asarray(trains))
print('Number of trains in runs: ',trains)

In [None]:
def check_ASIC(in_folder, runs, trains, channel, tresh):
    """Check which set of runs to use for a given ASIC"""
    
    ASIC_code = []
    
    run_folder_path = in_folder.format(runs[0])
    r = RunDirectory(run_folder_path)

    instrument_source = karabo_id+'/DET/{}CH0:xtdf'.format(channel) 

    d = r.get_array(instrument_source, 'image.data')[:trains*mem_cells, 0, :, :]
    d = d.values.reshape((d.shape[0] // mem_cells, mem_cells, 1, 512, 128))
    d = np.moveaxis(d, 4, 3).astype(np.float32)

    
    for col in range(0, 512, 64):
        #rise_val is used to check increase of signal, 
        #if it is low, data from other file is checked
        rise_val = np.mean(d[-10:, 3, 0, :64, 3+col:64+col:4]) -\
                    np.mean(d[800:810, 3, 0, :64, 3+col:64+col:4])
        if (rise_val < tresh) & (rise_val > -30.):
            ASIC_code.append(2)
            print(f"Signal rise low ({rise_val})")
        else:
            ASIC_code.append(1)
            print(f"Signal rise ok ({rise_val})")
            
    for col in range(0, 512, 64):       
        rise_val = np.mean(d[-10:, 3, 0, 64:, col:64+col:4]) -\
                    np.mean(d[800:810, 3, 0, 64:, col:64+col:4])
        if (rise_val < tresh) & (rise_val > -30.):
            ASIC_code.append(2)
            print(f"Signal rise low ({rise_val})")
        else:
            ASIC_code.append(1)
            print(f"Signal rise ok ({rise_val})")

    return ASIC_code



def defineInitParams(m_num):
    """Specify the ASIC indexing and runs to be used within data array in OrderedDict."""
    
    channel = modules[m_num]
    runs_dict = OrderedDict()
    
    instrument_source = karabo_id+'/DET/{}CH0:xtdf'.format(channel)
    print(instrument_source)
    
    run_lbl = [0, runs1, runs2, runs3, runs4]
    
    for idx, a_c in enumerate(ASIC_code_list[m_num]):
        
        print(f'ASIC {idx} - code: {a_c}')
        
        if idx < 8:
            index = [[idx*64, idx*64 + 64], [0, 64]]
        else:
            counter = 512
            index = [[idx*64-counter, idx*64 + 64 - counter], [64, 128]]
            counter -= 64
       
        run_cllection = [RunDirectory(f'{in_folder}/r{r_num:04d}/') for r_num in run_lbl[a_c]]
        print('Used runs: ',run_lbl[a_c])
        sel = [run_cllect.select(instrument_source, require_all=True) for run_cllect in run_cllection]
        runs_dict[idx] = {'runs': sel,
                          'module': channel,
                          'index': index
                        }

    return runs_dict

In [None]:
ASIC_code_list = []
for module in modules:
    start = datetime.now()

    ASIC_code = check_ASIC(in_folder+"/r{:04d}/", runs1, trains, module, 500.)
    print(ASIC_code)

    if 2 in ASIC_code:
        print('\n')
        ASIC_code = np.asarray(ASIC_code)
        ASIC_code2 = check_ASIC(in_folder+"/r{:04d}/", runs2, trains, module, 100.)
        ASIC_code2 = np.array(ASIC_code2)
        ASIC_code2[np.where(ASIC_code == 1)[0]] = 1
        if 2 in ASIC_code2:
            substitute = np.where(ASIC_code2 == 2)[0]
            ASIC_code[substitute] = 3
    if 3 in ASIC_code:
        print('\n')
        ASIC_code3 = check_ASIC(in_folder+"/r{:04d}/", runs3, trains, module, 100.)
        ASIC_code3 = np.array(ASIC_code3)
        ASIC_code3[np.where(ASIC_code == 1)[0]] = 1
        ASIC_code3[np.where(ASIC_code == 2)[0]] = 5
        if 2 in ASIC_code3:
            substitute = np.where(ASIC_code3 == 2)[0]
            ASIC_code[substitute] = 4
    ASIC_code_list.append(ASIC_code)
    end = datetime.now() - start
    print('\nDuration of check: ', end)

In [None]:
runs_dict_list = []
for module in range(len(modules)):
    runs_dict_list.append(defineInitParams(module))

In [None]:
def process_module(channel, runs_dict, cell_id):
    """Merge runs to create complete image"""

    instrument_source = karabo_id+'/DET/{}CH0:xtdf'.format(runs_dict[channel][0]['module'])
    print(instrument_source)

    context= psh.context.ProcessContext(num_workers=4)
    cs_data = {'analog': context.alloc(shape=(trains, 352, 128, 512), dtype=np.float32),
              }   
    
    def fill_ASIC(worker_id, array_index, asic):
        run_asic = runs_dict[channel][asic]['runs']
        module = runs_dict[channel][asic]['module']
        index = runs_dict[channel][asic]['index']
        print("Processing Module: {}, ASIC: {}, Index: {}".format(module, asic, index))
        
        for counter, r in enumerate(run_asic):
            d = r.get_array(instrument_source, 'image.data')[:trains*352, 0, index[0][0]:index[0][1], index[1][0]:index[1][1]]
            d = d.values.reshape((d.shape[0] // mem_cells, mem_cells, 1, 512//8, 128//2))
            d = np.moveaxis(d, 4, 3)
            d = xarray.DataArray(d, coords={'cell': range(0, 352)}, dims=["train", "cell", "output", "x", "y"])
            d = d.sel(cell=range(cell_id[0], cell_id[1])) # select chunk of mem cells

            if asic < 8:
                for i in range(3-counter, 64, 4):
                    cs_data['analog'][..., index[1][0]:index[1][1], index[0][0]+i] = d[:, :, 0, :, i]
            else:
                for i in range(counter, 64, 4):
                    cs_data['analog'][..., index[1][0]:index[1][1], index[0][0]+i] = d[:, :, 0, :, i]
            
    context.map(fill_ASIC, range(0,16))
     
    return modules[channel], cs_data['analog']

In [None]:
start = datetime.now()

run_lbl = [0, runs1, runs2, runs3, runs4]

for module in range(len(modules)):
    channel = modules[module]
    values, counts = np.unique(ASIC_code_list[module], return_counts=True)
    apendix = []
    for val in values:
        apendix.append('r{}-{}'.format(run_lbl[val][0], run_lbl[val][-1]))

    merged_file = "{}/agipd_CH{:02d}_cs_".format(out_folder, channel)+"_".join(apendix)+'.h5'
    print('Saving to:',merged_file)
    
    with h5py.File(merged_file, "w") as f:
        dset = f.require_dataset("{}/Analog/data".format(channel), (trains, mem_cells, 128, 512), dtype='float32')
        for cell in range(0,352,352):
            cell_id = [cell, cell+352]
            cs_data = process_module(module, runs_dict_list, cell_id) #cs_data[0] - channel, cs_data[1] - analog, cs_data[2] digital
            dset[:, cell_id[0]:cell_id[1], ...] = cs_data[1]
            f.flush()
            
    end = datetime.now() - start
    print(f'Duration of merging for module {channel}: ', end)

### Checking the resulting data after merging

In [None]:
cs_data = []
with h5py.File(merged_file, 'r') as f:
    cs_data.append(np.array(f[f'/{modules[0]}/Analog/data']))

trains = cs_data[0].shape[0]

In [None]:
mc = 4
burst_of_interest = [10, 300, -50]
im_ratio = cs_data[0][0, 0, ...].shape[1] / cs_data[0][0, 0, ...].shape[0]
vmin = 0.9*np.mean(cs_data[0][mc, ...])
vmax = 14000

fig, axs = plt.subplots(len(burst_of_interest), 1, figsize=(10,10))
fig.subplots_adjust(hspace=0.5)
yticks_major=np.arange(0,129,64)
xticks_major=np.arange(0,512,64)
for i, ax in enumerate(axs):
    im = ax.imshow(cs_data[0][burst_of_interest[i], mc, ...], 
                   cmap='jet', vmin=vmin, vmax=vmax)
    cbar = fig.colorbar(im, ax=ax,fraction=0.0355*im_ratio, 
                        pad=0.15, orientation='horizontal')
    cbar.set_label('CS signal (ADU)')
    b = burst_of_interest[i]
    if burst_of_interest[i] < 0:
        b = trains + b
    ax.set_title("Scan point: {}, Cell: {}".format(b, mc))
    ax.set_xticks(xticks_major)
    ax.xaxis.grid(True, which='major', linewidth=0.5, color='grey')
    ax.set_yticks(yticks_major)
    ax.yaxis.grid(True, which='major', linewidth=0.5, color='grey')
    ax.set_xlabel('Columns')
    ax.set_ylabel('Rows')

In [None]:
a = np.arange(0,increments[0],steps[0])
b = np.arange(a[-1]+1,a[-1]+1+increments[1]*steps[1], steps[1])
c = np.arange(b[-1]+1,b[-1]+1+increments[2]*steps[2], steps[2])

The following plots show integrated signal over the scan for a single pixel in each ASIC for a memory cell # 4.

In [None]:
row = 81
diff = np.sum(increments)-trains
mc = 4

fig, axes = plt.subplots(1, 8, figsize=(14, 2), sharey=True, constrained_layout=True) 
for asic, col in enumerate(range(35,485,64), start=8):
    axes[asic-8].plot(a, cs_data[0][:increments[0],mc,row,col], ls="None", marker='.', label='step size: 1 clk')
    axes[asic-8].plot(b, cs_data[0][increments[0]:np.sum(increments[:2]),mc,row,col], ls="None", marker='.', label='step size: 25 clk')
    axes[asic-8].plot(c[:-diff], cs_data[0][np.sum(increments[:2]):,mc,row,col], ls="None", marker='.', label='step size: 100 clk')
    axes[asic-8].set_xlabel(asic)
    axes[asic-8].set_xscale('log')
for ax in axes:
    ax.set_xticks([])
    ax.grid()

In [None]:
row = 31
diff = np.sum(increments)-trains
mc = 4

fig, axes = plt.subplots(1, 8, figsize=(14, 2), sharey=True, constrained_layout=True) 
for asic, col in enumerate(range(35,485,64)):
    axes[asic].plot(a, cs_data[0][:increments[0],mc,row,col], ls="None", marker='.', label='step size: 1 clk')
    axes[asic].plot(b, cs_data[0][increments[0]:np.sum(increments[:2]),mc,row,col], ls="None", marker='.', label='step size: 25 clk')
    axes[asic].plot(c[:-diff], cs_data[0][np.sum(increments[:2]):,mc,row,col], ls="None", marker='.', label='step size: 100 clk')
    axes[asic].set_xlabel(asic)
    axes[asic].set_xscale('log')
for ax in axes:
    ax.set_xticks([])
    ax.grid()