diff --git a/notebooks/LPD/LPD_Correct_Fast.ipynb b/notebooks/LPD/LPD_Correct_Fast.ipynb new file mode 100644 index 0000000000000000000000000000000000000000..bbd74430163ba8ffcd5fb016f49beee4d75685d3 --- /dev/null +++ b/notebooks/LPD/LPD_Correct_Fast.ipynb @@ -0,0 +1,615 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# LPD Offline Correction #\n", + "\n", + "Author: European XFEL Data Analysis Group" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "ExecuteTime": { + "end_time": "2018-12-03T15:19:56.056417Z", + "start_time": "2018-12-03T15:19:56.003012Z" + } + }, + "outputs": [], + "source": [ + "# Input parameters\n", + "in_folder = \"/gpfs/exfel/exp/FXE/202201/p003073/raw/\" # the folder to read data from, required\n", + "out_folder = \"/gpfs/exfel/data/scratch/schmidtp/random/LPD_test\" # the folder to output to, required\n", + "metadata_folder = '' # Directory containing calibration_metadata.yml when run by xfel-calibrate.\n", + "sequences = [-1] # Sequences to correct, use [-1] for all\n", + "modules = [-1] # Modules indices to correct, use [-1] for all, only used when karabo_da is empty\n", + "karabo_da = [''] # Data aggregators names to correct, use [''] for all\n", + "run = 10 # runs to process, required\n", + "\n", + "# Source parameters\n", + "karabo_id = 'FXE_DET_LPD1M-1' # Karabo domain for detector.\n", + "input_source = '{karabo_id}/DET/{module_index}CH0:xtdf' # Input fast data source.\n", + "output_source = '' # Output fast data source, empty to use same as input.\n", + "\n", + "# CalCat parameters\n", + "use_dir_creation_date = True # Use the creation date of the directory for database time derivation.\n", + "cal_db_interface = '' # Not needed, compatibility with current webservice.\n", + "cal_db_timeout = 0 # Not needed, compatbility with current webservice.\n", + "cal_db_root = '/gpfs/exfel/d/cal/caldb_store'\n", + "\n", + "# Operating conditions\n", + "mem_cells = 512 # Memory cells, LPD constants are always taken with 512 cells.\n", + "bias_voltage = 250.0 # Detector bias voltage.\n", + "capacitor = '5pF' # Capacitor setting: 5pF or 50pF\n", + "photon_energy = 9.2 # Photon energy in keV.\n", + "category = 0 # Whom to blame.\n", + "\n", + "# Correction parameters\n", + "offset_corr = True # Offset correction.\n", + "rel_gain = True # Gain correction based on RelativeGain constant.\n", + "ff_map = True # Gain correction based on FFMap constant.\n", + "gain_amp_map = True # Gain correction based on GainAmpMap constant.\n", + "\n", + "# Output options\n", + "overwrite = True # set to True if existing data should be overwritten\n", + "chunks_data = 32 # HDF chunk size for pixel data in number of frames.\n", + "chunks_ids = 32 # HDF chunk size for cellId and pulseId datasets.\n", + "\n", + "# Parallelization options\n", + "sequences_per_node = 1 # Sequence files to process per node\n", + "num_workers = 8 # Worker processes per node, 8 is safe on 768G nodes but won't work on 512G.\n", + "num_threads_per_worker = 32 # Number of threads per worker.\n", + "\n", + "def balance_sequences(in_folder, run, sequences, sequences_per_node, karabo_da):\n", + " from xfel_calibrate.calibrate import balance_sequences as bs\n", + " return bs(in_folder, run, sequences, sequences_per_node, karabo_da)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "ExecuteTime": { + "end_time": "2018-12-03T15:19:56.990566Z", + "start_time": "2018-12-03T15:19:56.058378Z" + } + }, + "outputs": [], + "source": [ 
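For clarity, this is roughly how the wrapper above is meant to be invoked when xfel-calibrate distributes work across nodes; all values below are hypothetical and the exact return format is defined by xfel_calibrate.calibrate.balance_sequences, not by this notebook.

# Illustration only; arguments mirror the wrapper defined in the parameter cell above.
node_chunks = balance_sequences(
    in_folder="/gpfs/exfel/exp/FXE/202201/p003073/raw/",
    run=10,
    sequences=[-1],            # [-1] selects every sequence file found
    sequences_per_node=1,      # one sequence file per node keeps memory use bounded
    karabo_da=["LPD00", "LPD01"],
)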
+ "from collections import OrderedDict\n", + "from pathlib import Path\n", + "from time import perf_counter\n", + "import gc\n", + "import re\n", + "import warnings\n", + "\n", + "import numpy as np\n", + "import h5py\n", + "\n", + "import matplotlib\n", + "matplotlib.use('agg')\n", + "import matplotlib.pyplot as plt\n", + "%matplotlib inline\n", + "\n", + "from calibration_client import CalibrationClient\n", + "from calibration_client.modules import CalibrationConstantVersion\n", + "import extra_data as xd\n", + "import extra_geom as xg\n", + "import pasha as psh\n", + "\n", + "from extra_data.components import LPD1M\n", + "\n", + "from cal_tools.lpdalgs import correct_lpd_frames\n", + "from cal_tools.tools import CalibrationMetadata, get_dir_creation_date, write_compressed_frames\n", + "from cal_tools.files import DataFile\n", + "from cal_tools.restful_config import restful_config" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Prepare environment" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "file_re = re.compile(r'^RAW-R(\\d{4})-(\\w+\\d+)-S(\\d{5})$') # This should probably move to cal_tools\n", + "\n", + "run_folder = Path(in_folder) / f'r{run:04d}'\n", + "out_folder = Path(out_folder)\n", + "out_folder.mkdir(exist_ok=True)\n", + "\n", + "output_source = output_source or input_source\n", + "\n", + "cal_db_root = Path(cal_db_root)\n", + "\n", + "metadata = CalibrationMetadata(metadata_folder or out_folder)\n", + "\n", + "if use_dir_creation_date:\n", + " creation_time = get_dir_creation_date(in_folder, run) \n", + "else:\n", + " from datetime import datetime\n", + " creation_time = datetime.now()\n", + " \n", + "print(f'Using {creation_time.isoformat()} as creation time')\n", + "\n", + "# Pick all modules/aggregators or those selected.\n", + "if not karabo_da or karabo_da == ['']:\n", + " if not modules or modules == [-1]:\n", + " modules = list(range(16))\n", + "\n", + " karabo_da = [f'LPD{i:02d}' for i in modules]\n", + " \n", + "# Pick all sequences or those selected.\n", + "if not sequences or sequences == [-1]:\n", + " do_sequence = lambda seq: True\n", + "else:\n", + " do_sequence = [int(x) for x in sequences].__contains__ \n", + " \n", + "# List of detector sources.\n", + "det_inp_sources = [input_source.format(karabo_id=karabo_id, module_index=int(da[-2:])) for da in karabo_da]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Select data to process" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "data_to_process = []\n", + "\n", + "for inp_path in run_folder.glob('RAW-*.h5'):\n", + " match = file_re.match(inp_path.stem)\n", + " \n", + " if match[2] not in karabo_da or not do_sequence(int(match[3])):\n", + " continue\n", + " \n", + " outp_path = out_folder / 'CORR-R{run:04d}-{aggregator}-S{seq:05d}.h5'.format(\n", + " run=int(match[1]), aggregator=match[2], seq=int(match[3]))\n", + "\n", + " data_to_process.append((match[2], inp_path, outp_path))\n", + "\n", + "print('Files to process:')\n", + "for data_descr in sorted(data_to_process, key=lambda x: f'{x[0]}{x[1]}'):\n", + " print(f'{data_descr[0]}\\t{data_descr[1]}')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Obtain and prepare calibration constants" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Connect to CalCat.\n", + 
"calcat_config = restful_config['calcat']\n", + "client = CalibrationClient(\n", + " client_id=calcat_config['user-id'],\n", + " client_secret=calcat_config['user-secret'],\n", + " user_email=calcat_config['user-email'],\n", + " base_api_url=calcat_config['base-api-url'],\n", + " token_url=calcat_config['token-url'],\n", + " refresh_url=calcat_config['refresh-url'],\n", + " auth_url=calcat_config['auth-url'],\n", + " scope='')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dark_calibrations = {\n", + " 1: 'Offset', # np.float32\n", + " 14: 'BadPixelsDark' # should be np.uint32, but is np.float64\n", + "}\n", + "\n", + "dark_condition = [\n", + " dict(parameter_id=1, value=bias_voltage), # Sensor bias voltage\n", + " dict(parameter_id=7, value=mem_cells), # Memory cells\n", + " dict(parameter_id=15, value=capacitor), # Feedback capacitor\n", + " dict(parameter_id=13, value=256), # Pixels X\n", + " dict(parameter_id=14, value=256), # Pixels Y\n", + "]\n", + "\n", + "illuminated_calibrations = {\n", + " 20: 'BadPixelsFF', # np.uint32\n", + " 42: 'GainAmpMap', # np.float32\n", + " 43: 'FFMap', # np.float32\n", + " 44: 'RelativeGain' # np.float32\n", + "}\n", + "\n", + "illuminated_condition = dark_condition.copy()\n", + "illuminated_condition += [\n", + " dict(parameter_id=3, value=photon_energy), # Source energy\n", + " dict(parameter_id=25, value=category) # category\n", + "]\n", + "\n", + "const_data = {}\n", + "const_load_mp = psh.ProcessContext(num_workers=24)\n", + "\n", + "print('Querying calibration database', end='', flush=True)\n", + "start = perf_counter()\n", + "for calibrations, condition in [\n", + " (dark_calibrations, dark_condition),\n", + " (illuminated_calibrations, illuminated_condition)\n", + "]:\n", + " resp = CalibrationConstantVersion.get_closest_by_time_by_detector_conditions(\n", + " client, karabo_id, list(calibrations.keys()),\n", + " {'parameters_conditions_attributes': condition},\n", + " karabo_da='', event_at=creation_time.isoformat(), snapshot_at=None)\n", + "\n", + " if not resp['success']:\n", + " raise RuntimeError(resp)\n", + "\n", + " for ccv in resp['data']:\n", + " cc = ccv['calibration_constant']\n", + " da = ccv['physical_detector_unit']['karabo_da']\n", + " calibration_name = calibrations[cc['calibration_id']]\n", + " \n", + " dtype = np.uint32 if calibration_name.startswith('BadPixels') else np.float32\n", + " \n", + " const_data[(da, calibration_name)] = dict(\n", + " path=Path(ccv['path_to_file']) / ccv['file_name'],\n", + " dataset=ccv['data_set_name'],\n", + " data=const_load_mp.alloc(shape=(256, 256, mem_cells, 3), dtype=dtype)\n", + " )\n", + " print('.', end='', flush=True)\n", + " \n", + "total_time = perf_counter() - start\n", + "print(f'{total_time:.1f}s')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def load_constant_dataset(wid, index, const_descr):\n", + " ccv_entry = const_data[const_descr]\n", + " \n", + " with h5py.File(cal_db_root / ccv_entry['path'], 'r') as fp:\n", + " fp[ccv_entry['dataset'] + '/data'].read_direct(ccv_entry['data'])\n", + " \n", + " print('.', end='', flush=True)\n", + "\n", + "print('Loading calibration data', end='', flush=True)\n", + "start = perf_counter()\n", + "const_load_mp.map(load_constant_dataset, list(const_data.keys()))\n", + "total_time = perf_counter() - start\n", + "\n", + "print(f'{total_time:.1f}s')" + ] + }, + { + "cell_type": "code", + "execution_count": 
null, + "metadata": {}, + "outputs": [], + "source": [ + "# These are intended in order cell, X, Y, gain\n", + "ccv_offsets = {}\n", + "ccv_gains = {}\n", + "ccv_masks = {}\n", + "\n", + "ccv_shape = (mem_cells, 256, 256, 3)\n", + "\n", + "constant_order = {\n", + " 'Offset': (2, 1, 0, 3),\n", + " 'BadPixelsDark': (2, 1, 0, 3),\n", + " 'RelativeGain': (2, 1, 0, 3),\n", + " 'FFMap': (2, 0, 1, 3),\n", + " 'BadPixelsFF': (2, 0, 1, 3),\n", + " 'GainAmpMap': (2, 0, 1, 3),\n", + "}\n", + "\n", + "def prepare_constants(wid, index, aggregator):\n", + " consts = {calibration_name: entry['data']\n", + " for (aggregator_, calibration_name), entry\n", + " in const_data.items()\n", + " if aggregator == aggregator_}\n", + " \n", + " def _prepare_data(calibration_name, dtype):\n", + " return consts[calibration_name] \\\n", + " .transpose(constant_order[calibration_name]) \\\n", + " .astype(dtype, copy=True) # Make sure array is contiguous.\n", + " \n", + " if offset_corr and 'Offset' in consts:\n", + " ccv_offsets[aggregator] = _prepare_data('Offset', np.float32)\n", + " else:\n", + " ccv_offsets[aggregator] = np.zeros(ccv_shape, dtype=np.float32)\n", + " \n", + " ccv_gains[aggregator] = np.ones(ccv_shape, dtype=np.float32)\n", + " \n", + " if 'BadPixelsDark' in consts:\n", + " ccv_masks[aggregator] = _prepare_data('BadPixelsDark', np.uint32)\n", + " else:\n", + " ccv_masks[aggregator] = np.zeros(ccv_shape, dtype=np.uint32)\n", + " \n", + " if rel_gain and 'RelativeGain' in consts:\n", + " ccv_gains[aggregator] *= _prepare_data('RelativeGain', np.float32)\n", + " \n", + " if ff_map and 'FFMap' in consts:\n", + " ccv_gains[aggregator] *= _prepare_data('FFMap', np.float32)\n", + " \n", + " if 'BadPixelsFF' in consts:\n", + " np.bitwise_or(ccv_masks[aggregator], _prepare_data('BadPixelsFF', np.uint32),\n", + " out=ccv_masks[aggregator])\n", + " \n", + " if gain_amp_map and 'GainAmpMap' in consts:\n", + " ccv_gains[aggregator] *= _prepare_data('GainAmpMap', np.float32)\n", + " \n", + " print('.', end='', flush=True)\n", + " \n", + "\n", + "print('Preparing constants', end='', flush=True)\n", + "start = perf_counter()\n", + "psh.ThreadContext(num_workers=len(karabo_da)).map(prepare_constants, karabo_da)\n", + "total_time = perf_counter() - start\n", + "print(f'{total_time:.1f}s')\n", + "\n", + "const_data.clear() # Clear raw constants data now to save memory.\n", + "gc.collect();" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def correct_file(wid, index, work):\n", + " aggregator, inp_path, outp_path = work\n", + " module_index = int(aggregator[-2:])\n", + " \n", + " start = perf_counter()\n", + " dc = xd.H5File(inp_path).select('*', 'image.*', require_all=True)\n", + " inp_source = dc[input_source.format(karabo_id=karabo_id, module_index=module_index)]\n", + " open_time = perf_counter() - start\n", + " \n", + " # Load raw data for this file.\n", + " start = perf_counter()\n", + " in_raw = inp_source['image.data'].ndarray().squeeze()\n", + " in_cell = inp_source['image.cellId'].ndarray().squeeze()\n", + " in_pulse = inp_source['image.pulseId'].ndarray().squeeze()\n", + " read_time = perf_counter() - start\n", + " \n", + " # Allocate output arrays.\n", + " out_data = np.zeros((in_raw.shape[0], 256, 256), dtype=np.float32)\n", + " out_gain = np.zeros((in_raw.shape[0], 256, 256), dtype=np.uint8)\n", + " out_mask = np.zeros((in_raw.shape[0], 256, 256), dtype=np.uint32)\n", + " \n", + " start = perf_counter()\n", + " correct_lpd_frames(in_raw, 
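The two transpose tuples above exist because the stored constants do not all share the same pixel-axis order (RelativeGain follows the dark constants, while FFMap, BadPixelsFF and GainAmpMap have the pixel axes swapped); both orders are brought to the (cell, slow_scan, fast_scan, gain) layout indexed by correct_lpd_frames. A shape-only sketch:

import numpy as np

raw_const = np.zeros((256, 256, 512, 3), dtype=np.float32)     # as read from the CCV file
prepared = raw_const.transpose((2, 1, 0, 3)).astype(np.float32, copy=True)
assert prepared.shape == (512, 256, 256, 3)                    # (cell, ss, fs, gain)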
in_cell,\n", + " out_data, out_gain, out_mask,\n", + " ccv_offsets[aggregator], ccv_gains[aggregator], ccv_masks[aggregator],\n", + " num_threads=num_threads_per_worker)\n", + " correct_time = perf_counter() - start\n", + " \n", + " image_counts = inp_source['image.data'].data_counts(labelled=False)\n", + " \n", + " start = perf_counter()\n", + " if (not outp_path.exists() or overwrite) and image_counts.sum() > 0:\n", + " fa = dc.files[0]\n", + " sel_trains = np.isin(fa.train_ids, dc.train_ids)\n", + " \n", + " outp_source_name = output_source.format(karabo_id=karabo_id, module_index=module_index)\n", + "\n", + " with DataFile(outp_path, 'w') as outp_file: \n", + " outp_file.create_index(\n", + " train_ids=dc.train_ids,\n", + " timestamps=fa.file['INDEX/timestamp'][sel_trains],\n", + " flags=fa.validity_flag[sel_trains])\n", + " outp_file.create_metadata(like=dc, instrument_channels=(f'{outp_source_name}/image',))\n", + " \n", + " outp_source = outp_file.create_instrument_source(outp_source_name)\n", + " \n", + " outp_source.create_index(image=image_counts)\n", + " outp_source.create_key('image.cellId', data=in_cell,\n", + " chunks=(chunks_ids,))\n", + " outp_source.create_key('image.pulseId', data=in_pulse,\n", + " chunks=(chunks_ids,))\n", + " outp_source.create_key('image.data', data=out_data,\n", + " chunks=(chunks_data, 256, 256))\n", + " write_compressed_frames(\n", + " out_gain, outp_file, f'INSTRUMENT/{outp_source_name}/image/gain', comp_threads=8)\n", + " write_compressed_frames(\n", + " out_mask, outp_file, f'INSTRUMENT/{outp_source_name}/image/mask', comp_threads=8)\n", + " write_time = perf_counter() - start\n", + " \n", + " total_time = open_time + read_time + correct_time + write_time\n", + " frame_rate = in_raw.shape[0] / total_time\n", + " \n", + " print('{}\\t{}\\t{:.3f}\\t{:.3f}\\t{:.3f}\\t{:.3f}\\t{:.3f}\\t{}\\t{:.1f}'.format(\n", + " wid, aggregator, open_time, read_time, correct_time, write_time, total_time,\n", + " in_raw.shape[0], frame_rate))\n", + " \n", + " in_raw = None\n", + " in_cell = None\n", + " in_pulse = None\n", + " out_data = None\n", + " out_gain = None\n", + " out_mask = None\n", + " gc.collect()\n", + "\n", + "print('worker\\tDA\\topen\\tread\\tcorrect\\twrite\\ttotal\\tframes\\trate')\n", + "start = perf_counter()\n", + "psh.ProcessContext(num_workers=num_workers).map(correct_file, data_to_process)\n", + "total_time = perf_counter() - start\n", + "print(f'Total time: {total_time:.1f}s')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Data preview for first train" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "geom = xg.LPD_1MGeometry.from_quad_positions(\n", + " [(11.4, 299), (-11.5, 8), (254.5, -16), (278.5, 275)])\n", + "\n", + "output_paths = [outp_path for _, _, outp_path in data_to_process if outp_path.exists()]\n", + "dc = xd.DataCollection.from_paths(output_paths).select_trains(np.s_[0])\n", + "\n", + "det = LPD1M(dc, detector_name=karabo_id)\n", + "data = det.get_array('image.data')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Intensity histogram across all cells" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "left_edge_ratio = 0.01\n", + "right_edge_ratio = 0.99\n", + "\n", + "fig, ax = plt.subplots(num=1, clear=True, figsize=(15, 6))\n", + "values, bins, _ = ax.hist(np.ravel(data.data), bins=2000, range=(-1500, 2000))\n", + "\n", + "def 
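Each file written by correct_file above can be inspected on its own with EXtra-data; a minimal sketch, with the file name and source name made up to follow the CORR naming and source formatting used in this notebook:

import extra_data as xd

dc = xd.H5File('CORR-R0010-LPD03-S00002.h5')
src = dc['FXE_DET_LPD1M-1/DET/3CH0:xtdf']
data = src['image.data'].ndarray()    # float32, (frames, 256, 256), offset/gain corrected
gain = src['image.gain'].ndarray()    # uint8: 0 high, 1 medium, 2 low gain
mask = src['image.mask'].ndarray()    # uint32 bad-pixel bit field
print(data.shape, gain.max(), hex(int(mask.max())))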
find_nearest_index(array, value):\n", + " return (np.abs(array - value)).argmin()\n", + "\n", + "cum_values = np.cumsum(values)\n", + "vmin = bins[find_nearest_index(cum_values, cum_values[-1]*left_edge_ratio)]\n", + "vmax = bins[find_nearest_index(cum_values, cum_values[-1]*right_edge_ratio)]\n", + "\n", + "max_value = values.max()\n", + "ax.vlines([vmin, vmax], 0, max_value, color='red', linewidth=5, alpha=0.2)\n", + "ax.text(vmin, max_value, f'{left_edge_ratio*100:.0f}%',\n", + " color='red', ha='center', va='bottom', size='large')\n", + "ax.text(vmax, max_value, f'{right_edge_ratio*100:.0f}%',\n", + " color='red', ha='center', va='bottom', size='large')\n", + "ax.text(vmax+(vmax-vmin)*0.01, max_value/2, 'Colormap interval',\n", + " color='red', rotation=90, ha='left', va='center', size='x-large')\n", + "\n", + "ax.set_xlim(vmin-(vmax-vmin)*0.1, vmax+(vmax-vmin)*0.1)\n", + "ax.set_ylim(0, max_value*1.1)\n", + "pass" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### First memory cell" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "fig, ax = plt.subplots(num=2, figsize=(15, 15), clear=True, nrows=1, ncols=1)\n", + "geom.plot_data_fast(data[:, 0, 0], ax=ax, vmin=vmin, vmax=vmax)\n", + "pass" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Train average" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "ExecuteTime": { + "end_time": "2018-11-13T18:24:57.547563Z", + "start_time": "2018-11-13T18:24:56.995005Z" + }, + "scrolled": false + }, + "outputs": [], + "source": [ + "fig, ax = plt.subplots(num=3, figsize=(15, 15), clear=True, nrows=1, ncols=1)\n", + "geom.plot_data_fast(data[:, 0].mean(axis=1), ax=ax, vmin=vmin, vmax=vmax)\n", + "pass" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Lowest gain stage per pixel" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "highest_gain_stage = det.get_array('image.gain', pulses=np.s_[:]).max(axis=(1, 2))\n", + "\n", + "fig, ax = plt.subplots(num=4, figsize=(15, 15), clear=True, nrows=1, ncols=1)\n", + "p = geom.plot_data_fast(highest_gain_stage, ax=ax, vmin=0, vmax=2);\n", + "\n", + "cb = ax.images[0].colorbar\n", + "cb.set_ticks([0, 1, 2])\n", + "cb.set_ticklabels(['High gain', 'Medium gain', 'Low gain'])" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "pycal", + "language": "python", + "name": "pycal" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.11" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/setup.py b/setup.py index 42847f5dc0d7cc35b539ac5e8b2bf9fb54103b1f..9288cbb06d2d110530f36874dba6efe5da47eb0e 100644 --- a/setup.py +++ b/setup.py @@ -18,6 +18,13 @@ ext_modules = [ extra_compile_args=["-fopenmp", "-march=native"], extra_link_args=["-fopenmp"], ), + Extension( + 'cal_tools.lpdalgs', + ['src/cal_tools/lpdalgs.pyx'], + extra_compile_args=['-O3', '-fopenmp', '-march=native', + '-ftree-vectorize', '-frename-registers'], + extra_link_args=['-fopenmp'], + ) ] @@ -89,6 +96,7 @@ install_requires = [ "sphinx==1.8.5", "tabulate==0.8.6", "traitlets==4.3.3", + "calibration_client==10.0.0", ] if "readthedocs.org" not in sys.executable: diff --git 
a/src/cal_tools/files.py b/src/cal_tools/files.py new file mode 100644 index 0000000000000000000000000000000000000000..57beac95aab6571ce89ad407df818ffabb6b440f --- /dev/null +++ b/src/cal_tools/files.py @@ -0,0 +1,544 @@ + +from datetime import datetime +from itertools import chain +from numbers import Integral +from pathlib import Path +import re + +import numpy as np +import h5py + + +def get_pulse_offsets(pulses_per_train): + """Compute pulse offsets from pulse counts. + + Given an array of number of pulses per train (INDEX/<source>/count), + computes the offsets (INDEX/<source>/first) for the first pulse of a + train inthe data array. + + Args: + pulses_per_train (array_like): Pulse count per train. + + Returns: + (array_like) Offet of first pulse for each train. + """ + + pulse_offsets = np.zeros_like(pulses_per_train) + np.cumsum(pulses_per_train[:-1], out=pulse_offsets[1:]) + + return pulse_offsets + + +def sequence_trains(train_ids, trains_per_sequence=256): + """Iterate over sequences for a list of trains. + + For pulse-resolved data, sequence_pulses may be used instead. + + Args: + train_ids (array_like): Train IDs to sequence. + trains_per_sequence (int, optional): Number of trains + per sequence, 256 by default. + + Yields: + (int, slice) Current sequence ID, train mask. + """ + + num_trains = len(train_ids) + + for seq_id, start in enumerate(range(0, num_trains, trains_per_sequence)): + train_mask = slice( + *np.s_[start:start+trains_per_sequence].indices(num_trains)) + yield seq_id, train_mask + + +def sequence_pulses(train_ids, pulses_per_train=1, pulse_offsets=None, + trains_per_sequence=256): + """Split trains into sequences. + + Args: + train_ids (array_like): Train IDs to sequence. + pulses_per_train (int or array_like, optional): Pulse count per + train. If scalar, it is assumed to be constant for all + trains. If omitted, it is 1 by default. + pulse_offsets (array_like, optional): Offsets for the first + pulse in each train, computed from pulses_per_train if + omitted. + trains_per_sequence (int, optional): Number of trains + per sequence, 256 by default. + + Yields: + (int, array_like, array_like) + Current sequence ID, train mask, pulse mask. + """ + + if isinstance(pulses_per_train, Integral): + pulses_per_train = np.full_like(train_ids, pulses_per_train, + dtype=np.uint64) + + if pulse_offsets is None: + pulse_offsets = get_pulse_offsets(pulses_per_train) + + for seq_id, train_mask in sequence_trains(train_ids, trains_per_sequence): + start = train_mask.start + stop = train_mask.stop - 1 + pulse_mask = np.s_[ + pulse_offsets[start]:pulse_offsets[stop]+pulses_per_train[stop]] + + yield seq_id, train_mask, pulse_mask + + +def escape_key(key): + """Escapes a key name from Karabo to HDF notation.""" + return key.replace('.', '/') + + +class DataFile(h5py.File): + """European XFEL HDF5 data file. + + This class extends the h5py.File with methods specific to writing + data in the European XFEL file format. The internal state does not + depend on using any of these methods, and the underlying file may be + manipulated by any of the regular h5py methods, too. + + Please refer to + https://extra-data.readthedocs.io/en/latest/data_format.html for + details of the file format. 
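A quick check of the indexing helpers above, with made-up pulse counts:

import numpy as np

counts = np.array([3, 0, 2, 4], dtype=np.uint64)          # pulses per train
assert list(get_pulse_offsets(counts)) == [0, 3, 3, 5]    # INDEX/<source>/first

for seq_id, train_mask, pulse_mask in sequence_pulses(
        np.arange(4), pulses_per_train=counts, trains_per_sequence=2):
    print(seq_id, train_mask, pulse_mask)
# Sequence 0 covers trains 0-1 and pulses 0..2; sequence 1 covers trains 2-3
# and pulses 3..8.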
+ """ + + filename_format = '{prefix}-R{run:04d}-{aggregator}-S{sequence:05d}.h5' + aggregator_pattern = re.compile(r'^[A-Z]{2,}\d{2}$') + instrument_source_pattern = re.compile(r'^[\w\/-]+:\w+$') + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.__control_sources = set() + self.__instrument_sources = set() + self.__run = 0 + self.__sequence = 0 + + @classmethod + def from_details(cls, folder, aggregator, run, sequence, prefix='CORR', + mode='w', *args, **kwargs): + """Open or create a file based on European XFEL details. + + This methis is a wrapper to construct the filename based its + components. + + Args: + folder (Path, str): Parent location for this file. + aggregator (str): Name of the data aggregator, must satisfy + DataFile.aggregator_pattern. + run (int): Run number. + sequence (int): Sequence number. + prefix (str, optional): First filename component, 'CORR' by + default. + args, kwargs: Any additional arguments are passed on to + h5py.File + + + Returns: + (DataFile) Opened file object. + """ + + if not isinstance(folder, Path): + folder = Path(folder) + + if not cls.aggregator_pattern.match(aggregator): + raise ValueError(f'invalid aggregator format, must satisfy ' + f'{cls.aggregator_pattern.pattern}') + + filename = cls.filename_format.format( + prefix=prefix, aggregator=aggregator, run=run, sequence=sequence) + + self = cls((folder / filename).resolve(), mode, *args, **kwargs) + self.__run = run + self.__sequence = sequence + + return self + + def create_index(self, train_ids, timestamps=None, flags=None, + origins=None): + """Create global INDEX datasets. + + These datasets are agnostic of any source and describe the + trains contained in this file. + + Args: + train_ids (array_like): Train IDs contained in this file. + timestamps (array_like, optional): Timestamp of each train, + 0 if omitted. + flags (array_like, optional): Whether the time server is the + initial origin of each train, 1 if omitted. + origins (array_like, optional): Which source is the initial + origin of each train, -1 (time server) if omitted. + + Returns: + None + """ + + self.create_dataset('INDEX/trainId', data=train_ids, dtype=np.uint64) + + if timestamps is None: + timestamps = np.zeros_like(train_ids, dtype=np.uint64) + elif len(timestamps) != len(train_ids): + raise ValueError('timestamps and train_ids must be same length') + + self.create_dataset('INDEX/timestamp', data=timestamps, + dtype=np.uint64) + + if flags is None: + flags = np.ones_like(train_ids, dtype=np.int32) + elif len(flags) != len(train_ids): + raise ValueError('flags and train_ids must be same length') + + self.create_dataset('INDEX/flag', data=flags, dtype=np.int32) + + if origins is None: + origins = np.full_like(train_ids, -1, dtype=np.int32) + elif len(origins) != len(train_ids): + raise ValueError('origins and train_ids must be same length') + + self.create_dataset('INDEX/origin', data=origins, dtype=np.int32) + + def create_control_source(self, source): + """Create group for a control source ("slow data"). + + Control sources created via this method are not required to be + passed to create_metadata() again. + + Args: + source (str): Karabo device ID. + + Returns: + (ControlSource) Created group in CONTROL. + """ + + self.__control_sources.add(source) + return ControlSource(self.create_group(f'CONTROL/{source}').id, source) + + def create_instrument_source(self, source): + """Create group for an instrument source ("fast data"). 
+ + Instrument sources created via this method are not required to be + passed to create_metadata() again. + + Args: + source (str): Karabp pipeline path, must satisfy + DataFile.instrument_source_pattern. + + Returns: + (InstrumentSource) Created group in INSTRUMENT. + """ + + if not self.instrument_source_pattern.match(source): + raise ValueError(f'invalid source format, must satisfy ' + f'{self.instrument_source_pattern.pattern}') + + self.__instrument_sources.add(source) + return InstrumentSource(self.create_group(f'INSTRUMENT/{source}').id, + source) + + def create_metadata(self, like=None, *, + creation_date=None, update_date=None, proposal=0, + run=None, sequence=None, daq_library='1.x', + karabo_framework='2.x', control_sources=(), + instrument_channels=()): + """Create METADATA datasets. + + Args: + like (DataCollection, optional): Take proposal, run, + daq_library, karabo_framework from an EXtra-data data + collection, overwriting any of these arguments passed. + creation_date (datetime, optional): Creation date and time, + now if omitted. + update_date (datetime, optional): Update date and time, + now if omitted. + proposal (int, optional): Proposal number, 0 if omitted and + no DataCollection passed. + run (int, optional): Run number, 0 if omitted, no + DataCollection is passed or object not created via + from_details. + sequence (int, optional): Sequence number, 0 if omitted and + object not created via from_details. + daq_library (str, optional): daqLibrary field, '1.x' if + omitted and no DataCollection passed. + karabo_framework (str, optional): karaboFramework field, + '2.x' if omitted and no DataCollection is passed. + control_sources (Iterable, optional): Control sources in + this file, sources created via create_control_source are + automatically included. + instrument_channels (Iterable, optional): Instrument + channels (source and first component of data hash) in + this file, channels created via create_instrument_source + are automatically included. + + Returns: + None + """ + + if like is not None: + metadata = like.run_metadata() + proposal = metadata['proposalNumber'] + run = metadata['runNumber'] + daq_library = metadata['daqLibrary'] + karabo_framework = metadata['karaboFramework'] + + else: + if run is None: + run = self.__run + + if sequence is None: + sequence = self.__sequence + + if creation_date is None: + creation_date = datetime.now() + + if update_date is None: + update_date = creation_date + + md_group = self.require_group('METADATA') + md_group.create_dataset( + 'creationDate', shape=(1,), + data=creation_date.strftime('%Y%m%dT%H%M%SZ').encode('ascii')) + md_group.create_dataset('daqLibrary', shape=(1,), + data=daq_library.encode('ascii')) + md_group.create_dataset('dataFormatVersion', shape=(1,), data=b'1.2') + + # Start with the known and specified control sources + sources = {name: 'CONTROL' + for name in chain(self.__control_sources, control_sources)} + + # Add in the specified instrument data channels. + sources.update({full_channel: 'INSTRUMENT' + for full_channel in instrument_channels}) + + # Add in those already in the file, if not already passed. 
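+        # (Channels are discovered by walking the INSTRUMENT/<source> groups, so
+        # anything written via create_instrument_source() before this call is still
+        # listed in METADATA/dataSources even if not passed in instrument_channels.)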
+ sources.update({f'{name}/{channel}': 'INSTRUMENT' + for name in self.__instrument_sources + for channel in self[f'INSTRUMENT/{name}']}) + + source_names = sorted(sources.keys()) + + data_sources_shape = (len(sources),) + md_group.create_dataset('dataSources/dataSourceId', + shape=data_sources_shape, + data=[f'{sources[name]}/{name}'.encode('ascii') + for name in source_names]) + md_group.create_dataset('dataSources/deviceId', + shape=data_sources_shape, + data=[name.encode('ascii') + for name in source_names]) + md_group.create_dataset('dataSources/root', shape=data_sources_shape, + data=[sources[name].encode('ascii') + for name in source_names]) + + md_group.create_dataset( + 'karaboFramework', shape=(1,), + data=karabo_framework.encode('ascii')) + md_group.create_dataset( + 'proposalNumber', shape=(1,), dtype=np.uint32, data=proposal) + md_group.create_dataset( + 'runNumber', shape=(1,), dtype=np.uint32, data=run) + md_group.create_dataset( + 'sequenceNumber', shape=(1,), dtype=np.uint32, data=sequence) + md_group.create_dataset( + 'updateDate', shape=(1,), + data=update_date.strftime('%Y%m%dT%H%M%SZ').encode('ascii')) + + +class ControlSource(h5py.Group): + """Group for a control source ("slow data"). + + This class extends h5py.Group with methods specific to writing data + of a control source in the European XFEL file format. The internal + state does not depend on using any of these methods, and the + underlying file may be manipulated by any of the regular h5py + methods, too. + """ + + ascii_dt = h5py.string_dtype('ascii') + + def __init__(self, group_id, source): + super().__init__(group_id) + + self.__source = source + self.__run_group = self.file.create_group(f'RUN/{source}') + + def get_run_group(self): + return self.__run_group + + def get_index_group(self): + return self.file.require_group(f'INDEX/{self.__source}') + + def create_key(self, key, values, timestamps=None, run_entry=None): + """Create datasets for a key varying each train. + + Args: + key (str): Source key, dots are automatically replaced by + slashes. + values (array_like): Source values for each train. + timestamps (array_like, optional): Timestamps for each + source value, 0 if omitted. + run_entry (tuple of array_like, optional): Value and + timestamp for the corresponding value in the RUN + section. The first entry for the train values is used if + omitted. + + Returns: + None + """ + + key = escape_key(key) + + if timestamps is None: + timestamps = np.zeros_like(values, dtype=np.uint64) + elif len(values) != len(timestamps): + raise ValueError('values and timestamp must be the same length') + + self.create_dataset(f'{key}/value', data=values) + self.create_dataset(f'{key}/timestamp', data=timestamps) + + if run_entry is None: + run_entry = (values[0], timestamps[0]) + + self.create_run_key(key, *run_entry) + + def create_run_key(self, key, value, timestamp=None): + """Create datasets for a key constant over a run. + + Args: + key (str): Source key, dots are automatically replaced by + slashes. + value (Any): Key value. + timestamp (int, optional): Timestamp of the value, + 0 if omitted. + + Returns: + None + """ + + # TODO: Some types/shapes are still not fully correct here. + + key = escape_key(key) + + if timestamp is None: + timestamp = 0 + + if isinstance(value, list): + shape = (1, len(value)) + + try: + dtype = type(value[0]) + except IndexError: + # Assume empty lists are string-typed. 
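+                # (With no element to inspect, ascii strings are the only safe guess;
+                # callers needing a different dtype can pass a typed, empty ndarray.)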
+ dtype = self.ascii_dt + elif isinstance(value, np.ndarray): + shape = value.shape + dtype = value.dtype + else: + shape = (1,) + dtype = type(value) + + if dtype is str: + dtype = self.ascii_dt + + self.__run_group.create_dataset( + f'{key}/value', data=value, shape=shape, dtype=dtype) + self.__run_group.create_dataset( + f'{key}/timestamp', data=timestamp, shape=shape, dtype=np.uint64) + + def create_index(self, num_trains): + """Create source-specific INDEX datasets. + + + Depending on whether this source has train-varying data or not, + different count/first datasets are written. + + Args: + num_trains (int): Total number of trains in this file. + + Returns: + None + """ + + if len(self) > 0: + count_func = np.ones + first_func = np.arange + else: + count_func = np.zeros + first_func = np.zeros + + index_group = self.get_index_group() + index_group.create_dataset( + 'count', data=count_func(num_trains, dtype=np.uint64)) + index_group.create_dataset( + 'first', data=first_func(num_trains, dtype=np.uint64)) + + +class InstrumentSource(h5py.Group): + """Group for an instrument source ("fast data"). + + This class extends h5py.Group with methods specific to writing data + of a control source in the European XFEL file format. The internal + state does not depend on using any of these methods, and the + underlying file may be manipulated by any of the regular h5py + methods, too. + """ + + key_pattern = re.compile(r'^\w+\/[\w\/]+$') + + def __init__(self, group_id, source): + super().__init__(group_id) + + self.__source = source + + def get_index_group(self, channel): + return self.file.require_group(f'INDEX/{self.__source}/{channel}') + + def create_key(self, key, data=None, **kwargs): + """Create dataset for a key. + + Args: + key (str): Source key, dots are automatically replaced by + slashes. + data (array_like, optional): Key data to initialize the + dataset to. + kwargs: Any additional keyword arguments are passed to + create_dataset. + + Returns: + (h5py.Dataset) Created dataset + """ + + key = escape_key(key) + + if not self.key_pattern.match(key): + raise ValueError(f'invalid key format, must satisfy ' + f'{self.key_pattern.pattern}') + + return self.create_dataset(key, data=data, **kwargs) + + def create_index(self, *args, **channels): + """Create source-specific INDEX datasets. + + Instrument data is indexed by channel, which is the first + component in its key. If channels have already been created, the + index may be applied to all channels by passing them as a + positional argument. + """ + + if not channels: + try: + count = int(args[0]) + except IndexError: + raise ValueError('positional arguments required if no ' + 'explicit channels are passed') from None + # Allow ValueError to propagate directly. 
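+            # (A single positional count is applied to every channel already present
+            # in this source, mirroring how create_index(image=...) targets one channel.)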
+ + channels = {channel: count for channel in self} + + for channel, count in channels.items(): + index_group = self.get_index_group(channel) + index_group.create_dataset('count', data=count, dtype=np.uint64) + index_group.create_dataset( + 'first', data=get_pulse_offsets(count), dtype=np.uint64) diff --git a/src/cal_tools/lpdalgs.pyx b/src/cal_tools/lpdalgs.pyx new file mode 100644 index 0000000000000000000000000000000000000000..66b8b097c71a4a7cb6b1681e4678e0f04c2f43e7 --- /dev/null +++ b/src/cal_tools/lpdalgs.pyx @@ -0,0 +1,69 @@ +from cython cimport boundscheck, wraparound, cdivision +from cython.view cimport contiguous +from cython.parallel cimport prange + +from cal_tools.enums import BadPixels + +ctypedef unsigned short cell_t +ctypedef unsigned short raw_t +ctypedef float data_t +ctypedef unsigned char gain_t +ctypedef unsigned int mask_t + +cdef mask_t WRONG_GAIN_VALUE = BadPixels.WRONG_GAIN_VALUE, \ + VALUE_OUT_OF_RANGE = BadPixels.VALUE_OUT_OF_RANGE, \ + VALUE_IS_NAN = BadPixels.VALUE_IS_NAN + + +@boundscheck(False) +@wraparound(False) +@cdivision(True) +def correct_lpd_frames( + # (frame, x, y) + raw_t[:, :, ::contiguous] in_raw, + cell_t[::contiguous] in_cell, + + # (frame, x, y) + data_t[:, :, ::contiguous] out_data, + gain_t[:, :, ::contiguous] out_gain, + mask_t[:, :, ::contiguous] out_mask, + + # (cell, x, y, gain) + float[:, :, :, ::contiguous] ccv_offset, + float[:, :, :, ::contiguous] ccv_gain, + mask_t[:, :, :, ::contiguous] ccv_mask, + + int num_threads=1, +): + cdef int frame, ss, fs + cdef cell_t cell + cdef data_t data + cdef gain_t gain + cdef mask_t mask + + for frame in prange(in_raw.shape[0], nogil=True, num_threads=num_threads): + cell = in_cell[frame] + + for ss in range(in_raw.shape[1]): + for fs in range(in_raw.shape[2]): + # Decode intensity and gain from raw data. + data = <data_t>(in_raw[frame, ss, fs] & 0xFFF) + gain = <gain_t>((in_raw[frame, ss, fs] & 0x3000) >> 12) + + if gain <= 2: + mask = ccv_mask[cell, ss, fs, gain] + else: + data = 0.0 + gain = 0 + mask = WRONG_GAIN_VALUE + + data = data - ccv_offset[cell, ss, fs, gain] + data = data * ccv_gain[cell, ss, fs, gain] + + if data > 1e7 or data < -1e7: + data = 0.0 + mask = mask | VALUE_OUT_OF_RANGE + + out_data[frame, ss, fs] = data + out_gain[frame, ss, fs] = gain + out_mask[frame, ss, fs] = mask diff --git a/src/xfel_calibrate/notebooks.py b/src/xfel_calibrate/notebooks.py index 299722f1da4069fc15933f42eeb1774fc58a413c..a26c4afdf8ea3da72a9de9300f0796b016ad517a 100644 --- a/src/xfel_calibrate/notebooks.py +++ b/src/xfel_calibrate/notebooks.py @@ -85,11 +85,11 @@ notebooks = { "cluster cores": 8}, }, "CORRECT": { - "notebook": "notebooks/LPD/LPD_Correct_and_Verify.ipynb", + "notebook": "notebooks/LPD/LPD_Correct_Fast.ipynb", "concurrency": {"parameter": "sequences", "default concurrency": [-1], "use function": "balance_sequences", - "cluster cores": 32}, + "cluster cores": 16}, }, "XGM_MINE": { "notebook": "notebooks/LPD/Mine_RadIntensity_vs_XGM_NBC.ipynb",
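For reference, the bit layout decoded inside correct_lpd_frames can be reproduced in plain numpy; this is only an illustration of the masks used by the kernel, not a replacement for it:

import numpy as np

raw = np.array([0x2ABC], dtype=np.uint16)   # one made-up raw LPD word
adc = raw & 0xFFF                           # lower 12 bits: intensity
gain = (raw & 0x3000) >> 12                 # bits 12-13: gain stage 0/1/2 (3 is invalid)
assert adc[0] == 0xABC and gain[0] == 2

# For valid gain stages the kernel then applies, per pixel and memory cell:
#   data = (adc - ccv_offset[cell, ss, fs, gain]) * ccv_gain[cell, ss, fs, gain]
# and flags WRONG_GAIN_VALUE for gain == 3 or VALUE_OUT_OF_RANGE for |data| > 1e7.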