diff --git a/cal_tools/cal_tools/agipdlib.py b/cal_tools/cal_tools/agipdlib.py index c239d5730cf5e8b3c90ee7cde3f89f51e3f825ee..8575e4d0860e2ef7a339d66a6b5941648db7f656 100644 --- a/cal_tools/cal_tools/agipdlib.py +++ b/cal_tools/cal_tools/agipdlib.py @@ -5,7 +5,16 @@ from typing import Any, Dict, Optional, Tuple import h5py import numpy as np import sharedmem -from iCalibrationDB import Conditions, Constants, Detectors +from cal_tools.agipdutils import (assemble_constant_dict, + baseline_correct_via_noise, + baseline_correct_via_stripe, + correct_baseline_via_hist, + correct_baseline_via_hist_asic, + make_noisy_adc_mask, match_asic_borders, + melt_snowy_pixels) +from cal_tools.enums import BadPixels, SnowResolution +from cal_tools.tools import get_constant_from_db_and_time +from iCalibrationDB import Conditions, Constants from cal_tools.agipdutils import * from cal_tools.cython import agipdalgs as calgs @@ -80,7 +89,8 @@ def get_acq_rate(fast_paths: Tuple[str, str, int], def get_gain_setting(fname: str, h5path_ctrl: str) -> int: - """ + """Retrieve Gain setting. + If the data is available from the middlelayer FPGA_COMP device, then it is retrieved from there. If not, the setting is calculated off `setupr` and `patternTypeIndex` @@ -160,8 +170,8 @@ class AgipdCorrections: image/data section :param h5_index_path: path in HDF5 file which is prefixed to the index section - :param corr_bools: A dict with all of the correction booleans requested or - available + :param corr_bools: A dict with all of the correction booleans requested + or available The following example shows a typical use case: .. code-block:: python @@ -203,7 +213,7 @@ class AgipdCorrections: self.rng_pulses = max_pulses # avoid list(range(*[0]])) self.pulses_lst = list(range(*max_pulses)) \ - if not (len(max_pulses) == 1 and max_pulses[0] == 0) else max_pulses #noqa + if not (len(max_pulses) == 1 and max_pulses[0] == 0) else max_pulses # noqa self.max_cells = max_cells # Correction parameters @@ -498,11 +508,11 @@ class AgipdCorrections: # force into high or medium gain if requested if self.corr_bools.get('force_mg_if_below'): gain[(gain == 2) & ( - (data - offsetb[1]) < self.mg_hard_threshold)] = 1 + (data - offsetb[1]) < self.mg_hard_threshold)] = 1 if self.corr_bools.get('force_hg_if_below'): gain[(gain > 0) & ( - (data - offsetb[0]) < self.hg_hard_threshold)] = 0 + (data - offsetb[0]) < self.hg_hard_threshold)] = 0 # choose constants according to gain setting off = calgs.gain_choose(gain, offsetb) @@ -512,7 +522,7 @@ class AgipdCorrections: data -= off del off - def baseline_correction(self, i_proc:int, first:int, last:int): + def baseline_correction(self, i_proc: int, first: int, last: int): """ Perform image-wise base-line shift correction for data in shared memory via histogram or stripe @@ -536,14 +546,12 @@ class AgipdCorrections: # output is saved in sharedmem to pass for correct_agipd() # as this function takes about 3 seconds. 
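# A pure-NumPy sketch of the per-pixel logic shown above: the hard-threshold
# demotion used by force_mg_if_below, and the per-pixel constant selection
# that calgs.gain_choose is assumed to perform (pick the constant matching
# each pixel's gain stage). Shapes, values and the threshold are illustrative,
# not the real AGIPD module geometry or calibration constants.
import numpy as np

rng = np.random.default_rng(0)
data = rng.normal(loc=6000.0, scale=50.0, size=(64, 64)).astype(np.float32)
gain = rng.integers(0, 3, size=(64, 64))               # gain stage per pixel: 0/1/2
offsetb = rng.normal(5000.0, 10.0, size=(3, 64, 64))   # one offset map per stage

mg_hard_threshold = 100.0
# force pixels flagged as low gain (2) back to medium gain (1) when the
# medium-gain offset-corrected signal falls below the hard threshold
gain[(gain == 2) & ((data - offsetb[1]) < mg_hard_threshold)] = 1

# choose the offset matching each pixel's gain stage and subtract it
off = np.choose(gain, offsetb)
data -= off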
self.shared_dict[i_proc]['msk'][first:last] = \ - calgs.gain_choose_int(gain, - self.mask[module_idx][:, cellid]) # noqa + calgs.gain_choose_int(gain, self.mask[module_idx][:, cellid]) if hasattr(self, "rel_gain"): # Get the correct rel_gain depending on cell-id self.shared_dict[i_proc]['rel_corr'][first:last] = \ - calgs.gain_choose(gain, - self.rel_gain[module_idx][:, cellid]) # noqa + calgs.gain_choose(gain, self.rel_gain[module_idx][:, cellid]) # do this image wise, as the shift is per image for i in range(data.shape[0]): @@ -647,9 +655,9 @@ class AgipdCorrections: # after calculating it while offset correcting. if self.corr_bools.get('melt_snow'): _ = melt_snowy_pixels(self.shared_dict[i_proc]['raw_data'][first:last], # noqa - data, gain, - self.shared_dict[i_proc]['t0_rgain'][first:last], # noqa - self.snow_resolution) + data, gain, + self.shared_dict[i_proc]['t0_rgain'][first:last], # noqa + self.snow_resolution) # Inner ASIC borders are matched to the same signal level if self.corr_bools.get("match_asics"): @@ -712,7 +720,7 @@ class AgipdCorrections: valid_indices = np.concatenate([np.arange(validf[i], validf[i]+validc[i]) for i in range(validf.size)], - axis=0) + axis=0) valid_indices = np.squeeze(valid_indices).astype(np.int32) elif index_v == 1: @@ -753,8 +761,8 @@ class AgipdCorrections: allpulses = data_dict['pulseId'][:n_img] # Initializing can_calibrate array - can_calibrate = self.choose_selected_pulses(allpulses, - can_calibrate=[True]*len(allpulses)) + can_calibrate = self.choose_selected_pulses( + allpulses, can_calibrate=[True]*len(allpulses)) # Only select data corresponding to selected pulses # and overwrite data in shared-memory leaving @@ -779,7 +787,7 @@ class AgipdCorrections: return n_img def validate_selected_pulses(self, allpulses: np.array - ) -> Tuple[int, int, int]: + ) -> Tuple[int, int, int]: """Validate the selected pulses given from the notebook Validate that the given range of pulses to correct @@ -816,7 +824,6 @@ class AgipdCorrections: def choose_selected_pulses(self, allpulses: np.array, can_calibrate: np.array) -> np.array: - """ Choose given selected pulse from pulseId array of raw data. The selected pulses range is validated then @@ -831,7 +838,7 @@ class AgipdCorrections: """ (first_pulse, last_pulse, - pulse_step) = self.validate_selected_pulses(allpulses) + pulse_step) = self.validate_selected_pulses(allpulses) # collect the pulses to be calibrated cal_pulses = allpulses[first_pulse: last_pulse: pulse_step] @@ -853,7 +860,8 @@ class AgipdCorrections: return can_calibrate def gen_valid_range(self, first_index: int, last_index: int, - max_cells: int, allcells: np.array, allpulses: np.array, + max_cells: int, allcells: np.array, + allpulses: np.array, valid_indices: Optional[np.array] = None, apply_sel_pulses: Optional[bool] = True ) -> np.array: @@ -890,8 +898,8 @@ class AgipdCorrections: return if apply_sel_pulses: - can_calibrate = self.choose_selected_pulses(allpulses, - can_calibrate=can_calibrate) + can_calibrate = self.choose_selected_pulses( + allpulses, can_calibrate=can_calibrate) if valid_indices is None: firange = np.arange(first_index, last_index) else: @@ -1075,7 +1083,7 @@ class AgipdCorrections: self.offset[module_idx][...] = cons_data["Offset"].transpose()[...] self.noise[module_idx][...] = cons_data["Noise"].transpose()[...] - self.thresholds[module_idx][...] = cons_data["ThresholdsDark"].transpose()[:3,...] # noqa + self.thresholds[module_idx][...] = cons_data["ThresholdsDark"].transpose()[:3, ...] 
# noqa if self.corr_bools.get("low_medium_gap"): t0 = self.thresholds[module_idx][0] @@ -1090,7 +1098,7 @@ class AgipdCorrections: :bpixels.shape[2], # noqa None] - if when["SlopesFF"]: # Checking if constant was retrieved + if when["SlopesFF"]: # Checking if constant was retrieved slopesFF = cons_data["SlopesFF"] # This could be used for backward compatibility @@ -1100,18 +1108,20 @@ class AgipdCorrections: # This is for backward compatability for old FF constants # (128, 512, mem_cells) if slopesFF.shape[-1] == 2: - xray_cor = np.squeeze(slopesFF[...,0]) + xray_cor = np.squeeze(slopesFF[..., 0]) xray_cor_med = np.nanmedian(xray_cor) - xray_cor[np.isnan(xray_cor)]= xray_cor_med - xray_cor[(xray_cor<0.8) | (xray_cor>1.2)] = xray_cor_med + xray_cor[np.isnan(xray_cor)] = xray_cor_med + xray_cor[(xray_cor < 0.8) | ( + xray_cor > 1.2)] = xray_cor_med xray_cor = np.dstack([xray_cor]*self.max_cells) else: # Memory cell resolved xray_cor correction xray_cor = slopesFF # (128, 512, mem_cells) if xray_cor.shape[-1] < self.max_cells: - # In case of having new constant with less memory cells, - # due to lack of enough FF data or during development. - # xray_cor should be expanded by last memory cell. + # When working with new constant with fewer memory + # cells, eg. lacking enough FF data or during + # development, xray_cor must be expand its last memory + # cell to maintain a consistent shape. xray_cor = np.dstack(xray_cor, np.dstack([xray_cor[..., -1]] * (self.max_cells - xray_cor.shape[-1]))) # noqa @@ -1151,11 +1161,11 @@ class AgipdCorrections: pc_med_l = slopesPC[..., :self.max_cells, 4] # calculate median for slopes - pc_high_med = np.nanmedian(pc_high_m, axis=(0,1)) - pc_med_med = np.nanmedian(pc_med_m, axis=(0,1)) + pc_high_med = np.nanmedian(pc_high_m, axis=(0, 1)) + pc_med_med = np.nanmedian(pc_med_m, axis=(0, 1)) # calculate median for intercepts: - pc_high_l_med = np.nanmedian(pc_high_l, axis=(0,1)) - pc_med_l_med = np.nanmedian(pc_med_l, axis=(0,1)) + pc_high_l_med = np.nanmedian(pc_high_l, axis=(0, 1)) + pc_med_l_med = np.nanmedian(pc_med_l, axis=(0, 1)) # sanitize PC data # (it should be done already on the level of constants) diff --git a/cal_tools/cal_tools/agipdutils.py b/cal_tools/cal_tools/agipdutils.py index 7ae00abbe9ce44caa1e862e68369e7d68358c780..6ed7130528650c553b8643a2d4b5f08419f7d8c6 100644 --- a/cal_tools/cal_tools/agipdutils.py +++ b/cal_tools/cal_tools/agipdutils.py @@ -1,7 +1,9 @@ import copy +from typing import Tuple import numpy as np -from scipy.signal import cwt, find_peaks_cwt, ricker +from cal_tools.enums import BadPixels, SnowResolution +from scipy.signal import cwt, ricker from sklearn.mixture import GaussianMixture from sklearn.preprocessing import StandardScaler @@ -250,8 +252,10 @@ def correct_baseline_via_hist(d, pcm, g): return d, 0 it += 1 - def min_hist_distance(pc, bins=100, ran=(-10000, 10000), dec=20, - minbin=10): + def min_hist_distance(pc: int, + bins: int = 100, + ran: Tuple[int, int] = (-10000, 10000), + minbin: int = 10) -> float: hh, e = np.histogram(dd[g == 0] - pc, bins=bins, range=ran) hm, e = np.histogram((dd[g == 1] - pc) * pcm[g == 1], bins=bins, range=ran) diff --git a/cal_tools/cal_tools/tools.py b/cal_tools/cal_tools/tools.py index f112c3cf089b62279ee18692ca719e37fd37ed7a..d4015a558d56e6ea3a2daa5d24ba9fb104378b70 100644 --- a/cal_tools/cal_tools/tools.py +++ b/cal_tools/cal_tools/tools.py @@ -275,7 +275,9 @@ def get_dir_creation_date(directory: Union[str, Path], run: int, rfile = sorted(rfiles, key=path.getmtime)[0] with 
h5py.File(rfile, 'r') as fin: cdate = fin['METADATA/creationDate'][0].decode() - cdate = datetime.datetime.strptime(cdate, "%Y%m%dT%H%M%SZ") + cdate = datetime.datetime.strptime( + cdate, + "%Y%m%dT%H%M%SZ").replace(tzinfo=datetime.timezone.utc) return cdate except (IndexError, IOError, ValueError): ntries -= 1 diff --git a/notebooks/Jungfrau/Jungfrau_Gain_Correct_and_Verify_NBC.ipynb b/notebooks/Jungfrau/Jungfrau_Gain_Correct_and_Verify_NBC.ipynb index 00da5997abba27a77fd5aa54890785ae082ef76c..f1c6c165d58328b8be8e5fbc215825c9d6309f52 100644 --- a/notebooks/Jungfrau/Jungfrau_Gain_Correct_and_Verify_NBC.ipynb +++ b/notebooks/Jungfrau/Jungfrau_Gain_Correct_and_Verify_NBC.ipynb @@ -17,37 +17,35 @@ "metadata": {}, "outputs": [], "source": [ - "cluster_profile = \"noDB\" # cluster profile to use\n", - "in_folder = \"/gpfs/exfel/exp/FXE/201901/p002210/raw\" # the folder to read data from, required\n", - "out_folder = \"/gpfs/exfel/data/scratch/ahmedk/test/jf\" # the folder to output to, required\n", - "sequences = [-1] # sequences to correct, set to -1 for all, range allowed\n", - "run = 249 # runs to process, required\n", - "\n", - "karabo_id = \"FXE_XAD_JF1M\" # karabo prefix of Jungfrau devices\n", - "karabo_da = ['JNGFR01'] # data aggregators\n", - "receiver_id = \"RECEIVER-{}\" # inset for receiver devices\n", - "receiver_control_id = \"CONTROL\" # inset for control devices\n", - "path_template = 'RAW-R{:04d}-{}-S{:05d}.h5' # template to use for file name, double escape sequence number\n", + "in_folder = \"/gpfs/exfel/exp/CALLAB/202031/p900113/raw\" # the folder to read data from, required\n", + "out_folder = \"/gpfs/exfel/data/scratch/hammerd/issue-242\" # the folder to output to, required\n", + "sequences = [-1] # sequences to correct, set to [-1] for all, range allowed\n", + "run = 9979 # run to process, required\n", + "\n", + "karabo_id = \"SPB_IRDA_JF4M\" # karabo prefix of Jungfrau devices\n", + "karabo_da = ['JNGFR01'] # data aggregators\n", + "receiver_id = \"JNGFR{:02d}\" # inset for receiver devices\n", + "receiver_control_id = \"CONTROL\" # inset for control devices\n", + "path_template = 'RAW-R{:04d}-{}-S{:05d}.h5' # template to use for file name\n", "h5path = '/INSTRUMENT/{}/DET/{}:daqOutput/data' # path in H5 file under which images are located\n", - "h5path_run = '/RUN/{}/DET/{}' # path to run data\n", - "h5path_cntrl = '/CONTROL/{}/DET/{}' # path to control data\n", + "h5path_run = '/RUN/{}/DET/{}' # path to run data\n", + "h5path_cntrl = '/CONTROL/{}/DET/{}' # path to control data\n", "karabo_id_control = \"\" # if control is on a different ID, set to empty string if it is the same a karabo-id\n", - "karabo_da_control = \"JNGFR01\" # file inset for control data\n", + "karabo_da_control = \"JNGFRCTRL00\" # file inset for control data\n", "\n", - "use_dir_creation_date = True # use the creation data of the input dir for database queries\n", - "cal_db_interface = \"tcp://max-exfl016:8017#8025\" #\"tcp://max-exfl016:8015#8025\" # the database interface to use\n", - "cal_db_timeout = 180000 # timeout on caldb requests\",\n", + "use_dir_creation_date = True # use the creation data of the input dir for database queries\n", + "cal_db_interface = \"tcp://max-exfl016:8017#8025\" # the database interface to use\n", + "cal_db_timeout = 180000 # timeout on caldb requests\n", "\n", - "overwrite = True # set to True if existing data should be overwritten\n", - "no_relative_gain = False # do not do relative gain correction\n", - "bias_voltage = 180 # will be overwritten by value in 
file\n", - "sequences_per_node = 5 # number of sequence files per cluster node if run as slurm job, set to 0 to not run SLURM parallel\n", - "photon_energy = 9.2 # photon energy in keV\n", + "overwrite = True # set to True if existing data should be overwritten\n", + "no_relative_gain = False # do not do relative gain correction\n", + "bias_voltage = 180 # will be overwritten by value in file\n", + "sequences_per_node = 5 # number of sequence files per cluster node if run as slurm job, set to 0 to not run SLURM parallel\n", + "photon_energy = 9.2 # photon energy in keV\n", "chunk_size_idim = 1 # chunking size of imaging dimension, adjust if user software is sensitive to this.\n", - "integration_time = 4.96 # integration time in us, will be overwritten by value in file\n", - "mem_cells = 0. # leave memory cells equal 0, as it is saved in control information starting 2019.\n", - "gmapfile = \"\" # variable is not used but left here for back compatibility\n", - "db_module = [\"Jungfrau_M233\"] # ID of module in calibration database\n", + "integration_time = 4.96 # integration time in us, will be overwritten by value in file\n", + "mem_cells = 0 # leave memory cells equal 0, as it is saved in control information starting 2019.\n", + "db_module = [\"Jungfrau_M275\"] # ID of module in calibration database\n", "manual_slow_data = False # if true, use manually entered bias_voltage and integration_time values\n", "chunk_size = 0\n", "\n", @@ -62,23 +60,25 @@ "metadata": {}, "outputs": [], "source": [ + "import copy\n", + "import multiprocessing\n", "import time\n", - "from ipyparallel import Client\n", + "import warnings\n", "from functools import partial\n", - "import tabulate\n", - "from IPython.display import display, Latex\n", - "import copy\n", + "from pathlib import Path\n", + "\n", "import h5py\n", - "import os\n", - "from cal_tools.tools import (map_modules_from_folder, get_dir_creation_date,\n", - " get_constant_from_db_and_time)\n", - "from iCalibrationDB import (ConstantMetaData, Constants, Conditions, Detectors, Versions)\n", - "from cal_tools.enums import BadPixels\n", - "import numpy as np\n", "import matplotlib\n", "import matplotlib.pyplot as plt\n", + "import numpy as np\n", + "import tabulate\n", + "from cal_tools.enums import BadPixels\n", + "from cal_tools.tools import (get_constant_from_db_and_time,\n", + " get_dir_creation_date, map_modules_from_folder)\n", + "from iCalibrationDB import Conditions, Constants\n", + "from IPython.display import Latex, display\n", "from matplotlib.colors import LogNorm\n", - "import warnings\n", + "\n", "warnings.filterwarnings('ignore')\n", "\n", "matplotlib.use('agg')\n", @@ -91,23 +91,18 @@ "metadata": {}, "outputs": [], "source": [ - "client = Client(profile=cluster_profile)\n", - "view = client[:]\n", - "view.use_dill()\n", - "\n", + "in_folder = Path(in_folder)\n", + "out_folder = Path(out_folder)\n", + "ped_dir = in_folder / f'r{run:04d}'\n", "h5path = h5path.format(karabo_id, receiver_id)\n", - "ped_dir = \"{}/r{:04d}\".format(in_folder, run)\n", "\n", - "if ped_dir[-1] == \"/\":\n", - " ped_dir = ped_dir[:-1]\n", - "\n", - "if not os.path.exists(out_folder):\n", - " os.makedirs(out_folder)\n", - "elif not overwrite:\n", + "if out_folder.exists() and not overwrite:\n", " raise AttributeError(\"Output path exists! 
Exiting\")\n", + "else:\n", + " out_folder.mkdir(parents=True, exist_ok=True)\n", "\n", "fp_name_contr = path_template.format(run, karabo_da_control, 0)\n", - "fp_path_contr = '{}/{}'.format(ped_dir, fp_name_contr)\n", + "fp_path_contr = ped_dir / fp_name_contr\n", "\n", "if sequences[0] == -1:\n", " sequences = None\n", @@ -131,7 +126,7 @@ "metadata": {}, "outputs": [], "source": [ - "def check_memoryCells(file_name, path):\n", + "def check_memory_cells(file_name, path):\n", " with h5py.File(file_name, 'r') as f:\n", " t_stamp = np.array(f[path + '/storageCells/timestamp'])\n", " st_cells = np.array(f[path + '/storageCells/value'])\n", @@ -151,8 +146,9 @@ "outputs": [], "source": [ "# set everything up filewise\n", - "mmf = map_modules_from_folder(in_folder, run, path_template, karabo_da, sequences)\n", - "mapped_files, mod_ids, total_sequences, sequences_qm, _ = mmf\n", + "mapped_files, mod_ids, total_sequences, sequences_qm, _ = map_modules_from_folder(\n", + " in_folder, run, path_template, karabo_da, sequences\n", + ")\n", "\n", "print(f\"Processing a total of {total_sequences} sequence files\")\n", "table = []\n", @@ -169,8 +165,9 @@ " headers=[\"#\", \"module\", \"# module\", \"file\"])))\n", "\n", "# restore the queue\n", - "mmf = map_modules_from_folder(in_folder, run, path_template, karabo_da, sequences)\n", - "mapped_files, mod_ids, total_sequences, sequences_qm, _ = mmf" + "mapped_files, mod_ids, total_sequences, sequences_qm, _ = map_modules_from_folder(\n", + " in_folder, run, path_template, karabo_da, sequences\n", + ")" ] }, { @@ -180,7 +177,7 @@ "outputs": [], "source": [ "if not manual_slow_data:\n", - " with h5py.File(fp_path_contr.format(0), 'r') as f:\n", + " with h5py.File(fp_path_contr, 'r') as f:\n", " run_path = h5path_run.format(karabo_id_control, receiver_control_id)\n", " integration_time = float(f[f'{run_path}/exposureTime/value'][()]*1e6)\n", " bias_voltage = int(np.squeeze(f[f'{run_path}/vHighVoltage/value'])[0])\n", @@ -188,26 +185,26 @@ "\n", "control_path = h5path_cntrl.format(karabo_id_control, receiver_control_id)\n", "try:\n", - " this_run_mcells, sc_start = check_memoryCells(fp_path_contr.format(0), control_path)\n", + " this_run_mcells, sc_start = check_memory_cells(fp_path_contr, control_path)\n", " if this_run_mcells == 1:\n", - " memoryCells = 1\n", + " memory_cells = 1\n", " print(f'Dark runs in single cell mode\\n storage cell start: {sc_start:02d}')\n", " else:\n", - " memoryCells = 16\n", + " memory_cells = 16\n", " print(f'Dark runs in burst mode\\n storage cell start: {sc_start:02d}')\n", "except Exception as e:\n", " if \"Unable to open object\" in str(e):\n", " if mem_cells==0:\n", - " memoryCells = 1\n", + " memory_cells = 1\n", " else:\n", - " memoryCells = mem_cells\n", - " print(f'Set memory cells to {memoryCells} as it is not saved in control information.')\n", + " memory_cells = mem_cells\n", + " print(f'Set memory cells to {memory_cells} as it is not saved in control information.')\n", " else:\n", " print(f\"Error trying to access memory cell from contol information: {e}\")\n", "\n", "print(f\"Integration time is {integration_time} us\")\n", "print(f\"Bias voltage is {bias_voltage} V\")\n", - "print(f\"Number of memory cells is {memoryCells}\")" + "print(f\"Number of memory cells is {memory_cells}\")" ] }, { @@ -216,70 +213,63 @@ "metadata": {}, "outputs": [], "source": [ - "condition = Conditions.Dark.jungfrau(memory_cells=memoryCells,\n", - " bias_voltage=bias_voltage,\n", - " integration_time=integration_time)\n", + "condition = 
Conditions.Dark.jungfrau(\n", + " memory_cells=memory_cells,\n", + " bias_voltage=bias_voltage,\n", + " integration_time=integration_time,\n", + ")\n", "\n", - "def get_constant_for_module(karabo_id, condition, cal_db_interface, creation_time, cal_db_timeout,\n", - " memoryCells, karabo_da):\n", + "def get_constants_for_module(karabo_da: str):\n", " \"\"\" Get calibration constants for given module of Jungfrau\n", - " \n", - " Function contains all includes to be used with ipCluster\n", "\n", - " :param condition: Calibration condition\n", - " :param cal_db_interface: Interface string, e.g. \"tcp://max-exfl016:8015\"\n", - " :param creation_time: Latest time for constant to be created\n", - " :param cal_db_timeout: Timeout for zmq request\n", - " :param: memoryCells: Number of used memory cells\n", - " :param: db_module: Module of Jungfrau, e.g. \"Jungfrau_M035\"\n", - "\n", - " :return: offset_map (offset map), mask (mask of bad pixels), \n", - " gain_map (map of relative gain factors), db_module (name of DB module), \n", + " :return:\n", + " offset_map (offset map),\n", + " mask (mask of bad pixels),\n", + " gain_map (map of relative gain factors),\n", + " db_module (name of DB module),\n", " when (dictionaty: constant - creation time)\n", " \"\"\"\n", - "\n", - " from iCalibrationDB import (ConstantMetaData, Constants, Conditions, Detectors, Versions)\n", - " from cal_tools.tools import get_constant_from_db_and_time\n", - " import numpy as np\n", " \n", " when = {}\n", - "\n", - " #TODO: Remove condition + constant retrieval duplication from notebook \n", - "\n", - " offset_map, when['Offset'] = \\\n", - " get_constant_from_db_and_time(karabo_id, karabo_da,\n", - " Constants.jungfrau.Offset(),\n", - " condition,\n", - " np.zeros((1024, 512, 1, 3)),\n", - " cal_db_interface,\n", - " creation_time=creation_time,\n", - " timeout=cal_db_timeout)\n", - " mask, when['BadPixels'] = \\\n", - " get_constant_from_db_and_time(karabo_id, karabo_da,\n", - " Constants.jungfrau.BadPixelsDark(),\n", - " condition,\n", - " np.zeros((1024, 512, 1, 3)),\n", - " cal_db_interface,\n", - " creation_time=creation_time,\n", - " timeout=cal_db_timeout)\n", - " gain_map, when['Gain'] = \\\n", - " get_constant_from_db_and_time(karabo_id, karabo_da,\n", - " Constants.jungfrau.RelativeGain(),\n", - " condition,\n", - " None,\n", - " cal_db_interface,\n", - " creation_time=creation_time,\n", - " timeout=cal_db_timeout)\n", - "\n", + " retrieval_function = partial(\n", + " get_constant_from_db_and_time,\n", + " karabo_id=karabo_id,\n", + " karabo_da=karabo_da,\n", + " condition=condition,\n", + " cal_db_interface=cal_db_interface,\n", + " creation_time=creation_time,\n", + " timeout=cal_db_timeout,\n", + " )\n", + " offset_map, when[\"Offset\"] = retrieval_function(\n", + " constant=Constants.jungfrau.Offset(), empty_constant=np.zeros((1024, 512, 1, 3))\n", + " )\n", + " mask, when[\"BadPixelsDark\"] = retrieval_function(\n", + " constant=Constants.jungfrau.BadPixelsDark(),\n", + " empty_constant=np.zeros((1024, 512, 1, 3)),\n", + " )\n", + " mask_ff, when[\"BadPixelsFF\"] = retrieval_function(\n", + " constant=Constants.jungfrau.BadPixelsFF(),\n", + " empty_constant=None\n", + " )\n", + " gain_map, when[\"Gain\"] = retrieval_function(\n", + " constant=Constants.jungfrau.RelativeGain(),\n", + " empty_constant=None\n", + " )\n", + "\n", + " # combine masks\n", + " if mask_ff is not None:\n", + " mask |= np.moveaxis(mask_ff, 0, 1)\n", + " \n", " # move from x,y,cell,gain to cell,x,y,gain\n", " offset_map = 
np.squeeze(offset_map)\n", " mask = np.squeeze(mask)\n", - " if memoryCells > 1:\n", + " \n", + " if memory_cells > 1:\n", " offset_map = np.moveaxis(np.moveaxis(offset_map, 0, 2), 0, 2)\n", " mask = np.moveaxis(np.moveaxis(mask, 0, 2), 0, 2)\n", "\n", " if gain_map is not None:\n", - " if memoryCells > 1:\n", + " if memory_cells > 1:\n", " gain_map = np.moveaxis(np.moveaxis(gain_map, 0, 2), 0, 1)\n", " else:\n", " gain_map = np.squeeze(gain_map)\n", @@ -288,24 +278,19 @@ " return offset_map, mask, gain_map, karabo_da, when\n", "\n", "\n", - "# Retrieve Offset, BadPixels and gain constants for a JungFrau module.\n", - "# Run ip Cluster parallelization over modules\n", - "p = partial(get_constant_for_module, karabo_id, condition, cal_db_interface, \n", - " creation_time, cal_db_timeout, memoryCells)\n", - "\n", - "r = view.map_sync(p, karabo_da)\n", - "#r = list(map(p, karabo_da))\n", + "with multiprocessing.Pool() as pool:\n", + " r = pool.map(get_constants_for_module, karabo_da)\n", "\n", "constants = {}\n", - "for rr in r:\n", - " offset_map, mask, gain_map, k_da, when = rr\n", + "for offset_map, mask, gain_map, k_da, when in r:\n", " print(f'Constants for module {k_da}:')\n", " for const in when:\n", - " print(f'{const} injected at {when[const]}')\n", + " print(f' {const} injected at {when[const]}')\n", + " \n", " if gain_map is None:\n", - " print(\"No gain map found\")\n", + " print(\" No gain map found\")\n", " no_relative_gain = True\n", - " \n", + " \n", " constants[k_da] = (offset_map, mask, gain_map)" ] }, @@ -319,11 +304,9 @@ " \"\"\" Copy and sanitize data in `infile` that is not touched by `correctLPD`\n", " \"\"\"\n", "\n", - " if h5base.startswith(\"/\"):\n", - " h5base = h5base[1:]\n", + " h5base = h5base.lstrip(\"/\")\n", " dont_copy = [\"adc\", ]\n", - " dont_copy = [h5base+\"/{}\".format(do)\n", - " for do in dont_copy]\n", + " dont_copy = [f'{h5base}/{dnc}' for dnc in dont_copy]\n", "\n", " def visitor(k, item):\n", " if k not in dont_copy:\n", @@ -344,11 +327,7 @@ "outputs": [], "source": [ "# Correct a chunk of images for offset and gain\n", - "def correct_chunk(offset_map, mask, gain_map, memoryCells, no_relative_gain, inp):\n", - " import numpy as np\n", - " import copy\n", - " import h5py\n", - "\n", + "def correct_chunk(offset_map, mask, gain_map, memory_cells, no_relative_gain, inp):\n", " fim_data = None\n", " gim_data = None\n", " rim_data = None\n", @@ -360,13 +339,13 @@ " g[g==3] = 2\n", "\n", " if copy_sample and ind==0:\n", - " if memoryCells==1:\n", + " if memory_cells==1:\n", " rim_data = np.squeeze(copy.copy(d))\n", " else:\n", " rim_data = np.squeeze(copy.copy(d[:,0,...]))\n", "\n", " # Select memory cells\n", - " if memoryCells>1:\n", + " if memory_cells>1:\n", " m[m>16] = 0\n", " offset_map_cell = offset_map[m,...]\n", " mask_cell = mask[m,...]\n", @@ -380,7 +359,7 @@ "\n", " # Gain correction\n", " if not no_relative_gain:\n", - " if memoryCells>1:\n", + " if memory_cells>1:\n", " gain_map_cell = gain_map[m,...]\n", " else:\n", " gain_map_cell = gain_map\n", @@ -391,7 +370,7 @@ "\n", " # Store sample of data for plotting\n", " if copy_sample and ind==0:\n", - " if memoryCells==1:\n", + " if memory_cells==1:\n", " fim_data = np.squeeze(copy.copy(d))\n", " gim_data = np.squeeze(copy.copy(g))\n", " msk_data = np.squeeze(copy.copy(msk))\n", @@ -403,30 +382,37 @@ " except Exception as e:\n", " err = e\n", "\n", - " return ind, d, msk, rim_data, fim_data, gim_data, msk_data, err\n", - "\n", + " return ind, d, msk, rim_data, fim_data, gim_data, msk_data, 
err" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ "fim_data = {}\n", "gim_data = {}\n", "rim_data = {}\n", "msk_data = {}\n", "\n", + "# For each module, chunks will be processed by pool\n", + "pool = multiprocessing.Pool()\n", "# Loop over modules\n", - "for i, key in enumerate(mapped_files):\n", + "for local_karabo_da, mapped_files_module in zip(karabo_da, mapped_files.values()):\n", + " h5path_f = h5path.format(int(local_karabo_da[-2:]))\n", " # Loop over sequences for given module\n", - " for k, f in enumerate(list(mapped_files[key].queue)):\n", - " \n", - " offset_map, mask, gain_map = constants[karabo_da[i]]\n", - " h5path_f = h5path.format(int(karabo_da[i][-2:]))\n", + " for sequence_file_number, sequence_file in enumerate(mapped_files_module.queue):\n", + " sequence_file = Path(sequence_file)\n", + " offset_map, mask, gain_map = constants[local_karabo_da]\n", " \n", - " with h5py.File(f, 'r') as infile:\n", - " \n", + " with h5py.File(sequence_file, 'r') as infile:\n", " # The processed files are saved here in a folder with the run name.\n", - " out_file = \"{}/{}\".format(out_folder, f.split(\"/\")[-1])\n", - " out_file = out_file.replace(\"RAW\", \"CORR\")\n", - " print(f'Process file: {f}, with path {h5path_f}')\n", + " out_filename = out_folder / sequence_file.name.replace(\"RAW\", \"CORR\")\n", + " print(f'Process file: {sequence_file}, with path {h5path_f}')\n", " try:\n", - " with h5py.File(out_file, \"w\") as ofile:\n", - " copy_and_sanitize_non_cal_data(infile, ofile, h5path_f)\n", + " with h5py.File(out_filename, \"w\") as outfile:\n", + " copy_and_sanitize_non_cal_data(infile, outfile, h5path_f)\n", "\n", " oshape = infile[h5path_f+\"/adc\"].shape\n", " print(f'Data shape: {oshape}')\n", @@ -434,18 +420,18 @@ " raise ValueError(f\"No image data: shape {oshape}\")\n", " # Chunk always contains >= 1 complete image\n", " chunk_shape = (chunk_size_idim, 1) + oshape[-2:]\n", - " ddset = ofile.create_dataset(h5path_f+\"/adc\",\n", - " oshape,\n", - " chunks=chunk_shape,\n", - " dtype=np.float32)\n", - "\n", - " mskset = ofile.create_dataset(h5path_f+\"/mask\",\n", - " oshape,\n", - " chunks=chunk_shape,\n", - " dtype=np.uint32,\n", - " compression=\"gzip\", compression_opts=1, shuffle=True)\n", - "\n", - " # Run ip Cluster parallelization over chunks of images\n", + "\n", + " ddset = outfile.create_dataset(h5path_f+\"/adc\",\n", + " oshape,\n", + " chunks=chunk_shape,\n", + " dtype=np.float32)\n", + "\n", + " mskset = outfile.create_dataset(h5path_f+\"/mask\",\n", + " oshape,\n", + " chunks=chunk_shape,\n", + " dtype=np.uint32,\n", + " compression=\"gzip\", compression_opts=1, shuffle=True)\n", + " # Parallelize over chunks of images\n", " inp = []\n", " max_ind = oshape[0]\n", " ind = 0\n", @@ -464,23 +450,21 @@ " else:\n", " m = None\n", " print(f'To process: {d.shape}')\n", - " inp.append((d,g,m, ind, k==0))\n", + " inp.append((d, g, m, ind, sequence_file_number==0))\n", " ind += chunk_size\n", "\n", " print('Preparation time: ', time.time() - ts)\n", " ts = time.time()\n", "\n", " print(f'Run {len(inp)} processes')\n", - " p = partial(correct_chunk, offset_map, mask, gain_map, memoryCells, no_relative_gain)\n", + " p = partial(correct_chunk, offset_map, mask, gain_map, memory_cells, no_relative_gain)\n", "\n", - " r = view.map_sync(p, inp)\n", - " # Used for debugging correct chunk\n", - " #r = list(map(p, inp))\n", + " r = pool.map(p, inp)\n", " \n", - " if k==0:\n", + " if sequence_file_number == 0:\n", " 
(_,_,_,\n", - " rim_data[karabo_da[i]], fim_data[karabo_da[i]],\n", - " gim_data[karabo_da[i]], msk_data[karabo_da[i]], _) = r[0]\n", + " rim_data[local_karabo_da], fim_data[local_karabo_da],\n", + " gim_data[local_karabo_da], msk_data[local_karabo_da], _) = r[0]\n", "\n", " print('Correction time: ', time.time() - ts)\n", " ts = time.time()\n", @@ -495,7 +479,8 @@ "\n", " print('Saving time: ', time.time() - ts)\n", " except Exception as e:\n", - " print(f\"Error: {e}\")" + " print(f\"Error: {e}\")\n", + "pool.close()" ] }, { @@ -513,8 +498,7 @@ " ax.set_ylabel(y_axis)\n", " ax.set_title(title)\n", " cb = fig.colorbar(im)\n", - " cb.set_label(\"Counts\")\n", - " " + " cb.set_label(\"Counts\")" ] }, { @@ -524,8 +508,10 @@ "outputs": [], "source": [ "for mod in rim_data: \n", - " h, ex, ey = np.histogram2d(rim_data[mod].flatten(), gim_data[mod].flatten(),\n", - " bins=[100, 4], range=[[0, 10000], [0,4]])\n", + " h, ex, ey = np.histogram2d(rim_data[mod].flatten(),\n", + " gim_data[mod].flatten(),\n", + " bins=[100, 4],\n", + " range=[[0, 10000], [0, 4]])\n", " do_2d_plot(h, (ex, ey), \"Signal (ADU)\", \"Gain Bit Value\", f'Module {mod}')" ] }, diff --git a/tests/test_cal_tools.py b/tests/test_cal_tools.py index 24d61719ce141e78e517e32cc02816e02228f40f..7fe26e12a9fb96a9a38f006311726da13ae46559 100644 --- a/tests/test_cal_tools.py +++ b/tests/test_cal_tools.py @@ -20,7 +20,7 @@ def test_dir_creation_date(): date = get_dir_creation_date(folder, 9983) assert isinstance(date, datetime) - assert str(date) == '2020-09-23 13:30:50' + assert str(date) == '2020-09-23 13:30:50+00:00' with pytest.raises(ValueError) as e: get_dir_creation_date(folder, 4) diff --git a/webservice/messages.py b/webservice/messages.py index 3f3255daa3510fe9333024996156344c84dc3369..957ebfe33836474a27ebf82f7bbd56e17acf3caa 100644 --- a/webservice/messages.py +++ b/webservice/messages.py @@ -13,6 +13,7 @@ class Errors: MDC_RESPONSE = "FAILED: Response error from MDC: {}" NOT_CONFIGURED = "FAILED: instrument not configured, please contact det-support@xfel.eu" NOT_SUBMITTED = "FAILED: correction of {} failed during submision, please contact det-support@xfel.eu" + OTHER_ERROR = "FAILED: Error {}, please contact det-support@xfel.eu" class MDC: diff --git a/webservice/webservice.py b/webservice/webservice.py index de19d5a2545ce55ef540d4cb016e908e3c0fefd8..0d1696e53e29fb83971c47cf4784062198834d93 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -3,16 +3,16 @@ import asyncio import copy import getpass import glob +import inspect import json import logging import os import sqlite3 -import traceback import urllib.parse from asyncio import get_event_loop, shield from datetime import datetime from pathlib import Path -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional import yaml import zmq @@ -95,10 +95,9 @@ def init_config_repo(config): logging.info("Config repo is initialized") -async def upload_config(socket, config, yaml, instrument, cycle, proposal): +async def upload_config(config, yaml, instrument, cycle, proposal) -> bytes: """ Upload a new configuration YAML - :param socket: ZMQ socket to send reply on :param config: the configuration defined in the `config-repo` section of the webservice.yaml configuration. 
:param yaml: the YAML contents to update @@ -128,7 +127,7 @@ async def upload_config(socket, config, yaml, instrument, cycle, proposal): datetime.now().isoformat())) repo.remote().push() logging.info(Success.UPLOADED_CONFIG.format(cycle, proposal)) - socket.send(Success.UPLOADED_CONFIG.format(cycle, proposal).encode()) + return Success.UPLOADED_CONFIG.format(cycle, proposal).encode() def merge(source: Dict, destination: Dict) -> Dict: @@ -156,8 +155,8 @@ def merge(source: Dict, destination: Dict) -> Dict: return destination -async def change_config(socket, config, updated_config, karabo_id, instrument, - cycle, proposal, apply=False): +def change_config(config, updated_config, karabo_id, instrument, + cycle, proposal, apply=False) -> bytes: """ Change the configuration of a proposal @@ -166,7 +165,6 @@ async def change_config(socket, config, updated_config, karabo_id, instrument, Changes are committed to git. - :param socket: ZMQ socket to send reply on :param config: repo config as given in YAML config file :param updated_config: a dictionary containing the updated config :param instrument: the instrument to change config for @@ -206,7 +204,7 @@ async def change_config(socket, config, updated_config, karabo_id, instrument, "Update to proposal YAML: {}".format(datetime.now().isoformat())) repo.remote().push() logging.info(Success.UPLOADED_CONFIG.format(cycle, proposal)) - socket.send(yaml.dump(new_conf, default_flow_style=False).encode()) + return yaml.dump(new_conf, default_flow_style=False).encode() async def run_proc_async(cmd: List[str]) -> (int, bytes): @@ -262,7 +260,7 @@ async def slurm_job_status(jobid): return "NA", "NA", "NA" -async def query_rid(conn, socket, rid): +def query_rid(conn, rid) -> bytes: c = conn.cursor() c.execute("SELECT * FROM jobs WHERE rid LIKE ?", rid) combined = {} @@ -293,7 +291,7 @@ async def query_rid(conn, socket, rid): msg += "\n".join(statii) if msg == "": msg = 'NA' - socket.send(msg.encode()) + return msg.encode() def parse_config(cmd: List[str], config: Dict[str, Any]) -> List[str]: @@ -350,7 +348,7 @@ async def update_job_db(config): rid, jobid, proposal, run, status, time, _, action = r logging.debug("DB info {}".format(r)) - cflg, cstatus = combined.get(rid, ([], [])) + cflg, cstatus = combined.get((rid, action), ([], [])) if jobid in statii: slstatus, runtime = statii[jobid] query = "UPDATE jobs SET status=?, time=? WHERE jobid LIKE ?" 
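# A small sketch of the (run id, action) bookkeeping introduced above in
# update_job_db: per-job flags are grouped by the (rid, action) pair, so a
# 'dark' and a 'correct' request for the same run no longer overwrite each
# other, and the reported flag is the "least done" one. The job list and run
# id below are made up.
from typing import Dict, List, Tuple

flg_order = {"R": 2, "A": 1, "NA": 0}  # same ordering as in the patch above

combined: Dict[Tuple[int, str], List[str]] = {}
jobs = [
    (1234, "correct", "A"),
    (1234, "correct", "R"),
    (1234, "dark", "A"),
]
for rid, action, flg in jobs:
    combined.setdefault((rid, action), []).append(flg)

for (rid, action), flgs in combined.items():
    # "least done" status wins: one still-running job keeps the pair in state R
    flg = max(flgs, key=lambda f: flg_order[f])
    print(rid, action, flg)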
@@ -368,17 +366,22 @@ async def update_job_db(config): else: cflg.append("NA") cstatus.append(slstatus) - combined[rid] = cflg, cstatus + combined[rid, action] = cflg, cstatus conn.commit() flg_order = {"R": 2, "A": 1, "NA": 0} dark_flags = {'NA': 'E', 'R': 'IP', 'A': 'F'} - for rid, value in combined.items(): + for rid, action in combined: if int(rid) == 0: # this job was not submitted from MyMDC continue - flgs, statii = value + flgs, statii = combined[rid, action] # sort by least done status flg = max(flgs, key=lambda i: flg_order[i]) + if flg != 'R': + logging.info( + "Jobs finished - action: %s, run id: %s, status: %s", + action, rid, flg, + ) msg = "\n".join(statii) msg_debug = f"Update MDC {rid}, {msg}" logging.debug(msg_debug.replace('\n', ', ')) @@ -391,10 +394,11 @@ async def update_job_db(config): 'calcat_feedback': msg}} response = mdc.update_dark_run_api(rid, data) if response.status_code != 200: + logging.error("Failed to update MDC for action %s, rid %s", + action, rid) logging.error(Errors.MDC_RESPONSE.format(response)) - except Exception as e: - e = str(e) - logging.error(f"Failure to update job DB: {e}") + except Exception: + logging.error(f"Failure to update job DB", exc_info=True) await asyncio.sleep(time_interval) @@ -416,7 +420,7 @@ async def copy_untouched_files(file_list, out_folder, run): logging.info("Copying {} to {}".format(f, of)) -async def run_action(job_db, cmd, mode, proposal, run, rid): +async def run_action(job_db, cmd, mode, proposal, run, rid) -> str: """ Run action command (CORRECT OR DARK) :param job_db: jobs database @@ -425,7 +429,7 @@ async def run_action(job_db, cmd, mode, proposal, run, rid): but the command will be logged :param proposal: proposal the command was issued for :param run: run the command was issued for - :param: rid: run id in the MDC + :param rid: run id in the MDC Returns a formatted Success or Error message indicating outcome of the execution. @@ -474,7 +478,7 @@ async def run_action(job_db, cmd, mode, proposal, run, rid): return Success.START_CORRECTION_SIM.format(proposal, run) -async def wait_on_transfer(rpath, max_tries=300): +async def wait_on_transfer(rpath, max_tries=300) -> bool: """ Wait on data files to be transferred to Maxwell @@ -511,6 +515,34 @@ async def wait_on_transfer(rpath, max_tries=300): await asyncio.sleep(10) +async def wait_transfers( + wait_runs: List[str], in_folder: str, proposal: str +) -> bool: + """Wait for multiple runs to be transferred to Maxwell. 
+ + :param wait_runs: Run numbers to wait for + :param in_folder: Proposal raw directory containing runs + :param proposal: Proposal number + :return: True if all runs transferred, false on timeout + """ + logging.debug("Waiting for: propsal %s, runs %s", proposal, wait_runs) + + # FIXME: this loop should be an asyncio.gather + for runnr in wait_runs: + rpath = "{}/r{:04d}/".format(in_folder, int(runnr)) + transfer_complete = await wait_on_transfer(rpath) + if not transfer_complete: + logging.error( + Errors.TRANSFER_EVAL_FAILED.format(proposal, runnr) + ) + return False + + logging.info( + "Transfer complete: proposal %s, runs %s", proposal, wait_runs + ) + return True + + def check_files(in_folder: str, runs: List[int], karabo_das: List[str]) -> bool: @@ -553,6 +585,7 @@ async def update_darks_paths(mdc: MetadataClient, rid: int, in_path: str, response = await shield(loop.run_in_executor(None, mdc.update_dark_run_api, rid, data)) if response.status_code != 200: + logging.error("Failed to update MDC dark report path for run id %s", rid) logging.error(Errors.MDC_RESPONSE.format(response)) @@ -584,441 +617,561 @@ async def update_mdc_status(mdc: MetadataClient, action: str, func = mdc.update_run_api data = {'flg_cal_data_status': flag, 'cal_pipeline_reply': message} - if action == 'dark_request': + elif action == 'dark_request': func = mdc.update_dark_run_api data = {'dark_run': {'flg_status': flag, 'calcat_feedback': message}} + else: + raise ValueError(f"Unexpected action: {action}") loop = get_event_loop() response = await shield(loop.run_in_executor(None, func, rid, data)) if response.status_code != 200: + logging.error("Failed to update MDC status for action %s, run id %s", + action, rid) logging.error(Errors.MDC_RESPONSE.format(response)) -async def server_runner(config, mode): - """ The main server loop - - The main server loop handles remote requests via a ZMQ interface. - - Requests are the form of ZMQ.REQuest and have the format - - command, *params - - where *parms is a string-encoded python list as defined by the - commands. The following commands are currently understood: - - - correct, with parmeters rid, sase, instrument, cycle, proposal, runnr - - where - - :param rid: is the runid within the MDC database - :param sase: is the sase beamline - :param instrument: is the instrument - :param cycle: is the facility cycle - :param proposal: is the proposal id - :param runnr: is the run number in integer form, e.g. without leading - "r" - - This will trigger a correction process to be launched for that run in - the given cycle and proposal. - - - dark_request, with parameters rid, sase, instrument, cycle, proposal, - did, operation_mode, pdu_names, karabo_das, runnr - - where - - :param rid: is the runid within the MDC database - :param sase: is the sase beamline - :param instrument: is the instrument - :param cycle: is the facility cycle - :param proposal: is the proposal id - :param did: is the detector karabo id - :param operation_mode: is the detector's operation mode, as defined in - CalCat - :param pdu_names: physical detector units for each modules - :param karabo_das: the Data Agreggators representing which detector - modules to calibrate - :param runnr: is the run number in integer form, i.e. 
without leading - "r" - - - upload-yaml, with parameters sase, instrument, cycle, proposal, yaml - - where - - :param sase: is the sase beamline - :param instrument: is the instrument - :param cycle: is the facility cycle - :param proposal: is the proposal id - :param yaml: is url-encoded (quotes and spaces) representation of - new YAML file - - This will create or replace the existing YAML configuration for the - proposal and cycle with the newly sent one, and then push it to the git - configuration repo. - - """ - - init_config_repo(config['config-repo']) - job_db = await init_job_db(config) - mdc = await init_md_client(config) - - context = zmq.asyncio.Context() - auth = zmq.auth.thread.ThreadAuthenticator(context) - if mode == "prod-auth": - auth.start() - auth.allow(config['web-service']['allowed-ips']) +class ActionsServer: + def __init__(self, config, mode, job_db, mdc): + self.config = config + self.mode = mode + self.job_db = job_db + self.mdc = mdc + + # Set up a ZMQ socket to listen for requests + self.zmq_ctx = zmq.asyncio.Context() + auth = zmq.auth.thread.ThreadAuthenticator(self.zmq_ctx) + if mode == "prod-auth": + auth.start() + auth.allow(config['web-service']['allowed-ips']) + + self.socket = self.zmq_ctx.socket(zmq.REP) + self.socket.zap_domain = b'global' + self.socket.bind("{}:{}".format(config['web-service']['bind-to'], + config['web-service']['port'])) + + # __init__ can't be async - this is a workaround + @classmethod + async def ainit(cls, config, mode): + init_config_repo(config['config-repo']) + job_db = await init_job_db(config) + mdc = await init_md_client(config) + return cls(config, mode, job_db, mdc) + + @classmethod + async def launch(cls, config, mode): + server = await cls.ainit(config, mode) + return await server.run() + + async def run(self): + """The main server loop + + The main server loop handles remote requests via a ZMQ interface. + + Requests are the form of ZMQ.REQuest and have the format + + command, *params + + where *parms is a string-encoded python list as defined by the + commands. 
+ """ + while True: + req = await self.socket.recv_multipart() + logging.debug("Raw request data: %r", req) + try: + resp = await self.handle_one_req(req) + except Exception as e: + logging.error("Unexpected error handling request", exc_info=e) + resp = Errors.OTHER_ERROR.format(e).encode() - socket = context.socket(zmq.REP) - socket.zap_domain = b'global' - socket.bind("{}:{}".format(config['web-service']['bind-to'], - config['web-service']['port'])) + logging.debug("Sending response: %r", resp) + await self.socket.send(resp) - while True: - response = await socket.recv_multipart() - if isinstance(response, list) and len(response) == 1: + async def handle_one_req(self, req: List[bytes]) -> bytes: + if len(req) == 1: try: # protect against unparseable requests - response = eval(response[0]) + req = eval(req[0]) except SyntaxError as e: logging.error(str(e)) - socket.send(Errors.REQUEST_FAILED.encode()) - continue + return Errors.REQUEST_FAILED.encode() - if len(response) < 2: # catch parseable but malformed requests - logging.error(Errors.REQUEST_MALFORMED.format(response)) - socket.send(Errors.REQUEST_MALFORMED.format(response).encode()) - continue + if len(req) < 2: # catch parseable but malformed requests + logging.error(Errors.REQUEST_MALFORMED.format(req)) + return Errors.REQUEST_MALFORMED.format(req).encode() - # FIXME: action should be an enum - action, payload = response[0], response[1:] + action, *payload = req - if action not in ['correct', 'dark', 'dark_request', 'query-rid', - 'upload-yaml', 'update_conf']: + if action not in self.accepted_actions: logging.warning(Errors.UNKNOWN_ACTION.format(action)) - socket.send(Errors.UNKNOWN_ACTION.format(action).encode()) - continue + return Errors.UNKNOWN_ACTION.format(action).encode() - logging.debug('{}, {}'.format(action, payload)) + logging.info("Handling request for action %s", action) + logging.debug('Running action %s, payload %r', action, payload) - if action == "query-rid": - rid = payload[0] - await query_rid(job_db, socket, rid) - continue + handler = getattr(self, 'handle_' + action.replace('-', '_')) - async def do_action(action, payload): # FIXME: this needn't be nested - in_folder = None - run_mapping = {} - priority = None # TODO: Investigate argument - - if action == 'update_conf': - updated_config = None - try: - sase, karabo_id, instrument, cycle, proposal, config_yaml, apply = payload # noqa - updated_config = json.loads(config_yaml) - await change_config(socket, config['config-repo'], - updated_config, karabo_id, instrument, - cycle, proposal, - apply.upper() == "TRUE") - except Exception as e: - e = str(e) - err_msg = (f"Failure applying config for {proposal}:" - f" {e}: {updated_config}") - logging.error(err_msg) - logging.error(f"Unexpected error: {traceback.format_exc()}") # noqa - socket.send(yaml.dump(err_msg, - default_flow_style=False).encode()) - - if action in ['dark', 'correct', 'dark_request']: - request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') - try: - wait_runs: List[str] = [] - rid, sase, instrument, cycle, proposal, *payload = payload - - if action == 'correct': - runnr, priority = payload - runnr = runnr.strip('r') - wait_runs = [runnr] - - if action == 'dark': - karabo_ids, karabo_das, *runs = payload - - karabo_ids = karabo_ids.split(',') - karabo_das = karabo_das.split(',') - for i, run in enumerate(runs): - erun = eval(run) - if isinstance(erun, (list, tuple)): - typ, runnr = erun - if typ == "reservation": - continue - runnr = runnr.strip('r') - run_mapping[typ] = runnr - 
wait_runs.append(runnr) - else: - run_mapping['no_mapping_{}'.format(i)] = erun - wait_runs.append(erun) - - if action == 'dark_request': - karabo_id, operation_mode, *payload = payload - payload = eval(','.join(payload)) - pdus, karabo_das, wait_runs = payload - - karabo_das = [val.strip() for val in karabo_das] - wait_runs = [str(val) for val in wait_runs] - - proposal = proposal.strip('p') - proposal = "{:06d}".format(int(proposal)) - - logging.info(f'{action} of {proposal} run {wait_runs} at ' - f'{instrument} is requested. Checking files.') - - # Read calibration configuration from yaml - conf_file = Path(config['config-repo']['local-path'], - cycle, f'{proposal}.yaml') - if not conf_file.exists(): - conf_file = Path(config['config-repo']['local-path'], - "default.yaml") - - with open(conf_file, "r") as f: - pconf_full = yaml.load(f.read(), - Loader=yaml.FullLoader) - - # FIXME: remove once MyMDC sends `dark` action - action_ = 'dark' if action == 'dark_request' else action - data_conf = pconf_full['data-mapping'] - if instrument in pconf_full[action_]: - pconf = pconf_full[action_][instrument] - else: - socket.send(Errors.NOT_CONFIGURED.encode()) - logging.info(f'Instrument {instrument} is unknown') - return + # Verify that requests contains the right number of parameters + sig = inspect.signature(handler) + try: + sig.bind(*payload) + except TypeError: + logging.error( + "Wrong number of arguments for action %s", action, exc_info=True + ) + return Errors.REQUEST_MALFORMED.format(req).encode() + + res = handler(*payload) + if asyncio.iscoroutine(res): + res = await res + return res + + accepted_actions = { + 'correct', 'dark', 'dark_request', 'query-rid', 'upload-yaml', + 'update_conf', + } + + # Handler methods for each available action ------------------------------ + + async def handle_correct( + self, rid, _sase, instrument, cycle, proposal, runnr, priority + ): + """Launch detector correction + + :param rid: is the runid within the MDC database + :param _sase: is the sase beamline + :param instrument: is the instrument + :param cycle: is the facility cycle + :param proposal: is the proposal id + :param runnr: is the run number in integer form, e.g. without leading + "r" + + This will trigger a correction process to be launched for that run in + the given cycle and proposal. 
+ """ + request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + try: + runnr = runnr.strip('r') - in_folder = config[action_]['in-folder'].format( - instrument=instrument, cycle=cycle, proposal=proposal) + proposal = self._normalise_proposal_num(proposal) + pconf_full = self.load_proposal_config(cycle, proposal) - msg = Success.QUEUED.format(proposal, wait_runs) - socket.send(msg.encode()) - logging.debug(msg) + data_conf = pconf_full['data-mapping'] + if instrument in pconf_full['correct']: + pconf = pconf_full['correct'][instrument] + else: + logging.info(f'Instrument {instrument} is unknown') + return Errors.NOT_CONFIGURED.encode() - if action in ['correct', 'dark_request']: - await update_mdc_status(mdc, action, rid, msg) + in_folder = self.config['correct']['in-folder'].format( + instrument=instrument, cycle=cycle, proposal=proposal) + out_folder = self.config['correct']['out-folder'].format( + instrument=instrument, cycle=cycle, proposal=proposal, + run='r{:04d}'.format(int(runnr)) + ) - except Exception as e: - e = str(e) - msg = Errors.JOB_LAUNCH_FAILED.format(action, e) - logging.error(msg) - socket.send(msg.encode()) + except Exception as e: + msg = Errors.JOB_LAUNCH_FAILED.format('correct', e) + logging.error(msg, exc_info=e) + asyncio.ensure_future( + update_mdc_status(self.mdc, 'correct', rid, msg) + ) + return msg.encode() - if action in ['correct', 'dark_request']: - await update_mdc_status(mdc, action, rid, msg) - return + queued_msg = Success.QUEUED.format(proposal, [runnr]) + logging.debug(queued_msg) - # Check if all files for given runs are transferred - all_transfers = [] - - # FIXME: this loop should be an asyncio.gather - for runnr in wait_runs: - rpath = "{}/r{:04d}/".format(in_folder, int(runnr)) - transfer_complete = await wait_on_transfer(rpath) - all_transfers.append(transfer_complete) - if not transfer_complete: - logging.error( - Errors.TRANSFER_EVAL_FAILED.format(proposal, - runnr)) - if action in ['correct', 'dark_request']: - await update_mdc_status(mdc, action, rid, - MDC.MIGRATION_TIMEOUT) - - if not all(all_transfers): - logging.error(Errors.TRANSFER_EVAL_FAILED.format(proposal, ','.join(wait_runs))) # noqa + async def _continue(): + """Runs in the background after we reply to the 'correct' request""" + await update_mdc_status(self.mdc, 'correct', rid, queued_msg) + try: + transfer_complete = await wait_transfers( + [runnr], in_folder, proposal + ) + if not transfer_complete: + # Timed out + await update_mdc_status(self.mdc, 'correct', rid, + MDC.MIGRATION_TIMEOUT) return - logging.debug(f"Now doing: {action}") - ts = datetime.now().strftime('%y%m%d_%H%M%S') - if action == 'dark': + rpath = "{}/r{:04d}/".format(in_folder, int(runnr)) + + # Prepare configs for all detectors in run + fl = glob.glob(f"{rpath}/*.h5") + corr_file_list = set() + copy_file_list = set(fl) detectors = {} - out_folder = config[action]['out-folder'].format( - instrument=instrument, cycle=cycle, proposal=proposal, - runs="_".join(wait_runs)) + for karabo_id in pconf: + dconfig = data_conf[karabo_id] + # check for files according to mapping in raw run dir. 
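# A small sketch of the file-to-detector matching performed in the lines that
# follow: raw files in the run directory are matched against each detector's
# karabo-da names, matched files are queued for correction, and the remainder
# is copied through untouched. The path and aggregator names are made up.
import glob

rpath = "/gpfs/exfel/exp/SPB/202030/p900000/raw/r0100"  # illustrative run dir
karabo_da_names = ["AGIPD00", "AGIPD01"]

all_files = set(glob.glob(f"{rpath}/*.h5"))
corr_file_list = set()
for karabo_da in karabo_da_names:
    corr_file_list |= set(glob.glob(f"{rpath}/*{karabo_da}*.h5"))

copy_file_list = all_files - corr_file_list  # forwarded unmodified to out_folder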
+                    if any(y in x for x in fl
+                           for y in dconfig['karabo-da']):
+                        for karabo_da in dconfig['karabo-da']:
+                            tfl = glob.glob(f"{rpath}/*{karabo_da}*.h5")
+                            corr_file_list = corr_file_list.union(set(tfl))
+                        thisconf = copy.copy(dconfig)
+                        if isinstance(pconf[karabo_id], dict):
+                            thisconf.update(copy.copy(pconf[karabo_id]))
+                        thisconf["in-folder"] = in_folder
+                        thisconf["out-folder"] = out_folder
+                        thisconf["karabo-id"] = karabo_id
+                        thisconf["run"] = runnr
+                        if priority:
+                            thisconf["priority"] = str(priority)
-            # Run over all available detectors
-            if karabo_ids[0] == 'all':
-                karabo_ids = list(pconf.keys())
+                        detectors[karabo_id] = thisconf
+                copy_file_list = copy_file_list.difference(corr_file_list)
+                asyncio.ensure_future(copy_untouched_files(copy_file_list,
+                                                           out_folder,
+                                                           runnr))
+            except Exception as corr_e:
+                logging.error("Error during correction", exc_info=corr_e)
+                await update_mdc_status(self.mdc, 'correct', rid,
+                                        Errors.REQUEST_FAILED)
+                return
+
+            if len(detectors) == 0:
+                msg = Errors.NOTHING_TO_DO.format(rpath)
+                logging.warning(msg)
+                await update_mdc_status(self.mdc, 'correct', rid, msg)
+                return
+
+            ret, _ = await self.launch_jobs(
+                [runnr], rid, detectors, 'correct', instrument, cycle, proposal,
+                request_time,
+            )
+            await update_mdc_status(self.mdc, 'correct', rid, ret)
+        # END of part to run after sending reply
+
+        asyncio.ensure_future(_continue())
+
+        return queued_msg.encode()
+
+    async def handle_dark(
+        self, rid, _sase, instrument, cycle, proposal, karabo_ids,
+        karabo_das, *runs
+    ):
+        request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
+        try:
+            run_mapping = {}
+            wait_runs = []
+
+            karabo_ids = karabo_ids.split(',')
+            karabo_das = karabo_das.split(',')
+
+            for i, run in enumerate(runs):
+                erun = eval(run)
+                if isinstance(erun, (list, tuple)):
+                    typ, runnr = erun
+                    if typ == "reservation":
+                        continue
+                    runnr = runnr.strip('r')
+                    run_mapping[typ] = runnr
+                    wait_runs.append(runnr)
+                else:
+                    run_mapping['no_mapping_{}'.format(i)] = erun
+                    wait_runs.append(erun)
-            # Prepare configs for all requested detectors
-            for karabo_id in karabo_ids:
+            proposal = self._normalise_proposal_num(proposal)
+            pconf_full = self.load_proposal_config(cycle, proposal)
-                # use selected karabo_das
-                if karabo_das[0] == 'all':
-                    karabo_da = data_conf[karabo_id]["karabo-da"]
+            data_conf = pconf_full['data-mapping']
+            if instrument in pconf_full['dark']:
+                pconf = pconf_full['dark'][instrument]
+            else:
+                logging.info(f'Instrument {instrument} is unknown')
+                return Errors.NOT_CONFIGURED.encode()
-                # Check if any files for given karabo-das exists
-                if check_files(in_folder, wait_runs, karabo_da):
-                    thisconf = copy.copy(data_conf[karabo_id])
+            # Run over all available detectors
+            if karabo_ids[0] == 'all':
+                karabo_ids = list(pconf.keys())
-                    if (karabo_id in pconf and
-                        isinstance(pconf[karabo_id], dict)):
-                        thisconf.update(copy.copy(pconf[karabo_id]))
+            in_folder = self.config['dark']['in-folder'].format(
+                instrument=instrument, cycle=cycle, proposal=proposal)
+            out_folder = self.config['dark']['out-folder'].format(
+                instrument=instrument, cycle=cycle, proposal=proposal,
+                runs="_".join(wait_runs))
-                    thisconf["in-folder"] = in_folder
-                    thisconf["out-folder"] = '/'.join((out_folder,
-                                                       karabo_id.replace('-', '_')))  # noqa FIXME Make use of pathlib
-                    thisconf["karabo-id"] = karabo_id
-                    thisconf["karabo-da"] = karabo_da
+        except Exception as e:
+            msg = Errors.JOB_LAUNCH_FAILED.format('dark', e)
+            logging.error(msg, exc_info=e)
+            return msg.encode()
+
+        async def _continue():
+            """Runs in the background after we reply to the 'dark' request"""
+            transfer_complete = await wait_transfers(
+                wait_runs, in_folder, proposal
+            )
+            if not transfer_complete:
+                return  # Timed out
+
+            detectors = {}
+
+            # Prepare configs for all requested detectors
+            for karabo_id in karabo_ids:
+
+                # use selected karabo_das
+                karabo_das_for_id = karabo_das
+                if karabo_das[0] == 'all':
+                    karabo_das_for_id = data_conf[karabo_id]["karabo-da"]
+
+                # Check if any files for given karabo-das exists
+                if check_files(in_folder, wait_runs, karabo_das_for_id):
+                    thisconf = copy.copy(data_conf[karabo_id])
+
+                    if (karabo_id in pconf and
+                            isinstance(pconf[karabo_id], dict)):
+                        thisconf.update(copy.copy(pconf[karabo_id]))
+
+                    thisconf["in-folder"] = in_folder
+                    thisconf["out-folder"] = os.path.join(
+                        out_folder, karabo_id.replace('-', '_')
+                    )
+                    thisconf["karabo-id"] = karabo_id
+                    thisconf["karabo-da"] = karabo_das_for_id
+
+                    run_config = []
+                    for typ, run in run_mapping.items():
+                        if "no_mapping" in typ:
+                            run_config.append(run)
+                        else:
+                            thisconf[typ] = run
+                    if len(run_config):
+                        thisconf["runs"] = ",".join(run_config)
+
+                    detectors[karabo_id] = thisconf
+                else:
+                    logging.warning(
+                        "File list for %s in proposal %s runs %s is empty",
+                        karabo_id, proposal, wait_runs
+                    )
+            if len(detectors) == 0:
+                logging.warning(Errors.NOTHING_TO_DO.format(wait_runs))
+                return
+
+            await self.launch_jobs(
+                wait_runs, 0, detectors, 'dark', instrument, cycle, proposal,
+                request_time,
+            )
+        # END of part to run after sending reply
+
+        asyncio.ensure_future(_continue())
+
+        msg = Success.QUEUED.format(proposal, wait_runs)
+        logging.debug(msg)
+        return msg.encode()
+
+    async def handle_dark_request(
+        self, rid, _sase, instrument, cycle, proposal, karabo_id,
+        operation_mode, *extra
+    ):
+        """Launch dark run processing
+
+        :param rid: is the runid within the MDC database
+        :param _sase: is the sase beamline
+        :param instrument: is the instrument
+        :param cycle: is the facility cycle
+        :param proposal: is the proposal id
+        :param karabo_id: is the detector karabo id
+        :param operation_mode: is the detector's operation mode, as defined in
+               CalCat
+        :param pdu_names: physical detector units for each module
+        :param karabo_das: the Data Aggregators representing which detector
+               modules to calibrate
+        :param runnr: is the run number in integer form, i.e. without leading
+               "r"
+        """
+        request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
+        try:
+            pdus, karabo_das, wait_runs = eval(','.join(extra))
-                    run_config = []
-                    for typ, run in run_mapping.items():
-                        if "no_mapping" in typ:
-                            run_config.append(run)
-                        else:
-                            thisconf[typ] = run
-                    if len(run_config):
-                        thisconf["runs"] = ",".join(run_config)
+            karabo_das = [val.strip() for val in karabo_das]
+            runs = [str(val) for val in wait_runs]
-                    detectors[karabo_id] = thisconf
-                else:
-                    logging.warning("File list for {} at {} is empty"
-                                    .format(karabo_id,
-                                            "{}/*.h5".format(rpath)))
-
-        if len(detectors) == 0:
-            logging.warning(Errors.NOTHING_TO_DO.format(rpath))
-
-        if action == 'correct':
-            try:
-                runnr = wait_runs[0]
-                rpath = "{}/r{:04d}/".format(in_folder, int(runnr))
-
-                out_folder = config[action]['out-folder'].format(
-                    instrument=instrument, cycle=cycle, proposal=proposal,
-                    run='r{:04d}'.format(int(runnr)))
-
-                # Prepare configs for all detectors in run
-                fl = glob.glob(f"{rpath}/*.h5")
-                corr_file_list = set()
-                copy_file_list = set(fl)
-                detectors = {}
-                for karabo_id in pconf:
-                    dconfig = data_conf[karabo_id]
-                    # check for files according to mapping in raw run dir.
-                    if any(y in x for x in fl
-                           for y in dconfig['karabo-da']):
-                        for karabo_da in dconfig['karabo-da']:
-                            tfl = glob.glob(f"{rpath}/*{karabo_da}*.h5")
-                            corr_file_list = corr_file_list.union(set(tfl))
-                        thisconf = copy.copy(dconfig)
-                        if isinstance(pconf[karabo_id], dict):
-                            thisconf.update(copy.copy(pconf[karabo_id]))
-                        thisconf["in-folder"] = in_folder
-                        thisconf["out-folder"] = out_folder
-                        thisconf["karabo-id"] = karabo_id
-                        thisconf["run"] = runnr
-                        if priority:
-                            thisconf["priority"] = str(priority)
-
-                        detectors[karabo_id] = thisconf
-                copy_file_list = copy_file_list.difference(corr_file_list)
-                asyncio.ensure_future(copy_untouched_files(copy_file_list,
-                                                           out_folder,
-                                                           runnr))
-                if len(detectors) == 0:
-                    logging.warning(Errors.NOTHING_TO_DO.format(rpath))
-                    await update_mdc_status(mdc, action, rid,
-                                            MDC.NOTHING_TO_DO)
-                    return
-
-            except Exception as corr_e:
-                logging.error(f"Error during correction: {str(corr_e)}")
-                await update_mdc_status(mdc, action, rid,
-                                        Errors.REQUEST_FAILED)
-
-        if action == 'dark_request':
-            runs = [str(r) for r in wait_runs]
-
-            # Notebooks require one or three runs, depending on the
-            # detector type and operation mode.
-            triple = any(det in karabo_id for det in
-                         ["LPD", "AGIPD", "JUNGFRAU", "JF", "JNGFR"])
-
-            if triple and len(runs) == 1:
-                runs_dict = {'run-high': runs[0],
-                             'run-med': '0',
-                             'run-low': '0'}
-            elif triple and len(runs) == 3:
-                runs_dict = {'run-high': runs[0],
-                             'run-med': runs[1],
-                             'run-low': runs[2]}
-            else:  # single
-                runs_dict = {'run': runs[0]}
-
-            out_folder = config['dark']['out-folder'].format(
-                instrument=instrument, cycle=cycle, proposal=proposal,
-                runs='_'.join(runs))
-            out_folder = str(Path(out_folder,
-                                  karabo_id.replace('-', '_')))
-
-            # We assume that MyMDC does not allow dark request if the data
-            # is not migrated, thus skipping some validation here.
-            thisconf = copy.copy(data_conf[karabo_id])
-
-            if (karabo_id in pconf
-                    and isinstance(pconf[karabo_id], dict)):
-                thisconf.update(copy.copy(pconf[karabo_id]))
-
-            thisconf['in-folder'] = in_folder
-            thisconf['out-folder'] = out_folder
-            thisconf['karabo-id'] = karabo_id
-            thisconf['karabo-da'] = karabo_das
-            thisconf['operation-mode'] = operation_mode
-
-            thisconf.update(runs_dict)
-
-            detectors = {karabo_id: thisconf}
-
-        if action in ['correct', 'dark', 'dark_request']:
-            # run xfel_calibrate
-            action_ = 'dark' if action == 'dark_request' else action
-            for karabo_id, dconfig in detectors.items():
-                detector = dconfig['detector-type']
-                del dconfig['detector-type']
-                cmd = config[action_]['cmd'].format(
-                    detector=detector,
-                    sched_prio=str(config[action_]['sched-prio']),
-                    action=action_, instrument=instrument,
-                    cycle=cycle, proposal=proposal,
-                    runs="_".join([f"r{r}" for r in wait_runs]),
-                    time_stamp=ts,
-                    det_instance=karabo_id,
-                    request_time=request_time
-                ).split()
-
-                cmd = parse_config(cmd, dconfig)
-
-                rid = rid if action in ['correct', 'dark_request'] else 0
-                ret = await run_action(job_db, cmd, mode,
-                                       proposal, runnr, rid)
-
-                if action == 'correct':
-                    await update_mdc_status(mdc, action, rid, ret)
-                if action == 'dark_request':
-                    await update_mdc_status(mdc, action, rid, ret)
-                    report_idx = cmd.index('--report-to') + 1
-                    report = cmd[report_idx] + '.pdf'
-                    await update_darks_paths(mdc, rid, in_folder,
-                                             out_folder, report)
-
-        # TODO: moving this block further up reduces the need of so
-        # many nested ifs. Move up and return directly
-        if action == 'upload-yaml':
-            sase, instrument, cycle, proposal, this_yaml = payload
-            this_yaml = urllib.parse.unquote_plus(this_yaml)
-            await upload_config(socket, config['config-repo'], this_yaml,
-                                instrument, cycle, proposal)
+            proposal = self._normalise_proposal_num(proposal)
-        try:
-            asyncio.ensure_future(
-                do_action(copy.copy(action), copy.copy(payload)))
-        except Exception as e:  # actions that fail are only error logged
-            logging.error(str(e))
+            pconf_full = self.load_proposal_config(cycle, proposal)
+
+            data_conf = pconf_full['data-mapping']
+            if instrument in pconf_full['dark']:
+                pconf = pconf_full['dark'][instrument]
+            else:
+                logging.info(f'Instrument {instrument} is unknown')
+                return Errors.NOT_CONFIGURED.encode()
+            in_folder = self.config['dark']['in-folder'].format(
+                instrument=instrument, cycle=cycle, proposal=proposal)
+            out_folder = self.config['dark']['out-folder'].format(
+                instrument=instrument, cycle=cycle, proposal=proposal,
+                runs='_'.join(runs))
+            out_folder = str(Path(out_folder, karabo_id.replace('-', '_')))
+
+        except Exception as e:
+            msg = Errors.JOB_LAUNCH_FAILED.format('dark_request', e)
+            logging.error(msg, exc_info=e)
+            asyncio.ensure_future(
+                update_mdc_status(self.mdc, 'dark_request', rid, msg)
+            )
+            return msg.encode()
+
+        queued_msg = Success.QUEUED.format(proposal, runs)
+        logging.debug(queued_msg)
+
+        async def _continue():
+            """Runs in the background after we reply to the 'dark_request' request"""
+            await update_mdc_status(self.mdc, 'dark_request', rid, queued_msg)
+
+            transfer_complete = await wait_transfers(
+                runs, in_folder, proposal
+            )
+            if not transfer_complete:
+                # Timed out
+                await update_mdc_status(
+                    self.mdc, 'dark_request', rid, MDC.MIGRATION_TIMEOUT
+                )
+                return
+
+            # Notebooks require one or three runs, depending on the
+            # detector type and operation mode.
+            triple = any(det in karabo_id for det in
+                         ["LPD", "AGIPD", "JUNGFRAU", "JF", "JNGFR"])
+
+            if triple and len(runs) == 1:
+                runs_dict = {'run-high': runs[0],
+                             'run-med': '0',
+                             'run-low': '0'}
+            elif triple and len(runs) == 3:
+                runs_dict = {'run-high': runs[0],
+                             'run-med': runs[1],
+                             'run-low': runs[2]}
+            else:  # single
+                runs_dict = {'run': runs[0]}
+
+            # We assume that MyMDC does not allow dark request if the data
+            # is not migrated, thus skipping some validation here.
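+            # Assemble the calibration config for this detector: start from
+            # its data-mapping entry and overlay any proposal-specific dark
+            # settings from the proposal YAML.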
+            thisconf = copy.copy(data_conf[karabo_id])
+
+            if (karabo_id in pconf
+                    and isinstance(pconf[karabo_id], dict)):
+                thisconf.update(copy.copy(pconf[karabo_id]))
+
+            thisconf['in-folder'] = in_folder
+            thisconf['out-folder'] = out_folder
+            thisconf['karabo-id'] = karabo_id
+            thisconf['karabo-da'] = karabo_das
+            thisconf['operation-mode'] = operation_mode
+
+            thisconf.update(runs_dict)
+
+            detectors = {karabo_id: thisconf}
+
+            ret, report_path = await self.launch_jobs(
+                runs, rid, detectors, 'dark', instrument, cycle, proposal,
+                request_time
+            )
+            await update_mdc_status(self.mdc, 'dark_request', rid, ret)
+            if report_path is None:
+                logging.warning("Failed to identify report path for dark_request")
+            else:
+                await update_darks_paths(self.mdc, rid, in_folder,
+                                         out_folder, report_path)
+        # END of part to run after sending reply
+
+        asyncio.ensure_future(_continue())
+
+        return queued_msg.encode()
+
+    def handle_query_rid(self, rid):
+        return query_rid(self.job_db, rid)
+
+    def handle_upload_yaml(self, _sase, instrument, cycle, proposal, this_yaml):
+        """Reconfigure detector calibration for the specified proposal
+
+        :param _sase: is the sase beamline
+        :param instrument: is the instrument
+        :param cycle: is the facility cycle
+        :param proposal: is the proposal id
+        :param this_yaml: is the url-encoded (quotes and spaces) representation
+               of the new YAML file
+
+        This will create or replace the existing YAML configuration for the
+        proposal and cycle with the newly sent one, and then push it to the git
+        configuration repo.
+        """
+        this_yaml = urllib.parse.unquote_plus(this_yaml)
+        return upload_config(
+            self.config['config-repo'], this_yaml, instrument, cycle, proposal
+        )
+
+    async def handle_update_conf(
+        self, sase, karabo_id, instrument, cycle, proposal, config_yaml,
+        apply
+    ):
+        updated_config = None
+        try:
+            updated_config = json.loads(config_yaml)
+            return change_config(self.config['config-repo'],
+                                 updated_config, karabo_id, instrument,
+                                 cycle, proposal,
+                                 apply.upper() == "TRUE")
+        except Exception as e:
+            err_msg = (f"Failure applying config for {proposal}:"
+                       f" {e}: {updated_config}")
+            logging.error(err_msg, exc_info=e)
+            return yaml.dump(err_msg, default_flow_style=False).encode()
+
+    # Helper methods for handlers ---------------------------------------------
+
+    @staticmethod
+    def _normalise_proposal_num(p: str) -> str:
+        return "{:06d}".format(int(p.strip('p')))
+
+    def load_proposal_config(self, cycle, proposal) -> Dict:
+        # Read calibration configuration from yaml
+        conf_file = Path(
+            self.config['config-repo']['local-path'], cycle, f'{proposal}.yaml'
+        )
+        if not conf_file.exists():
+            conf_file = Path(
+                self.config['config-repo']['local-path'], "default.yaml"
+            )
+
+        logging.debug("Loading config for cycle %s, proposal %s from %s",
+                      cycle, proposal, conf_file)
+
+        with open(conf_file, "r") as f:
+            return yaml.load(f.read(), Loader=yaml.FullLoader)
+
+    async def launch_jobs(
+        self, run_nrs, rid, detectors, action, instrument, cycle, proposal,
+        request_time
+    ) -> (str, Optional[str]):
+        # run xfel_calibrate
+        for karabo_id, dconfig in detectors.items():
+            detector = dconfig['detector-type']
+            del dconfig['detector-type']
+            cmd = self.config[action]['cmd'].format(
+                detector=detector,
+                sched_prio=str(self.config[action]['sched-prio']),
+                action=action, instrument=instrument,
+                cycle=cycle, proposal=proposal,
+                runs="_".join([f"r{r}" for r in run_nrs]),
+                time_stamp=datetime.now().strftime('%y%m%d_%H%M%S'),
+                det_instance=karabo_id,
+                request_time=request_time
+            ).split()
+
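+            # parse_config presumably turns the per-detector options
+            # (in-folder, out-folder, karabo-id, runs, ...) into additional
+            # command-line arguments for the xfel-calibrate invocation.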
+ cmd = parse_config(cmd, dconfig) + + ret = await run_action(self.job_db, cmd, self.mode, + proposal, run_nrs[-1], rid) + + if '--report-to' in cmd[:-1]: + report_idx = cmd.index('--report-to') + 1 + report = cmd[report_idx] + '.pdf' + else: + report = None + return ret, report parser = argparse.ArgumentParser( description='Start the calibration webservice') @@ -1028,7 +1181,7 @@ parser.add_argument('--mode', type=str, default="sim", choices=['sim', 'prod']) parser.add_argument('--logging', type=str, default="INFO", choices=['INFO', 'DEBUG', 'ERROR']) -if __name__ == "__main__": +def main(): args = vars(parser.parse_args()) conf_file = args["config_file"] with open(conf_file, "r") as f: @@ -1041,5 +1194,9 @@ if __name__ == "__main__": format=fmt) loop = asyncio.get_event_loop() loop.create_task(update_job_db(config)) - loop.run_until_complete(server_runner(config, mode)) + loop.run_until_complete(ActionsServer.launch(config, mode)) loop.close() + + +if __name__ == "__main__": + main()