{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# LPD Offline Correction #\n", "\n", "Author: European XFEL Data Analysis Group" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2018-12-03T15:19:56.056417Z", "start_time": "2018-12-03T15:19:56.003012Z" } }, "outputs": [], "source": [ "# Input parameters\n", "in_folder = \"/gpfs/exfel/exp/FXE/202201/p003073/raw/\" # the folder to read data from, required\n", "out_folder = \"/gpfs/exfel/data/scratch/schmidtp/random/LPD_test\" # the folder to output to, required\n", "metadata_folder = '' # Directory containing calibration_metadata.yml when run by xfel-calibrate.\n", "sequences = [-1] # Sequences to correct, use [-1] for all\n", "modules = [-1] # Modules indices to correct, use [-1] for all, only used when karabo_da is empty\n", "karabo_da = ['all'] # Data aggregators names to correct, use [''] for all\n", "run = 10 # run to process, required\n", "\n", "# Source parameters\n", "karabo_id = 'FXE_DET_LPD1M-1' # Karabo domain for detector.\n", "input_source = '{karabo_id}/DET/{module_index}CH0:xtdf' # Input fast data source.\n", "output_source = '' # Output fast data source, empty to use same as input.\n", "\n", "# CalCat parameters\n", "creation_time = \"\" # The timestamp to use with Calibration DB. Required Format: \"YYYY-MM-DD hh:mm:ss\" e.g. 2019-07-04 11:02:41\n", "cal_db_interface = '' # Not needed, compatibility with current webservice.\n", "cal_db_timeout = 0 # Not needed, compatbility with current webservice.\n", "cal_db_root = '/gpfs/exfel/d/cal/caldb_store'\n", "\n", "# Operating conditions\n", "mem_cells = 512 # Memory cells, LPD constants are always taken with 512 cells.\n", "bias_voltage = 250.0 # Detector bias voltage.\n", "capacitor = '5pF' # Capacitor setting: 5pF or 50pF\n", "photon_energy = 9.2 # Photon energy in keV.\n", "category = 0 # Whom to blame.\n", "\n", "# Correction parameters\n", "offset_corr = True # Offset correction.\n", "rel_gain = True # Gain correction based on RelativeGain constant.\n", "ff_map = True # Gain correction based on FFMap constant.\n", "gain_amp_map = True # Gain correction based on GainAmpMap constant.\n", "\n", "# Output options\n", "overwrite = True # set to True if existing data should be overwritten\n", "chunks_data = 1 # HDF chunk size for pixel data in number of frames.\n", "chunks_ids = 32 # HDF chunk size for cellId and pulseId datasets.\n", "create_virtual_cxi_in = '' # Folder to create virtual CXI files in (for each sequence).\n", "\n", "# Parallelization options\n", "sequences_per_node = 1 # Sequence files to process per node\n", "max_nodes = 8 # Maximum number of SLURM jobs to split correction work into\n", "num_workers = 8 # Worker processes per node, 8 is safe on 768G nodes but won't work on 512G.\n", "num_threads_per_worker = 32 # Number of threads per worker.\n", "\n", "def balance_sequences(in_folder, run, sequences, sequences_per_node, karabo_da, max_nodes):\n", " from xfel_calibrate.calibrate import balance_sequences as bs\n", " return bs(in_folder, run, sequences, sequences_per_node, karabo_da, max_nodes=max_nodes)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2018-12-03T15:19:56.990566Z", "start_time": "2018-12-03T15:19:56.058378Z" } }, "outputs": [], "source": [ "from collections import OrderedDict\n", "from pathlib import Path\n", "from time import perf_counter\n", "import gc\n", "import re\n", "import warnings\n", "\n", "import numpy as np\n", "import 
h5py\n", "\n", "import matplotlib\n", "matplotlib.use('agg')\n", "import matplotlib.pyplot as plt\n", "%matplotlib inline\n", "\n", "from calibration_client import CalibrationClient\n", "from calibration_client.modules import CalibrationConstantVersion\n", "import extra_data as xd\n", "import extra_geom as xg\n", "import pasha as psh\n", "\n", "from extra_data.components import LPD1M\n", "\n", "from cal_tools.calcat_interface import CalCatError, LPD_CalibrationData\n", "from cal_tools.lpdalgs import correct_lpd_frames\n", "from cal_tools.tools import CalibrationMetadata, calcat_creation_time\n", "from cal_tools.files import DataFile\n", "from cal_tools.restful_config import restful_config" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Prepare environment" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "file_re = re.compile(r'^RAW-R(\\d{4})-(\\w+\\d+)-S(\\d{5})$') # This should probably move to cal_tools\n", "\n", "run_folder = Path(in_folder) / f'r{run:04d}'\n", "out_folder = Path(out_folder)\n", "out_folder.mkdir(exist_ok=True)\n", "\n", "output_source = output_source or input_source\n", "\n", "cal_db_root = Path(cal_db_root)\n", "\n", "metadata = CalibrationMetadata(metadata_folder or out_folder)\n", "# Constant paths & timestamps are saved under retrieved-constants in calibration_metadata.yml\n", "retrieved_constants = metadata.setdefault(\"retrieved-constants\", {})\n", "\n", "creation_time = calcat_creation_time(in_folder, run, creation_time)\n", "print(f'Using {creation_time.isoformat()} as creation time')\n", "\n", "# Pick all modules/aggregators or those selected.\n", "if karabo_da == ['all']:\n", " if modules == [-1]:\n", " modules = list(range(16))\n", "\n", " karabo_da = [f'LPD{i:02d}' for i in modules]\n", "else:\n", " modules = [int(x[-2:]) for x in karabo_da]\n", " \n", "# Pick all sequences or those selected.\n", "if not sequences or sequences == [-1]:\n", " do_sequence = lambda seq: True\n", "else:\n", " do_sequence = [int(x) for x in sequences].__contains__ \n", " \n", "# List of detector sources.\n", "det_inp_sources = [input_source.format(karabo_id=karabo_id, module_index=int(da[-2:])) for da in karabo_da]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Select data to process" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data_to_process = []\n", "\n", "for inp_path in run_folder.glob('RAW-*.h5'):\n", " match = file_re.match(inp_path.stem)\n", " \n", " if match[2] not in karabo_da or not do_sequence(int(match[3])):\n", " continue\n", " \n", " outp_path = out_folder / 'CORR-R{run:04d}-{aggregator}-S{seq:05d}.h5'.format(\n", " run=int(match[1]), aggregator=match[2], seq=int(match[3]))\n", "\n", " data_to_process.append((match[2], inp_path, outp_path))\n", "\n", "print('Files to process:')\n", "for data_descr in sorted(data_to_process, key=lambda x: f'{x[0]}{x[1]}'):\n", " print(f'{data_descr[0]}\\t{data_descr[1]}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Obtain and prepare calibration constants" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "metadata = CalibrationMetadata(metadata_folder or out_folder)\n", "# Constant paths & timestamps are saved under retrieved-constants in calibration_metadata.yml\n", "const_yaml = metadata.setdefault(\"retrieved-constants\", {})" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": 
[ "const_data = {}\n", "const_load_mp = psh.ProcessContext(num_workers=24)\n", "\n", "if const_yaml: # Read constants from YAML file.\n", " start = perf_counter()\n", " for da, ccvs in const_yaml.items():\n", "\n", " for calibration_name, ccv in ccvs['constants'].items():\n", "\n", " dtype = np.uint32 if calibration_name.startswith('BadPixels') else np.float32\n", "\n", " const_data[(da, calibration_name)] = dict(\n", " path=Path(ccv['path']),\n", " dataset=ccv['dataset'],\n", " data=const_load_mp.alloc(shape=(256, 256, mem_cells, 3), dtype=dtype)\n", " )\n", "else: # Retrieve constants from CALCAT.\n", " print('Querying calibration database', end='', flush=True)\n", " start = perf_counter()\n", " for calibrations, condition in [\n", " (dark_calibrations, dark_condition),\n", " (illuminated_calibrations, illuminated_condition)\n", " ]:\n", " resp = CalibrationConstantVersion.get_closest_by_time_by_detector_conditions(\n", " client, karabo_id, list(calibrations.keys()),\n", " {'parameters_conditions_attributes': condition},\n", " karabo_da='', event_at=creation_time.isoformat()\n", " )\n", "\n", " if not resp['success']:\n", " raise RuntimeError(resp)\n", "\n", " for ccv in resp['data']:\n", " cc = ccv['calibration_constant']\n", " da = ccv['physical_detector_unit']['karabo_da']\n", " calibration_name = calibrations[cc['calibration_id']]\n", " \n", " dtype = np.uint32 if calibration_name.startswith('BadPixels') else np.float32\n", " \n", " const_data[(da, calibration_name)] = dict(\n", " path=Path(ccv['path_to_file']) / ccv['file_name'],\n", " dataset=ccv['data_set_name'],\n", " data=const_load_mp.alloc(shape=(256, 256, mem_cells, 3), dtype=dtype)\n", " )\n", " print('.', end='', flush=True)\n", " \n", "total_time = perf_counter() - start\n", "print(f'{total_time:.1f}s')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def load_constant_dataset(wid, index, const_descr):\n", " ccv_entry = const_data[const_descr]\n", " \n", " with h5py.File(cal_db_root / ccv_entry['path'], 'r') as fp:\n", " fp[ccv_entry['dataset'] + '/data'].read_direct(ccv_entry['data'])\n", " \n", " print('.', end='', flush=True)\n", "\n", "print('Loading calibration data', end='', flush=True)\n", "start = perf_counter()\n", "const_load_mp.map(load_constant_dataset, list(const_data.keys()))\n", "total_time = perf_counter() - start\n", "\n", "print(f'{total_time:.1f}s')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# These are intended in order cell, X, Y, gain\n", "ccv_offsets = {}\n", "ccv_gains = {}\n", "ccv_masks = {}\n", "\n", "ccv_shape = (mem_cells, 256, 256, 3)\n", "\n", "constant_order = {\n", " 'Offset': (2, 1, 0, 3),\n", " 'BadPixelsDark': (2, 1, 0, 3),\n", " 'RelativeGain': (2, 1, 0, 3),\n", " 'FFMap': (2, 0, 1, 3),\n", " 'BadPixelsFF': (2, 0, 1, 3),\n", " 'GainAmpMap': (2, 0, 1, 3),\n", "}\n", "\n", "def prepare_constants(wid, index, aggregator):\n", " consts = {calibration_name: entry['data']\n", " for (aggregator_, calibration_name), entry\n", " in const_data.items()\n", " if aggregator == aggregator_}\n", " \n", " def _prepare_data(calibration_name, dtype):\n", " return consts[calibration_name] \\\n", " .transpose(constant_order[calibration_name]) \\\n", " .astype(dtype, copy=True) # Make sure array is contiguous.\n", " \n", " if offset_corr and 'Offset' in consts:\n", " ccv_offsets[aggregator] = _prepare_data('Offset', np.float32)\n", " else:\n", " ccv_offsets[aggregator] = np.zeros(ccv_shape, 
dtype=np.float32)\n", " \n", " ccv_gains[aggregator] = np.ones(ccv_shape, dtype=np.float32)\n", " \n", " if 'BadPixelsDark' in consts:\n", " ccv_masks[aggregator] = _prepare_data('BadPixelsDark', np.uint32)\n", " else:\n", " ccv_masks[aggregator] = np.zeros(ccv_shape, dtype=np.uint32)\n", " \n", " if rel_gain and 'RelativeGain' in consts:\n", " ccv_gains[aggregator] *= _prepare_data('RelativeGain', np.float32)\n", " \n", " if ff_map and 'FFMap' in consts:\n", " ccv_gains[aggregator] *= _prepare_data('FFMap', np.float32)\n", " \n", " if 'BadPixelsFF' in consts:\n", " np.bitwise_or(ccv_masks[aggregator], _prepare_data('BadPixelsFF', np.uint32),\n", " out=ccv_masks[aggregator])\n", " \n", " if gain_amp_map and 'GainAmpMap' in consts:\n", " ccv_gains[aggregator] *= _prepare_data('GainAmpMap', np.float32)\n", " \n", " print('.', end='', flush=True)\n", " \n", "\n", "print('Preparing constants', end='', flush=True)\n", "start = perf_counter()\n", "psh.ThreadContext(num_workers=len(karabo_da)).map(prepare_constants, karabo_da)\n", "total_time = perf_counter() - start\n", "print(f'{total_time:.1f}s')\n", "\n", "const_data.clear() # Clear raw constants data now to save memory.\n", "gc.collect();" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def correct_file(wid, index, work):\n", " aggregator, inp_path, outp_path = work\n", " module_index = int(aggregator[-2:])\n", " \n", " start = perf_counter()\n", " dc = xd.H5File(inp_path, inc_suspect_trains=False).select('*', 'image.*', require_all=True)\n", " inp_source = dc[input_source.format(karabo_id=karabo_id, module_index=module_index)]\n", " open_time = perf_counter() - start\n", " \n", " # Load raw data for this file.\n", " # Reshaping gets rid of the extra 1-len dimensions without\n", " # mangling the frame axis for an actual frame count of 1.\n", " start = perf_counter()\n", " in_raw = inp_source['image.data'].ndarray().reshape(-1, 256, 256)\n", " in_cell = inp_source['image.cellId'].ndarray().reshape(-1)\n", " in_pulse = inp_source['image.pulseId'].ndarray().reshape(-1)\n", " read_time = perf_counter() - start\n", " \n", " # Allocate output arrays.\n", " out_data = np.zeros((in_raw.shape[0], 256, 256), dtype=np.float32)\n", " out_gain = np.zeros((in_raw.shape[0], 256, 256), dtype=np.uint8)\n", " out_mask = np.zeros((in_raw.shape[0], 256, 256), dtype=np.uint32)\n", " \n", " start = perf_counter()\n", " correct_lpd_frames(in_raw, in_cell,\n", " out_data, out_gain, out_mask,\n", " ccv_offsets[aggregator], ccv_gains[aggregator], ccv_masks[aggregator],\n", " num_threads=num_threads_per_worker)\n", " correct_time = perf_counter() - start\n", " \n", " image_counts = inp_source['image.data'].data_counts(labelled=False)\n", " \n", " start = perf_counter()\n", " if (not outp_path.exists() or overwrite) and image_counts.sum() > 0:\n", " outp_source_name = output_source.format(karabo_id=karabo_id, module_index=module_index)\n", "\n", " with DataFile(outp_path, 'w') as outp_file: \n", " outp_file.create_index(dc.train_ids, from_file=dc.files[0])\n", " outp_file.create_metadata(like=dc, instrument_channels=(f'{outp_source_name}/image',))\n", " \n", " outp_source = outp_file.create_instrument_source(outp_source_name)\n", " \n", " outp_source.create_index(image=image_counts)\n", " outp_source.create_key('image.cellId', data=in_cell,\n", " chunks=(min(chunks_ids, in_cell.shape[0]),))\n", " outp_source.create_key('image.pulseId', data=in_pulse,\n", " chunks=(min(chunks_ids, in_pulse.shape[0]),))\n", " 
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def correct_file(wid, index, work):\n",
    "    aggregator, inp_path, outp_path = work\n",
    "    module_index = int(aggregator[-2:])\n",
    "\n",
    "    start = perf_counter()\n",
    "    dc = xd.H5File(inp_path, inc_suspect_trains=False).select('*', 'image.*', require_all=True)\n",
    "    inp_source = dc[input_source.format(karabo_id=karabo_id, module_index=module_index)]\n",
    "    open_time = perf_counter() - start\n",
    "\n",
    "    # Load raw data for this file.\n",
    "    # Reshaping gets rid of the extra 1-length dimensions without\n",
    "    # mangling the frame axis for an actual frame count of 1.\n",
    "    start = perf_counter()\n",
    "    in_raw = inp_source['image.data'].ndarray().reshape(-1, 256, 256)\n",
    "    in_cell = inp_source['image.cellId'].ndarray().reshape(-1)\n",
    "    in_pulse = inp_source['image.pulseId'].ndarray().reshape(-1)\n",
    "    read_time = perf_counter() - start\n",
    "\n",
    "    # Allocate output arrays.\n",
    "    out_data = np.zeros((in_raw.shape[0], 256, 256), dtype=np.float32)\n",
    "    out_gain = np.zeros((in_raw.shape[0], 256, 256), dtype=np.uint8)\n",
    "    out_mask = np.zeros((in_raw.shape[0], 256, 256), dtype=np.uint32)\n",
    "\n",
    "    start = perf_counter()\n",
    "    correct_lpd_frames(in_raw, in_cell,\n",
    "                       out_data, out_gain, out_mask,\n",
    "                       ccv_offsets[aggregator], ccv_gains[aggregator], ccv_masks[aggregator],\n",
    "                       num_threads=num_threads_per_worker)\n",
    "    correct_time = perf_counter() - start\n",
    "\n",
    "    image_counts = inp_source['image.data'].data_counts(labelled=False)\n",
    "\n",
    "    start = perf_counter()\n",
    "    if (not outp_path.exists() or overwrite) and image_counts.sum() > 0:\n",
    "        outp_source_name = output_source.format(karabo_id=karabo_id, module_index=module_index)\n",
    "\n",
    "        with DataFile(outp_path, 'w') as outp_file:\n",
    "            outp_file.create_index(dc.train_ids, from_file=dc.files[0])\n",
    "            outp_file.create_metadata(like=dc, instrument_channels=(f'{outp_source_name}/image',))\n",
    "\n",
    "            outp_source = outp_file.create_instrument_source(outp_source_name)\n",
    "\n",
    "            outp_source.create_index(image=image_counts)\n",
    "            outp_source.create_key('image.cellId', data=in_cell,\n",
    "                                   chunks=(min(chunks_ids, in_cell.shape[0]),))\n",
    "            outp_source.create_key('image.pulseId', data=in_pulse,\n",
    "                                   chunks=(min(chunks_ids, in_pulse.shape[0]),))\n",
    "            outp_source.create_key('image.data', data=out_data,\n",
    "                                   chunks=(min(chunks_data, out_data.shape[0]), 256, 256))\n",
    "            outp_source.create_compressed_key('image.gain', data=out_gain)\n",
    "            outp_source.create_compressed_key('image.mask', data=out_mask)\n",
    "    write_time = perf_counter() - start\n",
    "\n",
    "    total_time = open_time + read_time + correct_time + write_time\n",
    "    frame_rate = in_raw.shape[0] / total_time\n",
    "\n",
    "    print('{}\\t{}\\t{:.3f}\\t{:.3f}\\t{:.3f}\\t{:.3f}\\t{:.3f}\\t{}\\t{:.1f}'.format(\n",
    "        wid, aggregator, open_time, read_time, correct_time, write_time, total_time,\n",
    "        in_raw.shape[0], frame_rate))\n",
    "\n",
    "    # Drop references so the buffers can be reclaimed between files.\n",
    "    in_raw = None\n",
    "    in_cell = None\n",
    "    in_pulse = None\n",
    "    out_data = None\n",
    "    out_gain = None\n",
    "    out_mask = None\n",
    "    gc.collect()\n",
    "\n",
    "print('worker\\tDA\\topen\\tread\\tcorrect\\twrite\\ttotal\\tframes\\trate')\n",
    "start = perf_counter()\n",
    "psh.ProcessContext(num_workers=num_workers).map(correct_file, data_to_process)\n",
    "total_time = perf_counter() - start\n",
    "print(f'Total time: {total_time:.1f}s')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Data preview for first train"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "geom = xg.LPD_1MGeometry.from_quad_positions(\n",
    "    [(11.4, 299), (-11.5, 8), (254.5, -16), (278.5, 275)])\n",
    "\n",
    "output_paths = [outp_path for _, _, outp_path in data_to_process if outp_path.exists()]\n",
    "dc = xd.DataCollection.from_paths(output_paths).select_trains(np.s_[0])\n",
    "\n",
    "det = LPD1M(dc, detector_name=karabo_id)\n",
    "data = det.get_array('image.data')"
   ]
  },
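  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The hard-coded quad positions above are nominal values used only for these preview plots; they have no effect on the corrected data written to disk."
   ]
  },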
"outputs": [], "source": [ "fig, ax = plt.subplots(num=3, figsize=(15, 15), clear=True, nrows=1, ncols=1)\n", "geom.plot_data_fast(data[:, 0].mean(axis=1), ax=ax, vmin=vmin, vmax=vmax)\n", "pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Lowest gain stage per pixel" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "highest_gain_stage = det.get_array('image.gain', pulses=np.s_[:]).max(axis=(1, 2))\n", "\n", "fig, ax = plt.subplots(num=4, figsize=(15, 15), clear=True, nrows=1, ncols=1)\n", "p = geom.plot_data_fast(highest_gain_stage, ax=ax, vmin=0, vmax=2);\n", "\n", "cb = ax.images[0].colorbar\n", "cb.set_ticks([0, 1, 2])\n", "cb.set_ticklabels(['High gain', 'Medium gain', 'Low gain'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create virtual CXI file" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if create_virtual_cxi_in:\n", " vcxi_folder = Path(create_virtual_cxi_in.format(\n", " run=run, proposal_folder=str(Path(in_folder).parent)))\n", " vcxi_folder.mkdir(parents=True, exist_ok=True)\n", " \n", " def sort_files_by_seq(by_seq, outp_path):\n", " by_seq.setdefault(int(outp_path.stem[-5:]), []).append(outp_path)\n", " return by_seq\n", " \n", " from functools import reduce\n", " reduce(sort_files_by_seq, output_paths, output_by_seq := {})\n", " \n", " for seq_number, seq_output_paths in output_by_seq.items():\n", " # Create data collection and detector components only for this sequence.\n", " try:\n", " det = LPD1M(xd.DataCollection.from_paths(seq_output_paths), detector_name=karabo_id, min_modules=4)\n", " except ValueError: # Couldn't find enough data for min_modules\n", " continue\n", " det.write_virtual_cxi(vcxi_folder / f'VCXI-LPD-R{run:04d}-S{seq_number:05d}.cxi')" ] } ], "metadata": { "kernelspec": { "display_name": ".cal2_venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.11 (default, Jul 2 2021, 14:23:46) \n[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)]" }, "vscode": { "interpreter": { "hash": "8817b9fbc45d4ec68a62bc54fa52c1c5ac00d5619f7fe87d01def1798ea4889e" } } }, "nbformat": 4, "nbformat_minor": 2 }