diff --git a/.gitignore b/.gitignore index 47d66ada746a8a3f50dcaa576110574d79949d04..7e45073ed70843fde1ea1914457fdc44ccbff222 100644 --- a/.gitignore +++ b/.gitignore @@ -10,11 +10,12 @@ *.pkl *.png *.png +*.secrets.yaml +*.so *.tar *.tif *.tiff *.tmp -*.so */slurm_tmp* *egg* ./temp @@ -34,4 +35,3 @@ slurm_tmp* src/cal_tools/agipdalgs.c webservice/*.log webservice/*sqlite -webservice/webservice.yaml diff --git a/README.rst b/README.rst index f425fdee5ac4941585012e01c63bac3331d1e7f8..28a68631cee9763ffab2c516e78ba1549972febc 100644 --- a/README.rst +++ b/README.rst @@ -40,13 +40,13 @@ python. This can be activated with ``source /gpfs/exfel/sw/calsoft/.pyenv/bin/ac A quick setup would be: -0. ``source /gpfs/exfel/sw/calsoft/.pyenv/bin/activate`` -1. ``git clone ssh://git@git.xfel.eu:10022/detectors/pycalibration.git && cd pycalibration`` - clone the offline calibration package from EuXFEL GitLab -2. ``pyenv shell 3.8.11`` - load required version of python -3. ``python3 -m venv .venv`` - create the virtual environment -4. ``source .venv/bin/activate`` - activate the virtual environment -5. ``python3 -m pip install --upgrade pip`` - upgrade version of pip -6. ``python3 -m pip install .`` - install the pycalibration package (add ``-e`` flag for editable development installation) +1. ``source /gpfs/exfel/sw/calsoft/.pyenv/bin/activate`` +2. ``git clone ssh://git@git.xfel.eu:10022/detectors/pycalibration.git && cd pycalibration`` - clone the offline calibration package from EuXFEL GitLab +3. ``pyenv shell 3.8.11`` - load required version of python +4. ``python3 -m venv .venv`` - create the virtual environment +5. ``source .venv/bin/activate`` - activate the virtual environment +6. ``python3 -m pip install --upgrade pip`` - upgrade version of pip +7. ``python3 -m pip install .`` - install the pycalibration package (add ``-e`` flag for editable development installation) Copy/paste script: @@ -71,11 +71,11 @@ will downgrade/upgrade your local packages, which may cause major issues and may **break your local environment**, it is highly recommended to use the venv installation method instead. -0. ``source /gpfs/exfel/sw/calsoft/.pyenv/bin/activate`` -1. ``git clone ssh://git@git.xfel.eu:10022/detectors/pycalibration.git && cd pycalibration`` - clone the offline calibration package from EuXFEL GitLab -2. ``pyenv shell 3.8.11`` - load required version of python -3. ``pip install .`` - install the pycalibration package (add ``-e`` flag for editable development installation) -4. ``export PATH=$HOME/.local/bin:$PATH`` - make sure that the home directory is in the PATH environment variable +1. ``source /gpfs/exfel/sw/calsoft/.pyenv/bin/activate`` +2. ``git clone ssh://git@git.xfel.eu:10022/detectors/pycalibration.git && cd pycalibration`` - clone the offline calibration package from EuXFEL GitLab +3. ``pyenv shell 3.8.11`` - load required version of python +4. ``pip install .`` - install the pycalibration package (add ``-e`` flag for editable development installation) +5. 
``export PATH=$HOME/.local/bin:$PATH`` - make sure that the home directory is in the PATH environment variable Copy/paste script: @@ -103,10 +103,142 @@ venv) activate the virtual environment first, and then run: This can be useful for Jupyter notebook tools as https://max-jhub.desy.de/hub/login +Offline Calibration Configuration +********************************* + +The offline calibration package is configured with three configuration files: + +- `webservice/config/webservice.yaml` - configuration for the web service +- `webservice/config/serve_overview.yaml` - configuration for the overview page +- `src/cal_tools/mdc_config.yaml` - configuration for MDC access by cal tools + +These configuration files should not be modified directly, instead you should +create a file `$CONFIG.secrets.yaml` (e.g. `webservice.secrets.yaml`) in the +configuration directory, and then add any modifications, such as secrets, to +this file. + +Alternatively, configurations are also searched for in +`~/.config/pycalibration/$MODULE/$CONFIG.yaml` (e.g. +`~/.config/pycalibration/webservice/serve_overview.yaml`), which is a useful +place to store configurations like secrets so that they are present even if you +delete the pycalibration directory, or if you have multiple `pycalibration` +repos checked out, as you no longer need to copy/paste the configurations each +time. + +Finally, you can use environment variables to override the configuration without +modifying any files, which is useful for one-off changes or if you are running +tests in a CI environment. The environment variables should be prefixed with: + +- `webservice/config/webservice.yaml` - `CAL_WEBSERVICE` +- `webservice/config/serve_overview.yaml` - `CAL_SERVE_OVERVIEW` +- `src/cal_tools/mdc_config.yaml` - `CAL_CAL_TOOLS` + +Followed by an underscore and the configuration key you wish to change. Nested +keys can be accessed with two underscores, e.g. +`CAL_WEBSERVICE_CONFIG_REPO__URL` would modify the `config-repo: url: ` value. + +Note that the order of priority is: + +- default configuration - e.g. `webservice/config/webservice.yaml` +- local configuration - e.g. `webservice/config/webservice.secrets.yaml` +- user configuration - e.g. `~/.config/pycalibration/webservice/webservice.yaml` +- environment variables - e.g. `export CAL_WEBSERVICE_*=...` + +Examples +======== + +For example, `webservice/config/webservice.yaml` has: + +```yaml +config-repo: + url: "@note add this to secrets file" + local-path: "@format {env[HOME]}/calibration_config" +... +metadata-client: + user-id: "@note add this to secrets file" + user-secret: "@note add this to secrets file" + user-email: "@note add this to secrets file" +``` + +So you would create a file `webservice/config/webservice.secrets.yaml`: + +```yaml +config-repo: + url: "https://USERNAME:TOKEN@git.xfel.eu/gitlab/detectors/calibration_configurations.git" + +metadata-client: + user-id: "id..." + user-secret: "secret..." + user-email: "calibration@example.com" +``` + +Alternatively, this file could be placed at `~/.config/pycalibration/webservice/webservice.yaml` + +Checking Configurations +======================= + +Having multiple nested configurations can get a bit confusing, so `dynaconf` +includes a command to help view what a configuration will be resolved to. 
Once +you have activated the python environment pycalibration is installed in, you +can run the command `dynaconf -i webservice.config.webservice list` to list the +current configuration values: + +``` +> dynaconf -i webservice.config.webservice list +Working in main environment +WEBSERVICE_DIR<PosixPath> PosixPath('/home/roscar/work/git.xfel.eu/detectors/pycalibration/webservice') +CONFIG-REPO<dict> {'local-path': '/home/roscar/calibration_config', + 'url': 'https://haufs:AAABBBCCCDDDEEEFFF@git.xfel.eu/gitlab/detectors/calibration_configurations.git'} +WEB-SERVICE<dict> {'allowed-ips': '131.169.4.197, 131.169.212.226', + 'bind-to': 'tcp://*', + 'job-db': '/home/roscar/work/git.xfel.eu/detectors/pycalibration/webservice/webservice_jobs.sqlite', + 'job-timeout': 3600, + 'job-update-interval': 60, + 'port': 5556} +METADATA-CLIENT<dict> {'auth-url': 'https://in.xfel.eu/test_metadata/oauth/authorize', + 'base-api-url': 'https://in.xfel.eu/metadata/api/', + 'metadata-web-app-url': 'https://in.xfel.eu/test_metadata', + 'refresh-url': 'https://in.xfel.eu/test_metadata/oauth/token', + 'scope': '', + 'token-url': 'https://in.xfel.eu/test_metadata/oauth/token', + 'user-email': 'calibration@example.com', + 'user-id': 'AAABBBCCCDDDEEEFFF', + 'user-secret': 'AAABBBCCCDDDEEEFFF'} +KAFKA<dict> {'brokers': ['it-kafka-broker01.desy.de', + 'it-kafka-broker02.desy.de', + 'it-kafka-broker03.desy.de'], + 'topic': 'xfel-test-offline-cal'} +CORRECT<dict> {'cmd': 'python -m xfel_calibrate.calibrate {detector} CORRECT ' + '--slurm-scheduling {sched_prio} --slurm-mem 750 --request-time ' + '{request_time} --slurm-name ' + '{action}_{instrument}_{detector}_{cycle}_p{proposal}_{runs} ' + '--report-to ' + '/gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/usr/Reports/{runs}/{det_instance}_{action}_{proposal}_{runs}_{time_stamp} ' + '--cal-db-timeout 300000 --cal-db-interface ' + 'tcp://max-exfl016:8015#8044', + 'in-folder': '/gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw', + 'out-folder': '/gpfs/exfel/d/proc/{instrument}/{cycle}/p{proposal}/{run}', + 'sched-prio': 80} +DARK<dict> {'cmd': 'python -m xfel_calibrate.calibrate {detector} DARK --concurrency-par ' + 'karabo_da --slurm-scheduling {sched_prio} --request-time ' + '{request_time} --slurm-name ' + '{action}_{instrument}_{detector}_{cycle}_p{proposal}_{runs} ' + '--report-to ' + '/gpfs/exfel/d/cal/caldb_store/xfel/reports/{instrument}/{det_instance}/{action}/{action}_{proposal}_{runs}_{time_stamp} ' + '--cal-db-interface tcp://max-exfl016:8015#8044 --db-output', + 'in-folder': '/gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw', + 'out-folder': '/gpfs/exfel/u/usr/{instrument}/{cycle}/p{proposal}/dark/runs_{runs}', + 'sched-prio': 10} +``` + +And here you can see that `metadata-client: user-id: ` contains the ID now +instead of the note "add this to secrets file", so the substitution has worked +correctly. 
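+
+The same check can also be done from Python. Below is a minimal sketch (not
+part of the repository) using the `Dynaconf` object that
+`src/cal_tools/mdc_config.py` exposes as `mdc_config`; the `webservice` and
+`serve_overview` settings objects are assumed to be importable in an analogous
+way. Key lookup is case-insensitive, and values coming from the secrets file,
+the user configuration, or `CAL_CAL_TOOLS_*` environment variables are
+resolved transparently:
+
+```python
+# Minimal sketch: verify that secrets were substituted into the cal_tools config.
+from cal_tools.mdc_config import mdc_config
+
+# Values come from the merged configuration sources described above.
+print(mdc_config.get("base-api-url"))  # e.g. https://in.xfel.eu/metadata/api/
+
+# If this still prints "@note add this to secrets file", neither the secrets
+# file nor ~/.config/pycalibration/cal_tools/mdc_config.yaml was picked up.
+print(mdc_config.get("user-email"))
+```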
+ + Contributing ************ - Guidelines ========== diff --git a/bin/slurm_calibrate.sh b/bin/slurm_calibrate.sh index 1c644da127472a9c21c174f009d88085955ce8c9..a22f322e802cf1949599d7c3aa78f2aaf0e2478d 100755 --- a/bin/slurm_calibrate.sh +++ b/bin/slurm_calibrate.sh @@ -1,5 +1,7 @@ #!/bin/bash +set -euo pipefail + # set paths to use nb_path=$1 python_path=$2 @@ -7,19 +9,15 @@ ipcluster_profile=$3 notebook=$4 detector=$5 caltype=$6 -finalize=$7 -cluster_cores=$8 -cal_python_path=$9 +cluster_cores=$7 echo "Running with the following parameters:" echo "Notebook path: $nb_path" echo "Python path: $python_path" -echo "Calibration Python: $cal_python_path" echo "IP-Cluster profile: $ipcluster_profile" echo "notebook: $notebook" echo "detector: $detector" echo "caltype: $caltype" -echo "finalize: $finalize" echo "cluster_cores: $cluster_cores" echo "job ID: $SLURM_JOB_ID" @@ -28,7 +26,6 @@ export CAL_NOTEBOOK_NAME="$notebook" # set-up enviroment source /etc/profile.d/modules.sh module load anaconda/3 -module load texlive/2019 # make sure we use agg backend export MPLBACKEND=AGG @@ -47,7 +44,6 @@ fi echo "Running notebook" ${python_path} -m princess ${nb_path} --save -${cal_python_path} -m nbconvert --to rst --TemplateExporter.exclude_input=True ${nb_path} # stop the cluster if requested if [ "${ipcluster_profile}" != "NO_CLUSTER" ] @@ -57,8 +53,3 @@ then echo "Removing cluster profile from: $profile_path" rm -rf $profile_path fi - -if [ -n "${finalize}" ] -then - ${cal_python_path} ${finalize} -fi diff --git a/bin/slurm_finalize.sh b/bin/slurm_finalize.sh new file mode 100644 index 0000000000000000000000000000000000000000..c0f632dd8c7603ad5c87d1cf04f4e1dc469601b2 --- /dev/null +++ b/bin/slurm_finalize.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +set -euo pipefail + +# set paths to use +python_path=$1 +temp_dir=$2 +finalize_script=$3 + +echo "Running with the following parameters:" +echo "Python path: $python_path" +echo "Correction temp dir: $temp_dir" +echo "finalize script: $finalize_script" +echo "job ID: $SLURM_JOB_ID" + +# set-up enviroment +source /etc/profile.d/modules.sh +module load texlive/2019 + +# make sure we use agg backend +export MPLBACKEND=AGG + +# Ensure Python uses UTF-8 for files by default +export LANG=en_US.UTF-8 + +shopt -s failglob # Fail fast if there are no notebooks found +echo "Converting notebooks" +${python_path} -m nbconvert --to rst --TemplateExporter.exclude_input=True "$temp_dir"/*.ipynb +shopt -u failglob # Restore default glob behaviour + +${python_path} "$finalize_script" diff --git a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb index 0d9de779b982b7eb85b0b6e07ae6640c5ae00944..2c7bfd749ba7185179ba8476d921ff25d961a66d 100644 --- a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb +++ b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb @@ -338,9 +338,12 @@ "mem_cells_db = mem_cells if mem_cells_db == 0 else mem_cells_db\n", "max_cells = mem_cells if max_cells == 0 else max_cells\n", "\n", + "fast_paths = (filename, karabo_id, channel)\n", + "slow_paths = (control_fn, karabo_id_control)\n", + "\n", "# Evaluate aquisition rate\n", "if acq_rate == 0:\n", - " acq_rate = get_acq_rate((filename, karabo_id, channel))\n", + " acq_rate = get_acq_rate(fast_paths, slow_paths)\n", "\n", "print(f\"Maximum memory cells to calibrate: {max_cells}\")" ] diff --git a/notebooks/AGIPD/AGIPD_Retrieve_Constants_Precorrection.ipynb b/notebooks/AGIPD/AGIPD_Retrieve_Constants_Precorrection.ipynb index 
760e60941d08c91429037188d6d76472918d5648..7df44401b68b457adbe57f7276cac5b0a4a341c9 100644 --- a/notebooks/AGIPD/AGIPD_Retrieve_Constants_Precorrection.ipynb +++ b/notebooks/AGIPD/AGIPD_Retrieve_Constants_Precorrection.ipynb @@ -165,7 +165,7 @@ "\n", "# Evaluate integration time\n", "if integration_time < 0:\n", - " integration_time = agipblib.get_integration_time(control_fn, h5path_ctrl)\n", + " integration_time = agipdlib.get_integration_time(control_fn, h5path_ctrl)\n", " \n", "print(f\"Gain setting: {gain_setting}\")\n", "print(f\"Gain mode: {gain_mode.name}\")\n", diff --git a/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb b/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb index 35fddcd6ac142b9b73b3d4ee58995b6252623df2..9fb046b08e29923134ab8349e0acb6759d670d2b 100644 --- a/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb +++ b/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb @@ -47,6 +47,7 @@ "mem_cells = 0 # number of memory cells used, set to 0 to automatically infer\n", "bias_voltage = 0 # detector bias voltage\n", "gain_setting = 0.1 # the gain setting, use 0.1 to try to auto-determine\n", + "integration_time = -1 # integration time, negative values for auto-detection.\n", "acq_rate = 0. # the detector acquisition rate, use 0 to try to auto-determine\n", "interlaced = False # assume interlaced data format, for data prior to Dec. 2017\n", "rawversion = 2 # RAW file format version\n", @@ -323,6 +324,7 @@ "\n", "inp = []\n", "for gain_index, (gain, qm_file_map) in enumerate(gain_mapped_files.items()):\n", + " gain_input = []\n", " for module_index in modules:\n", " qm = module_index_to_qm(module_index)\n", " if qm not in qm_file_map:\n", @@ -336,9 +338,15 @@ " with h5py.File(filename, \"r\") as fin:\n", " if fin[h5path.format(module_index)+\"/trainId\"].shape[0] != 0:\n", " print(f\"Process {filename} for {qm}\")\n", - " inp.append((filename, module_index, gain_index))\n", + " gain_input.append((filename, module_index, gain_index))\n", " else:\n", - " print(f\"Do not process {filename} because it is empty.\")" + " print(f\"Do not process {filename} because it is empty.\")\n", + " if not gain_input:\n", + " raise ValueError(\n", + " \"No images to process for run: \"\n", + " f\"{[v for v in offset_runs.values()][gain_index]}\"\n", + " )\n", + " inp += gain_input" ] }, { diff --git a/notebooks/AGIPD/Characterize_AGIPD_Gain_FlatFields_NBC.ipynb b/notebooks/AGIPD/Characterize_AGIPD_Gain_FlatFields_NBC.ipynb index 80a904f8d92da4d2dbb9e63cbd8590b1faed8f43..9d09b56d44110704d3a9f84a058c04705392738e 100644 --- a/notebooks/AGIPD/Characterize_AGIPD_Gain_FlatFields_NBC.ipynb +++ b/notebooks/AGIPD/Characterize_AGIPD_Gain_FlatFields_NBC.ipynb @@ -15,7 +15,7 @@ "outputs": [], "source": [ "in_folder = \"/gpfs/exfel/exp/SPB/202030/p900138/scratch/karnem/r0203_r0204_v01/\" # the folder to read histograms from, required\n", - "out_folder = \"/gpfs/exfel/exp/SPB/202030/p900138/scratch/karnem/r0203_r0204_v01/\" # the folder to output to, required\n", + "out_folder = \"\" # the folder to output to, required\n", "hist_file_template = \"hists_m{:02d}_sum.h5\" # the template to use to access histograms\n", "modules = [10] # modules to correct, set to -1 for all, range allowed\n", "\n", @@ -758,12 +758,13 @@ " 'No Entry',\n", " 'Gain deviation']\n", "\n", - "plt.bar(x, y2, width, label='Only this cut')\n", - "plt.bar(x, y, width, label='Cut flow')\n", - "plt.xticks(x, labels, rotation=90)\n", - "plt.ylim(y[5]-0.5, 100)\n", - "plt.grid(True)\n", - "plt.legend()\n", + "ax.bar(x, y2, 
width, label='Only this cut')\n", + "ax.bar(x, y, width, label='Cut flow')\n", + "ax.set_xticks(x)\n", + "ax.set_xticklabels(labels, rotation=90)\n", + "ax.set_ylim(y[5]-0.5, 100)\n", + "ax.grid(True)\n", + "ax.legend()\n", "plt.show()" ] } @@ -788,5 +789,5 @@ } }, "nbformat": 4, - "nbformat_minor": 1 + "nbformat_minor": 4 } diff --git a/notebooks/AGIPD/Chracterize_AGIPD_Gain_PC_NBC.ipynb b/notebooks/AGIPD/Chracterize_AGIPD_Gain_PC_NBC.ipynb index 75c1a43da968cc286ba748fbbebc35a7c7515d7b..c6dc6f9076bd9737aa82366d09e8c02e98090ccf 100644 --- a/notebooks/AGIPD/Chracterize_AGIPD_Gain_PC_NBC.ipynb +++ b/notebooks/AGIPD/Chracterize_AGIPD_Gain_PC_NBC.ipynb @@ -50,6 +50,7 @@ "h5path_ctrl = '/CONTROL/{}/MDL/FPGA_COMP' # path to control information\n", "\n", "use_dir_creation_date = True\n", + "delta_time = 0 # offset to the creation time (e.g. useful in case we want to force the system to use diff. dark constants)\n", "cal_db_interface = \"tcp://max-exfl016:8019\" # the database interface to use\n", "local_output = True # output constants locally\n", "db_output = False # output constants to database\n", @@ -81,7 +82,7 @@ "\n", "import os\n", "import warnings\n", - "from datetime import datetime\n", + "from datetime import datetime, timedelta\n", "from functools import partial\n", "\n", "warnings.filterwarnings('ignore')\n", @@ -208,7 +209,7 @@ "creation_time=None\n", "if use_dir_creation_date:\n", " creation_time = get_dir_creation_date(in_folder, run)\n", - "\n", + " creation_time = creation_time + timedelta(hours=delta_time)\n", "print(f\"Using {creation_time} as creation time of constant.\")\n", "\n", "if not creation_time and use_dir_creation_date:\n", @@ -280,11 +281,11 @@ " #print('Reading ',fname)\n", " with h5py.File(fname, 'r') as f:\n", " if rawversion == 2:\n", - " count = np.squeeze(f[\"/INDEX/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/count\".format(instrument, channel)][()])\n", + " count = np.squeeze(f[\"/INDEX/{}/DET/{}CH0:xtdf/image/count\".format(karabo_id, channel)][()])\n", " bursts_per_file.append(np.count_nonzero(count))\n", " del count\n", " else:\n", - " status = np.squeeze(f[\"/INDEX/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/status\".format(instrument, channel)][()]) \n", + " status = np.squeeze(f[\"/INDEX/{}/DET/{}CH0:xtdf/image/status\".format(karabo_id, channel)][()]) \n", " bursts_per_file.append(np.count_nonzero(status != 0))\n", " del status\n", " if bursts_per_file[0] == 0:\n", @@ -304,18 +305,18 @@ " with h5py.File(fname, 'r') as f:\n", " \n", " #print('Reading ',fname)\n", - " image_path_temp = 'INSTRUMENT/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/data'.format(instrument, channel)\n", - " cellID_path_temp = 'INSTRUMENT/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/cellId'.format(instrument, channel)\n", + " image_path_temp = 'INSTRUMENT/{}/DET/{}CH0:xtdf/image/data'.format(karabo_id, channel)\n", + " cellID_path_temp = 'INSTRUMENT/{}/DET/{}CH0:xtdf/image/cellId'.format(karabo_id, channel)\n", " if rawversion == 2:\n", - " count = np.squeeze(f[\"/INDEX/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/count\".format(instrument, channel)])\n", - " first = np.squeeze(f[\"/INDEX/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/first\".format(instrument, channel)])\n", + " count = np.squeeze(f[\"/INDEX/{}/DET/{}CH0:xtdf/image/count\".format(karabo_id, channel)])\n", + " first = np.squeeze(f[\"/INDEX/{}/DET/{}CH0:xtdf/image/first\".format(karabo_id, channel)])\n", " last_index = int(first[count != 0][-1]+count[count != 0][-1])\n", " first_index = int(first[count != 0][0])\n", " else:\n", - " status = 
np.squeeze(f[\"/INDEX/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/status\".format(instrument, channel)])\n", + " status = np.squeeze(f[\"/INDEX/{}/DET/{}CH0:xtdf/image/status\".format(karabo_id, channel)])\n", " if np.count_nonzero(status != 0) == 0:\n", " return\n", - " last = np.squeeze(f[\"/INDEX/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/last\".format(instrument, channel)])\n", + " last = np.squeeze(f[\"/INDEX/{}/DET/{}CH0:xtdf/image/last\".format(karabo_id, channel)])\n", " last_index = int(last[status != 0][-1])\n", " first_index = int(last[status != 0][0])\n", " #print(first_index, last_index)\n", @@ -486,8 +487,7 @@ " m = ydiffs/xdiffs\n", " ms[:,i] = m\n", " m = np.mean(ms, axis=1)\n", - " mm = np.zeros_like(m)\n", - " mm[...] = np.nan\n", + "\n", " m[scan_range//2:-scan_range//2+1] = np.mean(rolling_window(m, scan_range),-1)\n", " reg1 = m > r1\n", " reg2 = m < r2\n", @@ -499,7 +499,8 @@ " regions[lbl] = r\n", " scan_range = 30\n", " mregions = np.round(np.mean(rolling_window(regions, scan_range),-1))\n", - " regions[...] = np.nan \n", + " # change from np.nan to -1 \n", + " regions[...] = -1 \n", " regions[scan_range//2:-scan_range//2+1] = mregions\n", " \n", " \n", @@ -805,21 +806,22 @@ " #print(bound)\n", " # fit linear slope\n", " if not np.isnan(bound_m):\n", - " xl = x[(x<bound)]\n", - " yl = y[(x<bound)] - offset[pix[0], pix[1], cell, 0]\n", - " parms = {'m': bound_m, 'b': np.min(yl)}\n", - "\n", - " errors = np.ones(xl.shape)*noise[pix[0], pix[1], cell, 0]\n", - " fitted = fit_data(lin_fun, xl, yl, errors , parms)\n", - " yf = lin_fun(xl, fitted['m'], fitted['b'])\n", - " max_devl = np.max(np.abs((yl-yf)/yl))\n", - "\n", - " d3.append({'x': xl,\n", - " 'y': yf,\n", - " 'color': 'k',\n", - " 'linewidth': 1,\n", - " 'y2': (yf-yl)/errors\n", - " })\n", + " xl = x[(x<bound-20)]\n", + " yl = y[(x<bound-20)] - offset[pix[0], pix[1], cell, 0]\n", + " if yl.shape[0] != 0:\n", + " parms = {'m': bound_m, 'b': np.min(yl)}\n", + "\n", + " errors = np.ones(xl.shape)*noise[pix[0], pix[1], cell, 0]\n", + " fitted = fit_data(lin_fun, xl, yl, errors , parms)\n", + " yf = lin_fun(xl, fitted['m'], fitted['b'])\n", + " max_devl = np.max(np.abs((yl-yf)/yl))\n", + "\n", + " d3.append({'x': xl,\n", + " 'y': yf,\n", + " 'color': 'k',\n", + " 'linewidth': 1,\n", + " 'y2': (yf-yl)/errors\n", + " })\n", " # fit hook slope\n", " if fit_hook:\n", " idx = (x >= bound) & (y > 0) & np.isfinite(x) & np.isfinite(y)\n", @@ -828,7 +830,7 @@ " if len(yh[yh > 0]) == 0:\n", " break\n", " parms = {'m': bound_m/10 if bound_m/10>0.3 else 0.5, 'b': np.min(yh[yh > 0]), 'a': np.max(yh), 'c': 5, 'o': bound-1}\n", - " parms[\"limit_m\"] = [0.3, 1.0]\n", + " parms[\"limit_m\"] = [0.3, 2.0]\n", " parms[\"limit_c\"] = [1., 1000]\n", " errors = np.ones(xh.shape)*noise[pix[0], pix[1], cell, 1]\n", " fitted = fit_data(hook_fun, xh, yh, errors, parms)\n", @@ -945,14 +947,15 @@ "\n", " # fit linear slope\n", " idx = (x >= bound) & (y > 0) & np.isfinite(x) & np.isfinite(y)\n", - " xl = x[(x<bound)]\n", - " yl = y[(x<bound)] - offset[pix[0], pix[1], cell, 0]\n", + " xl = x[(x<bound-20)]\n", + " yl = y[(x<bound-20)] - offset[pix[0], pix[1], cell, 0]\n", " errors = np.ones(xl.shape)*noise[pix[0], pix[1], cell, 0]\n", - " parms = {'m': bound_m, 'b': np.min(yl)}\n", - " fitted = fit_data(lin_fun, xl, yl, errors, parms)\n", + " if yl.shape[0] != 0:\n", + " parms = {'m': bound_m, 'b': np.min(yl)}\n", + " fitted = fit_data(lin_fun, xl, yl, errors, parms)\n", "\n", - " yf = lin_fun(xl, fitted['m'], fitted['b'])\n", - " max_devl = 
np.max(np.abs((yl-yf)/yl))\n", + " yf = lin_fun(xl, fitted['m'], fitted['b'])\n", + " max_devl = np.max(np.abs((yl-yf)/yl))\n", "\n", " xtt = np.arange(ana.shape[0])\n", " ytt = ana[:,cell, pix[0], pix[1]]\n", @@ -999,7 +1002,7 @@ " 'c': 5.,\n", " 'o': bound-1\n", " }\n", - " parms[\"limit_m\"] = [0.3, 1.0]\n", + " parms[\"limit_m\"] = [0.3, 2.0]\n", " parms[\"limit_c\"] = [1., 1000]\n", " fitted = fit_data(hook_fun, xh, yh, errors, parms)\n", " yf = hook_fun(xh, fitted['a'], fitted['c'], fitted['o'], fitted['m'], fitted['b'])\n", @@ -1086,8 +1089,6 @@ " m = ydiffs/xdiffs\n", " ms[:,i] = m\n", " m = np.mean(ms, axis=1)\n", - " mm = np.zeros_like(m)\n", - " mm[...] = np.nan\n", " m[scan_range//2:-scan_range//2+1] = np.mean(rolling_window(m, scan_range),-1)\n", " reg1 = m > r1\n", " reg2 = m < r2\n", @@ -1099,7 +1100,8 @@ " regions[lbl] = r\n", " scan_range = 30\n", " mregions = np.round(np.mean(rolling_window(regions, scan_range),-1))\n", - " regions[...] = np.nan \n", + " # chanage from np.nan to -1\n", + " regions[...] = -1 \n", " regions[scan_range//2:-scan_range//2+1] = mregions\n", "\n", "\n", @@ -1210,13 +1212,14 @@ " bound_m = ms[1]\n", "\n", " # fit linear slope\n", - " xl = x[x<bound]\n", - " yl = y[x<bound] - offset[col, 0]\n", + " xl = x[x<bound-20]\n", + " yl = y[x<bound-20] - offset[col, 0]\n", " errors = np.ones(xl.shape)*noise[col, 0]\n", - " parms = {'m': bound_m, 'b': np.min(yl)}\n", - " fitted = fit_data(lin_fun, xl, yl, errors, parms)\n", - " yf = lin_fun(xl, fitted['m'], fitted['b'])\n", - " max_devl = np.median(np.abs((yl-yf)/yl))\n", + " if yl.shape[0] != 0:\n", + " parms = {'m': bound_m, 'b': np.min(yl)}\n", + " fitted = fit_data(lin_fun, xl, yl, errors, parms)\n", + " yf = lin_fun(xl, fitted['m'], fitted['b'])\n", + " max_devl = np.median(np.abs((yl-yf)/yl))\n", " ml[col] = fitted['m']\n", " bl[col] = fitted['b']\n", " devl[col] = max_devl\n", @@ -1230,7 +1233,7 @@ " yh = y[idx] - offset[col, 1]\n", " errors = np.ones(xh.shape)*noise[col, 1]\n", " parms = {'m': bound_m/10 if bound_m/10 > 0.3 else 0.5, 'b': np.min(yh[yh > 0]), 'a': np.max(yh), 'c': 5., 'o': bound-1}\n", - " parms[\"limit_m\"] = [0.3, 1.0]\n", + " parms[\"limit_m\"] = [0.3, 2.0]\n", " parms[\"limit_c\"] = [1., 1000]\n", " fitted = fit_data(hook_fun, xh, yh, errors, parms)\n", " yf = hook_fun(xh, fitted['a'], fitted['c'], fitted['o'], fitted['m'], fitted['b'])\n", diff --git a/notebooks/DSSC/Characterize_DSSC_Darks_NBC.ipynb b/notebooks/DSSC/Characterize_DSSC_Darks_NBC.ipynb index e1ef8825a5f63ee1d6da4015ae6c9fb058351f34..048af2e7d23db6e5245d902dcafbb97bbdd5f08f 100644 --- a/notebooks/DSSC/Characterize_DSSC_Darks_NBC.ipynb +++ b/notebooks/DSSC/Characterize_DSSC_Darks_NBC.ipynb @@ -20,13 +20,13 @@ "outputs": [], "source": [ "cluster_profile = \"noDB\" # The ipcluster profile to use\n", - "in_folder = \"/gpfs/exfel/exp/SCS/202031/p900170/raw\" # path to input data, required\n", + "in_folder = \"/gpfs/exfel/exp/SQS/202131/p900210/raw\" # path to input data, required\n", "out_folder = \"/gpfs/exfel/data/scratch/samartse/data/DSSC\" # path to output to, required\n", "sequences = [0] # sequence files to evaluate.\n", "modules = [-1] # modules to run for\n", - "run = 223 #run number in which data was recorded, required\n", + "run = 20 #run number in which data was recorded, required\n", "\n", - "karabo_id = \"SCS_DET_DSSC1M-1\" # karabo karabo_id\n", + "karabo_id = \"SQS_DET_DSSC1M-1\" # karabo karabo_id\n", "karabo_da = ['-1'] # a list of data aggregators names, Default [-1] for selecting all data 
aggregators\n", "receiver_id = \"{}CH0\" # inset for receiver devices\n", "path_template = 'RAW-R{:04d}-{}-S{:05d}.h5' # the template to use to access data\n", @@ -52,9 +52,10 @@ "thresholds_noise_hard = [0.001, 3] # thresholds in absolute ADU terms for offset deduced bad pixels\n", "offset_numpy_algorithm = \"mean\"\n", "\n", - "instrument = \"SCS\" # the instrument\n", + "instrument = \"SQS\" # the instrument\n", "high_res_badpix_3d = False # set this to True if you need high-resolution 3d bad pixel plots. Runtime: ~ 1h\n", "slow_data_aggregators = [1,2,3,4] # quadrant/aggregator\n", + "slow_data_path = 'SQS_NQS_DSSC/FPGA/PPT_Q'\n", "operation_mode = '' # Detector operation mode, optional" ] }, @@ -238,7 +239,7 @@ "\n", " thresholds_offset_hard, thresholds_offset_sigma, thresholds_noise_hard, thresholds_noise_sigma = bp_thresh \n", "\n", - " infile = h5py.File(filename, \"r\", driver=\"core\")\n", + " infile = h5py.File(filename, \"r\")\n", " if rawversion == 2:\n", " count = np.squeeze(infile[f\"{h5path_idx}/count\"])\n", " first = np.squeeze(infile[f\"{h5path_idx}/first\"])\n", @@ -314,7 +315,7 @@ " tGain, encodedGain, operatingFreq = get_dssc_ctrl_data(in_folder + \"/r{:04d}/\".format(offset_runs[\"high\"]),\n", " slow_data_pattern,\n", " slow_data_aggregators,\n", - " offset_runs[\"high\"])\n", + " offset_runs[\"high\"], slow_data_path)\n", "except IOError:\n", " print(\"ERROR: Couldn't access slow data to read tGain, encodedGain, and operatingFreq \\n\")\n", " \n", @@ -692,7 +693,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.6.7" + "version": "3.8.8" } }, "nbformat": 4, diff --git a/notebooks/DSSC/DSSC_Correct_and_Verify.ipynb b/notebooks/DSSC/DSSC_Correct_and_Verify.ipynb index 4d5bc0eb53bac3158b4950f7679f82976016e13c..cb01d46d12410767e8a9a90e900d0e52689d2445 100644 --- a/notebooks/DSSC/DSSC_Correct_and_Verify.ipynb +++ b/notebooks/DSSC/DSSC_Correct_and_Verify.ipynb @@ -23,13 +23,13 @@ "outputs": [], "source": [ "cluster_profile = \"noDB\" # The ipcluster profile to use\n", - "in_folder = \"/gpfs/exfel/exp/SCS/202031/p900170/raw\" # path to input data, required\n", - "out_folder = \"/gpfs/exfel/data/scratch/samartse/test/DSSC\" # path to output to, required\n", + "in_folder = \"/gpfs/exfel/exp/SQS/202131/p900210/raw\" # path to input data, required\n", + "out_folder = \"/gpfs/exfel/data/scratch/samartse/data/DSSC\" # path to output to, required\n", "sequences = [-1] # sequence files to evaluate.\n", "modules = [-1] # modules to correct, set to -1 for all, range allowed\n", - "run = 229 #runs to process, required\n", + "run = 20 #runs to process, required\n", "\n", - "karabo_id = \"SCS_DET_DSSC1M-1\" # karabo karabo_id\n", + "karabo_id = \"SQS_DET_DSSC1M-1\" # karabo karabo_id\n", "karabo_da = ['-1'] # a list of data aggregators names, Default [-1] for selecting all data aggregators\n", "receiver_id = \"{}CH0\" # inset for receiver devices\n", "path_template = 'RAW-R{:04d}-{}-S{:05d}.h5' # the template to use to access data\n", @@ -53,6 +53,7 @@ "geo_file = \"/gpfs/exfel/data/scratch/xcal/dssc_geo_june19.h5\" # detector geometry file\n", "dinstance = \"DSSC1M1\"\n", "slow_data_aggregators = [1,2,3,4] #quadrant/aggregator\n", + "slow_data_path = 'SQS_NQS_DSSC/FPGA/PPT_Q'\n", "\n", "def balance_sequences(in_folder, run, sequences, sequences_per_node, karabo_da):\n", " from xfel_calibrate.calibrate import balance_sequences as bs\n", @@ -420,7 +421,7 @@ "\n", "tGain, encodedGain, operatingFreq = get_dssc_ctrl_data(in_folder\\\n", " 
+ \"/r{:04d}/\".format(run),\\\n", - " slow_data_pattern,slow_data_aggregators, run)\n", + " slow_data_pattern,slow_data_aggregators, run, slow_data_path)\n", "\n", "whens = []\n", "qms = []\n", @@ -959,7 +960,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.6.7" + "version": "3.8.8" } }, "nbformat": 4, diff --git a/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb b/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb index 31fb1d431120a2bb5bdee4c2627f821e518ae626..9c2c7086b0303912ab11386dafbe483b5706dbb3 100644 --- a/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb +++ b/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb @@ -201,7 +201,11 @@ " fp_name = path_template.format(r_n, karabo_da_control)\n", " fp_path = '{}/{}'.format(ped_dir, fp_name)\n", " \n", - " n_files = len(glob.glob(\"{}/*{}*.h5\".format(ped_dir, path_inset)))\n", + " files_pattern = \"{}/*{}*.h5\".format(ped_dir, path_inset)\n", + " n_files = len(glob.glob(files_pattern))\n", + " if n_files == 0:\n", + " raise Exception(f\"No files found matching {files_pattern!r}\")\n", + "\n", " myRange = range(0, n_files)\n", " control_path = h5path_cntrl.format(karabo_id_control, receiver_control_id)\n", " \n", diff --git a/notebooks/LPD/LPDChar_Darks_NBC.ipynb b/notebooks/LPD/LPDChar_Darks_NBC.ipynb index d912c7c0a0c89040e29c0a235e74ab974c804c06..2b5f78efda66c69a3026cfc4ef57c293f3979edc 100644 --- a/notebooks/LPD/LPDChar_Darks_NBC.ipynb +++ b/notebooks/LPD/LPDChar_Darks_NBC.ipynb @@ -221,7 +221,7 @@ " filename, channel, gg, cap = inp\n", " thresholds_offset_hard, thresholds_offset_sigma, thresholds_noise_hard, thresholds_noise_sigma = bp_thresh\n", "\n", - " infile = h5py.File(filename, \"r\", driver=\"core\")\n", + " infile = h5py.File(filename, \"r\")\n", " \n", " h5path = h5path.format(channel)\n", " h5path_idx = h5path_idx.format(channel)\n", diff --git a/notebooks/LPD/LPD_Correct_and_Verify.ipynb b/notebooks/LPD/LPD_Correct_and_Verify.ipynb index 4c2fafc5287430d8d5983632877d9e8f4db451a6..4c99b082e764389c8e7206da8ff742a625d09843 100644 --- a/notebooks/LPD/LPD_Correct_and_Verify.ipynb +++ b/notebooks/LPD/LPD_Correct_and_Verify.ipynb @@ -539,9 +539,9 @@ "\n", "import cal_tools.metrology as metro\n", "\n", - "in_files = \"{}/CORR*LPD*S{:05d}*.h5\".format(out_folder, sequences[0] if sequences else 0)\n", + "out_files = \"{}/CORR*LPD*S{:05d}*.h5\".format(out_folder, sequences[0] if sequences else 0)\n", "datapath = \"{}/image/data\".format(h5path)\n", - "print(\"Preview is from {}\".format(in_files))" + "print(\"Preview is from {}\".format(out_files))" ] }, { @@ -555,9 +555,9 @@ }, "outputs": [], "source": [ - "posarr = metro.positionFileList(in_files, datapath, geometry_file, d_quads, nImages = 10)\n", + "posarr = metro.positionFileList(out_files, datapath, geometry_file, d_quads, nImages = 10)\n", "maskpath = \"{}/image/mask\".format(h5path)\n", - "maskedarr = metro.positionFileList(in_files, maskpath, geometry_file, d_quads, nImages = 10)" + "maskedarr = metro.positionFileList(out_files, maskpath, geometry_file, d_quads, nImages = 10)" ] }, { @@ -748,7 +748,8 @@ "## Maxium Gain Value Reached ##\n", "\n", "The following plot shows the maximum gain value reached. It can be used as an indication of whether the detector went into saturation." 
- ] + ], + "metadata": {} }, { "cell_type": "code", @@ -761,20 +762,13 @@ }, "outputs": [], "source": [ - "gainpath = \"{}/gain\".format(h5path)\n", - "posarr = metro.positionFileList(in_files, gainpath, geometry_file, d_quads, nImages = 100)" + "gainpath = \"{}/image/gain\".format(h5path)\n", + "posarr = metro.positionFileList(out_files, gainpath, geometry_file, d_quads, nImages = 100)" ] }, { "cell_type": "code", "execution_count": null, - "metadata": { - "ExecuteTime": { - "end_time": "2018-11-13T18:25:03.625885Z", - "start_time": "2018-11-13T18:25:03.092923Z" - } - }, - "outputs": [], "source": [ "fig = plt.figure(figsize=(15,15))\n", "ax = fig.add_subplot(111)\n", @@ -782,7 +776,14 @@ "im=ax.imshow((parr), vmin=0, vmax=3)\n", "cb = fig.colorbar(im)\n", "cb.set_label(\"Intensity (ADU\")" - ] + ], + "outputs": [], + "metadata": { + "ExecuteTime": { + "end_time": "2018-11-13T18:25:03.625885Z", + "start_time": "2018-11-13T18:25:03.092923Z" + } + } }, { "cell_type": "code", diff --git a/notebooks/ePix100/Correction_ePix100_NBC.ipynb b/notebooks/ePix100/Correction_ePix100_NBC.ipynb index 0c46331fffe0b4c4106fa2c0e5ccd85644711a05..83ffb1e5983c8f2666be9b7f394ea54af5c83ad0 100644 --- a/notebooks/ePix100/Correction_ePix100_NBC.ipynb +++ b/notebooks/ePix100/Correction_ePix100_NBC.ipynb @@ -18,10 +18,10 @@ "outputs": [], "source": [ "cluster_profile = \"noDB\" # ipcluster profile to use\n", - "in_folder = \"/gpfs/exfel/exp/CALLAB/202031/p900113/raw\" # input folder, required\n", + "in_folder = \"/gpfs/exfel/exp/MID/202121/p002929/raw\" # input folder, required\n", "out_folder = \"\" # output folder, required\n", "sequences = [-1] # sequences to correct, set to -1 for all, range allowed\n", - "run = 9988 # which run to read data from, required\n", + "run = 126 # which run to read data from, required\n", "\n", "karabo_id = \"MID_EXP_EPIX-1\" # karabo karabo_id\n", "karabo_da = \"EPIX01\" # data aggregators\n", @@ -46,9 +46,11 @@ "in_vacuum = False # detector operated in vacuum\n", "fix_temperature = 290. # fix temperature to this value\n", "gain_photon_energy = 9.0 # Photon energy used for gain calibration\n", - "photon_energy = 8.0 # Photon energy to calibrate in number of photons, 0 for calibration in keV\n", + "photon_energy = 0. # Photon energy to calibrate in number of photons, 0 for calibration in keV\n", "\n", - "relative_gain = False # Apply relative gain correction.\n", + "pattern_classification = True # do clustering.\n", + "relative_gain = True # Apply relative gain correction.\n", + "absolute_gain = True # Apply absolute gain correction (implies relative gain).\n", "common_mode = True # Apply common mode correction.\n", "cm_min_frac = 0.25 # No CM correction is performed if after masking the ratio of good pixels falls below this \n", "cm_noise_sigma = 5. 
# CM correction noise standard deviation\n", @@ -74,6 +76,7 @@ "\n", "import h5py\n", "import numpy as np\n", + "import matplotlib.pyplot as plt\n", "from IPython.display import Latex, display\n", "from pathlib import Path\n", "\n", @@ -108,18 +111,20 @@ "metadata": {}, "outputs": [], "source": [ - "# TODO: expose to first cell after fixing clustering.\n", - "pattern_classification = False # do clustering.\n", - "\n", + "if absolute_gain :\n", + " relative_gain = True" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ "h5path = h5path.format(karabo_id, receiver_id)\n", "h5path_t = h5path_t.format(karabo_id, receiver_id)\n", "h5path_cntrl = h5path_cntrl.format(karabo_id)\n", - "plot_unit = 'ADU'\n", - "\n", - "if relative_gain:\n", - " plot_unit = 'keV'\n", - " if photon_energy > 0:\n", - " plot_unit = '$\\gamma$'" + "plot_unit = 'ADU'" ] }, { @@ -232,7 +237,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "temp_limits = 5.\n", @@ -297,7 +304,20 @@ "metadata": {}, "outputs": [], "source": [ - "# ************************Calculators******************** #\n", + "hrange = np.array([-50, 1000])\n", + "nbins = hrange[1] - hrange[0]\n", + "hscale = 1\n", + "\n", + "commonModeBlockSize = [x//2, y//2]\n", + "stats = True" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ "offsetCorrection = xcal.OffsetCorrection(\n", " sensorSize,\n", " const_data[\"Offset\"],\n", @@ -308,29 +328,10 @@ " parallel=run_parallel\n", ")\n", "\n", - "if relative_gain:\n", - " gainCorrection = xcal.RelativeGainCorrection(\n", - " sensorSize,\n", - " 1./const_data[\"RelativeGain\"][..., None],\n", - " nCells=memoryCells,\n", - " parallel=run_parallel,\n", - " cores=cpuCores,\n", - " blockSize=blockSize,\n", - " gains=None,\n", - " )" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# *****************Histogram Calculators****************** #\n", "histCalOffsetCor = xcal.HistogramCalculator(\n", " sensorSize,\n", - " bins=1050,\n", - " range=[-50, 1000],\n", + " bins=nbins,\n", + " range=hrange,\n", " parallel=run_parallel,\n", " nCells=memoryCells,\n", " cores=cpuCores,\n", @@ -338,35 +339,22 @@ ")" ] }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Applying corrections" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "histCalOffsetCor.debug()\n", - "offsetCorrection.debug()\n", - "if relative_gain:\n", - " gainCorrection.debug()" - ] - }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "# ************************Calculators******************** #\n", "if common_mode:\n", - " commonModeBlockSize = [x//2, y//2]\n", - " stats = True\n", + " histCalCMCor = xcal.HistogramCalculator(\n", + " sensorSize,\n", + " bins=nbins,\n", + " range=hrange,\n", + " parallel=run_parallel,\n", + " nCells=memoryCells,\n", + " cores=cpuCores,\n", + " blockSize=blockSize,\n", + " )\n", "\n", " cmCorrectionB = xcal.CommonModeCorrection(\n", " shape=sensorSize,\n", @@ -400,16 +388,53 @@ " stats=stats,\n", " minFrac=cm_min_frac,\n", " noiseSigma=cm_noise_sigma,\n", + " )\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "if relative_gain:\n", + " gain_cnst = 
np.median(const_data[\"RelativeGain\"])\n", + " hscale = gain_cnst\n", + " plot_unit = 'keV'\n", + " if photon_energy > 0:\n", + " plot_unit = '$\\gamma$'\n", + " hscale /= photon_energy\n", + " \n", + " gainCorrection = xcal.RelativeGainCorrection(\n", + " sensorSize,\n", + " gain_cnst/const_data[\"RelativeGain\"][..., None],\n", + " nCells=memoryCells,\n", + " parallel=run_parallel,\n", + " cores=cpuCores,\n", + " blockSize=blockSize,\n", + " gains=None,\n", " )\n", - " histCalCMCor = xcal.HistogramCalculator(\n", + "\n", + " histCalRelGainCor = xcal.HistogramCalculator(\n", " sensorSize,\n", - " bins=1050,\n", - " range=[-50, 1000],\n", + " bins=nbins,\n", + " range=hrange,\n", " parallel=run_parallel,\n", " nCells=memoryCells,\n", " cores=cpuCores,\n", - " blockSize=blockSize,\n", - " )" + " blockSize=blockSize\n", + " )\n", + " \n", + " if absolute_gain:\n", + " histCalAbsGainCor = xcal.HistogramCalculator(\n", + " sensorSize,\n", + " bins=nbins,\n", + " range=hrange*hscale,\n", + " parallel=run_parallel,\n", + " nCells=memoryCells,\n", + " cores=cpuCores,\n", + " blockSize=blockSize\n", + " )" ] }, { @@ -418,7 +443,7 @@ "metadata": {}, "outputs": [], "source": [ - "if pattern_classification:\n", + "if pattern_classification :\n", " patternClassifier = xcal.PatternClassifier(\n", " [x, y],\n", " const_data[\"Noise\"],\n", @@ -435,30 +460,29 @@ "\n", " histCalSECor = xcal.HistogramCalculator(\n", " sensorSize,\n", - " bins=1050,\n", - " range=[-50, 1000],\n", + " bins=nbins,\n", + " range=hrange,\n", " parallel=run_parallel,\n", " nCells=memoryCells,\n", " cores=cpuCores,\n", " blockSize=blockSize,\n", + " )\n", + " histCalGainCorSingles = xcal.HistogramCalculator(\n", + " sensorSize,\n", + " bins=nbins,\n", + " range=hrange*hscale,\n", + " parallel=run_parallel,\n", + " nCells=memoryCells,\n", + " cores=cpuCores,\n", + " blockSize=blockSize\n", " )" ] }, { - "cell_type": "code", - "execution_count": null, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "if common_mode:\n", - " cmCorrectionB.debug()\n", - " cmCorrectionR.debug()\n", - " cmCorrectionC.debug()\n", - " histCalCMCor.debug()\n", - "\n", - "if pattern_classification:\n", - " patternClassifier.debug()\n", - " histCalSECor.debug()" + "Applying corrections" ] }, { @@ -519,7 +543,8 @@ "\n", " # Offset correction.\n", " data = offsetCorrection.correct(data.astype(np.float32))\n", - "\n", + " histCalOffsetCor.fill(data)\n", + " \n", " # Common Mode correction.\n", " if common_mode:\n", " # Block CM\n", @@ -528,33 +553,15 @@ " data = cmCorrectionR.correct(data)\n", " # COL CM\n", " data = cmCorrectionC.correct(data)\n", - "\n", " histCalCMCor.fill(data)\n", "\n", " # relative gain correction.\n", " if relative_gain:\n", " data = gainCorrection.correct(data.astype(np.float32))\n", - " if photon_energy > 0:\n", - " data /= photon_energy\n", + " histCalRelGainCor.fill(data)\n", "\n", - " histCalOffsetCor.fill(data)\n", " ddset[...] 
= np.moveaxis(data, 2, 0)\n", "\n", - " \"\"\"The gain correction is currently applying\n", - " an absolute correction (not a relative correction\n", - " as the implied by the name);\n", - " it changes the scale (the unit of measurement)\n", - " of the data from ADU to either keV or n_of_photons.\n", - " But the pattern classification relies on comparing\n", - " data with the noise map, which is still in ADU.\n", - "\n", - " The best solution is to do a relative gain\n", - " correction first and apply the global absolute\n", - " gain to the data at the end, after clustering.\n", - " \"\"\"\n", - " \n", - " # TODO: Fix conflict between pattern classification\n", - " # and gain corr.\n", " if pattern_classification:\n", " ddsetc = ofile.create_dataset(\n", " h5path+\"/pixels_classified\",\n", @@ -568,15 +575,30 @@ " chunks=(chunk_size_idim, oshape[1], oshape[2]),\n", " dtype=np.int32, compression=\"gzip\")\n", "\n", + " data_clu, patterns = patternClassifier.classify(data)\n", "\n", - " data, patterns = patternClassifier.classify(data)\n", - "\n", - " data[data < (split_evt_primary_threshold*const_data[\"Noise\"])] = 0 # noqa\n", - " ddsetc[...] = np.moveaxis(data, 2, 0)\n", + " data_clu[data_clu < (split_evt_primary_threshold*const_data[\"Noise\"])] = 0 # noqa\n", + " ddsetc[...] = np.moveaxis(data_clu, 2, 0)\n", " ddsetp[...] = np.moveaxis(patterns, 2, 0)\n", "\n", - " data[patterns != 100] = np.nan\n", - " histCalSECor.fill(data)\n", + " data_clu[patterns != 100] = np.nan\n", + " histCalSECor.fill(data_clu)\n", + "\n", + " # absolute gain correction\n", + " # changes data from ADU to keV (or n. of photons)\n", + " if absolute_gain:\n", + " data = data * gain_cnst\n", + " if photon_energy > 0:\n", + " data /= photon_energy\n", + " histCalAbsGainCor.fill(data)\n", + "\n", + " if pattern_classification:\n", + " data_clu = data_clu *gain_cnst\n", + " if photon_energy > 0:\n", + " data_clu /= photon_energy\n", + " ddsetc[...] 
= np.moveaxis(data_clu, 2, 0)\n", + " histCalGainCorSingles.fill(data_clu)\n", + "\n", " except Exception as e:\n", " print(f\"ERROR applying common mode correction for {f}: {e}\")" ] @@ -610,6 +632,19 @@ " 'errorcoarsing': 2,\n", " 'label': 'CM corr.'\n", " })\n", + " \n", + "if relative_gain :\n", + " ho, eo, co, so = histCalRelGainCor.get()\n", + " d.append({\n", + " 'x': co,\n", + " 'y': ho,\n", + " 'y_err': np.sqrt(ho[:]),\n", + " 'drawstyle': 'steps-mid',\n", + " 'errorstyle': 'bars',\n", + " 'errorcoarsing': 2,\n", + " 'label': 'Relative gain corr.'\n", + " })\n", + "\n", "\n", "if pattern_classification:\n", " ho, eo, co, so = histCalSECor.get()\n", @@ -620,15 +655,59 @@ " 'drawstyle': 'steps-mid',\n", " 'errorstyle': 'bars',\n", " 'errorcoarsing': 2,\n", - " 'label': 'Single split events'\n", + " 'label': 'Isolated photons (singles)'\n", " })\n", "\n", "fig = xana.simplePlot(\n", - " d, aspect=1, x_label=f'Energy({plot_unit})',\n", + " d, aspect=1, x_label=f'Energy (ADU)',\n", " y_label='Number of occurrences', figsize='2col',\n", " y_log=True, x_range=(-50, 500),\n", - " legend='top-center-frame-2col'\n", - ")" + " legend='top-center-frame-2col',\n", + ")\n", + "plt.title(f'run {run} - {karabo_da}')\n", + "plt.grid()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "if absolute_gain :\n", + " d=[]\n", + " ho, eo, co, so = histCalAbsGainCor.get()\n", + " d.append({\n", + " 'x': co,\n", + " 'y': ho,\n", + " 'y_err': np.sqrt(ho[:]),\n", + " 'drawstyle': 'steps-mid',\n", + " 'errorstyle': 'bars',\n", + " 'errorcoarsing': 2,\n", + " 'label': 'Absolute gain corr.'\n", + " })\n", + "\n", + " if pattern_classification:\n", + " ho, eo, co, so = histCalGainCorSingles.get()\n", + " d.append({\n", + " 'x': co,\n", + " 'y': ho,\n", + " 'y_err': np.sqrt(ho[:]),\n", + " 'drawstyle': 'steps-mid',\n", + " 'errorstyle': 'bars',\n", + " 'errorcoarsing': 2,\n", + " 'label': 'Isolated photons (singles)'\n", + " })\n", + "\n", + " fig = xana.simplePlot(\n", + " d, aspect=1, x_label=f'Energy ({plot_unit})',\n", + " y_label='Number of occurrences', figsize='2col',\n", + " y_log=True, \n", + " x_range=np.array((-50, 500))*hscale,\n", + " legend='top-center-frame-2col',\n", + " )\n", + " plt.grid()\n", + " plt.title(f'run {run} - {karabo_da}')" ] }, { @@ -692,7 +771,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.6.7" + "version": "3.8.11" }, "latex_envs": { "LaTeX_envs_menu_present": true, diff --git a/pyproject.toml b/pyproject.toml index ac3b70bf15486cd8bf285f4acb60d60bda7ad460..9b2d60a2d97e513e063f6b3b742407686218413b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,9 @@ disable = "C0330, C0326" [tool.pylint.format] max-line-length = "88" +[flake8] +max-line-length = 88 + [tool.pytest.ini_options] norecursedirs = [ "legacy", diff --git a/reportservice/report_service.py b/reportservice/report_service.py index 4e6859f7e990a481f4a16b5dab10a06745de0b9f..0e3da7e7cf04bcac1922a4fee93e77728b1eeac8 100644 --- a/reportservice/report_service.py +++ b/reportservice/report_service.py @@ -64,7 +64,7 @@ async def wait_jobs(joblist): for job in joblist: if str(job) in line: found_jobs.add(job) - if len(found_jobs) == 0: + if not found_jobs: logging.info('Jobs are finished') break await asyncio.sleep(10) diff --git a/setup.py b/setup.py index 2b9a511031515aa1a51cca84c549ec6ffed28973..f7792741fc29b64c81da727d11a0d2dc39f798b9 100644 --- a/setup.py +++ b/setup.py @@ -26,10 +26,8 @@ class 
PreInstallCommand(build): def run(self): version = check_output(["git", "describe", "--tag"]).decode("utf8") version = version.replace("\n", "") - file = open("src/xfel_calibrate/VERSION.py", "w") - file.write('__version__="{}"'.format(version)) - file.close() - + with open("src/xfel_calibrate/VERSION.py", "w") as file: + file.write('__version__="{}"'.format(version)) build.run(self) @@ -82,6 +80,7 @@ setup( "astcheck==0.2.5", "astsearch==0.2.0", "dill==0.3.0", + "dynaconf==3.1.4", "extra_data==1.4.1", "extra_geom==1.1.1", "gitpython==3.1.0", diff --git a/src/cal_tools/agipdlib.py b/src/cal_tools/agipdlib.py index c5a51c2ec7fac3352e49af77e87861ea2d077cef..2881c9589828ae8b1c6dbebeb4eea3b457bb3250 100644 --- a/src/cal_tools/agipdlib.py +++ b/src/cal_tools/agipdlib.py @@ -1,3 +1,4 @@ +import posixpath import traceback import zlib from multiprocessing.pool import ThreadPool @@ -21,6 +22,7 @@ from cal_tools.agipdutils import ( melt_snowy_pixels, ) from cal_tools.enums import AgipdGainMode, BadPixels, SnowResolution +from cal_tools.h5_copy_except import h5_copy_except_paths from cal_tools.tools import get_constant_from_db_and_time @@ -247,8 +249,12 @@ class AgipdCorrections: self.h5_index_path = h5_index_path self.rng_pulses = max_pulses # avoid list(range(*[0]])) - self.pulses_lst = list(range(*max_pulses)) \ - if not (len(max_pulses) == 1 and max_pulses[0] == 0) else max_pulses # noqa + self.pulses_lst = ( + list(range(*max_pulses)) + if max_pulses != [0] + else max_pulses + ) + self.max_cells = max_cells self.gain_mode = gain_mode self.comp_threads = comp_threads @@ -868,11 +874,7 @@ class AgipdCorrections: """ # Calculate the pulse step from the chosen max_pulse range - if len(self.rng_pulses) == 3: - pulse_step = self.rng_pulses[2] - else: - pulse_step = 1 - + pulse_step = self.rng_pulses[2] if len(self.rng_pulses) == 3 else 1 # Validate selected pulses range: # 1) Make sure the range max doesn't have non-valid idx. 
if self.pulses_lst[-1] + pulse_step > int(allpulses[-1]): @@ -983,31 +985,13 @@ class AgipdCorrections: # these are touched in the correct function, do not copy them here dont_copy = ["data", "cellId", "trainId", "pulseId", "status", "length"] - dont_copy = [agipd_base + "image/{}".format(do) - for do in dont_copy] - - # don't copy these as we may need to adjust if we filter trains - dont_copy += [idx_base + "{}/first".format(do) - for do in ["image", ]] - dont_copy += [idx_base + "{}/count".format(do) - for do in ["image", ]] - dont_copy += [idx_base + "{}/last".format(do) - for do in ["image", ]] - dont_copy += [idx_base + "{}/status".format(do) - for do in ["image", ]] - - # a visitor to copy everything else - def visitor(k, item): - if k not in dont_copy: - - if isinstance(item, h5py.Group): - outfile.create_group(k) - elif isinstance(item, h5py.Dataset): - group = str(k).split("/") - group = "/".join(group[:-1]) - infile.copy(k, outfile[group]) - - infile.visititems(visitor) + dont_copy = [posixpath.join(agipd_base, "image", ds) + for ds in dont_copy] + + # don't copy index as we may need to adjust if we filter trains + dont_copy.append(posixpath.join(idx_base, "image")) + + h5_copy_except_paths(infile, outfile, dont_copy) # sanitize indices for do in ["image", ]: @@ -1034,10 +1018,7 @@ class AgipdCorrections: if diff: if i < len(cntsv): cntsv = np.insert(cntsv, i, 0) - if i == 0: - fidxv = np.insert(fidxv, i, 0) - else: - fidxv = np.insert(fidxv, i, fidxv[i]) + fidxv = np.insert(fidxv, i, 0) if i == 0 else np.insert(fidxv, i, fidxv[i]) else: # append if at the end of the array cntsv = np.append(cntsv, 0) @@ -1215,7 +1196,7 @@ class AgipdCorrections: # This will handle some historical data in a different format # constant dimension injected first - if slopesPC.shape[0] == 10 or slopesPC.shape[0] == 11: + if slopesPC.shape[0] in [10, 11]: slopesPC = np.moveaxis(slopesPC, 0, 3) slopesPC = np.moveaxis(slopesPC, 0, 2) diff --git a/src/cal_tools/agipdutils.py b/src/cal_tools/agipdutils.py index e7cbd4667317e0a543ae865ed7e6d8ecaf33d1c6..280f07324f8076855a5e3b76eb59a38f2c5c9ab7 100644 --- a/src/cal_tools/agipdutils.py +++ b/src/cal_tools/agipdutils.py @@ -111,7 +111,7 @@ def get_shadowed_stripe(data, threshold, fraction): for idx, i in enumerate(A[1:-1]): if i - 1 not in A: continue - if len(tmp_idx) == 0: + if not tmp_idx: tmp_idx.append(i) continue if tmp_idx[-1] + 1 == i and ( diff --git a/src/cal_tools/agipdutils_ff.py b/src/cal_tools/agipdutils_ff.py index 6f3234ff3b1244b2df21b9b032f1218f5221ea17..ff57587d7f55c60dd59710fe78a156d6507343c3 100644 --- a/src/cal_tools/agipdutils_ff.py +++ b/src/cal_tools/agipdutils_ff.py @@ -228,9 +228,6 @@ def get_mask(fit_summary: Dict[str, Any], d01 = fit_summary['g1mean'] - m0 mask = 0 - if not fit_summary['is_valid']: - mask |= BadPixelsFF.FIT_FAILED - if not fit_summary['has_accurate_covar']: mask |= BadPixelsFF.ACCURATE_COVAR diff --git a/src/cal_tools/ana_tools.py b/src/cal_tools/ana_tools.py index edc60e75d5b0b0c509176a241eb27d87db98569d..4680e3b4267a4b615087d16a240d397f9e44764d 100644 --- a/src/cal_tools/ana_tools.py +++ b/src/cal_tools/ana_tools.py @@ -172,10 +172,7 @@ def combine_lists(*args, names=None): if isinstance(names, (list, tuple)): assert len(names) == len(args) - d_possible_params = [] - for par in possible_params: - d_possible_params.append(dict(zip(names, par))) - return d_possible_params + return [dict(zip(names, par)) for par in possible_params] return possible_params diff --git a/src/cal_tools/dssclib.py 
b/src/cal_tools/dssclib.py index fb365023fb4da2f7fb58521fbde6cbd5e9d70e7e..59ca874d2c896224418347577f2a991789fb00b4 100644 --- a/src/cal_tools/dssclib.py +++ b/src/cal_tools/dssclib.py @@ -45,7 +45,7 @@ def _get_gain_encoded_val(gainSettingsMap: Dict[str, int]) -> int: def get_dssc_ctrl_data(in_folder, slow_data_pattern, - slow_data_aggregators, run_number): + slow_data_aggregators, run_number, slow_data_path): """Obtaining dssc specific slow data like: operating frequency, target gain and encoded gain code from filename, etc. """ @@ -69,28 +69,12 @@ def get_dssc_ctrl_data(in_folder, slow_data_pattern, if os.path.exists(f): ctrlDataFiles[quadrant + 1] = f - if len(ctrlDataFiles) == 0: + if not ctrlDataFiles: print("ERROR: no Slow Control Data found!") return targetGainAll, encodedGainAll, operatingFreqAll daq_format = None - ctrlloc = None - filename = next(iter(ctrlDataFiles.values())) - with h5py.File(filename, 'r') as ctlrh5file: - if '/METADATA/dataSources/deviceId' in ctlrh5file: - ctrlloc = ctlrh5file['/METADATA/dataSources/deviceId'][0] - daq_format = ctlrh5file['/METADATA/dataFormatVersion'][0].decode( - "utf-8") - elif '/METADATA/deviceId' in ctlrh5file: - ctrlloc = ctlrh5file['/METADATA/deviceId'][0] - else: - print("ERROR: no Slow Control Data found in files!") - return targetGainAll, encodedGainAll, operatingFreqAll - - ctrlloc = ctrlloc.decode("utf-8") - ctrlloc = ctrlloc[:ctrlloc.find('/')] - tGain = {} encodedGain = {} operatingFreqs = {} @@ -98,16 +82,13 @@ def get_dssc_ctrl_data(in_folder, slow_data_pattern, if quadrant in ctrlDataFiles.keys(): file = ctrlDataFiles[quadrant] with h5py.File(file) as h5file: - iramp_path = f"/RUN/{ctrlloc}/FPGA/PPT_Q{quadrant}/gain/irampFineTrm/value" + iramp_path = f"/RUN/{slow_data_path}{quadrant}/gain/irampFineTrm/value" if not daq_format: tGain[quadrant] = 0.0 # 0.0 is default value for TG - if iramp_path in h5file: - irampSettings = h5file[iramp_path][0] - else: - irampSettings = "Various" + irampSettings = h5file[iramp_path][0] if iramp_path in h5file else "Various" else: - epcConfig = h5file[f'/RUN/{ctrlloc}/FPGA/PPT_Q{quadrant}/epcRegisterFilePath/value'][0]\ + epcConfig = h5file[f'/RUN/{slow_data_path}{quadrant}/epcRegisterFilePath/value'][0]\ .decode("utf-8") epcConfig = epcConfig[epcConfig.rfind('/') + 1:] @@ -117,17 +98,21 @@ def get_dssc_ctrl_data(in_folder, slow_data_pattern, targGain) if targGain is not None else 0.0 irampSettings = h5file[iramp_path][0].decode("utf-8") - gainSettingsMap = {} - for coarseParam in ['fcfEnCap', 'csaFbCap', 'csaResistor']: - gainSettingsMap[coarseParam] = int( - h5file[f'/RUN/{ctrlloc}/FPGA/PPT_Q{quadrant}/gain/{coarseParam}/value'][0]) + gainSettingsMap = { + coarseParam: int( + h5file[ + f'/RUN/{slow_data_path}{quadrant}/gain/{coarseParam}/value' + ][0] + ) + for coarseParam in ['fcfEnCap', 'csaFbCap', 'csaResistor'] + } gainSettingsMap['trimmed'] = np.int64( 1) if irampSettings == "Various" else np.int64(0) encodedGain[quadrant] = _get_gain_encoded_val(gainSettingsMap) - opFreq = h5file[f'/RUN/{ctrlloc}/FPGA/PPT_Q{quadrant}/sequencer/cycleLength/value'][0] + opFreq = h5file[f'/RUN/{slow_data_path}{quadrant}/sequencer/cycleLength/value'][0] # The Operating Frequency of the detector should be in MHz. # Here the karabo operation mode is converted to acquisition rate: # 22 corresponds to 4.5 MHz, 44 to 2.25 MHz, etc. 
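For context, after this change `get_dssc_ctrl_data` no longer reads the control
device location (`ctrlloc`) from the file metadata; the caller now passes the
slow-data device path explicitly. A minimal sketch of the updated call,
mirroring the parameter values from the DSSC notebooks above
(`slow_data_pattern` is a hypothetical placeholder, not a value taken from this
diff):

```python
# Minimal sketch: calling the reworked get_dssc_ctrl_data() with an explicit
# slow-data device path (the quadrant number is appended inside the function).
from cal_tools.dssclib import get_dssc_ctrl_data

in_folder = "/gpfs/exfel/exp/SQS/202131/p900210/raw"  # notebook default above
run = 20                                              # notebook default above
slow_data_pattern = "RAW-R{:04d}-DA{:02d}-S00000.h5"  # hypothetical file-name pattern
slow_data_aggregators = [1, 2, 3, 4]                  # quadrant/aggregator
slow_data_path = "SQS_NQS_DSSC/FPGA/PPT_Q"            # /RUN/ device path, without quadrant number

tGain, encodedGain, operatingFreq = get_dssc_ctrl_data(
    in_folder + "/r{:04d}/".format(run),
    slow_data_pattern,
    slow_data_aggregators,
    run,
    slow_data_path,
)
```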
diff --git a/src/cal_tools/h5_copy_except.py b/src/cal_tools/h5_copy_except.py new file mode 100644 index 0000000000000000000000000000000000000000..d983ec1280be0aeb24d9c2fdd43c12a61bee80d7 --- /dev/null +++ b/src/cal_tools/h5_copy_except.py @@ -0,0 +1,50 @@ +import os + +import h5py + +def paths_to_tree(paths): + """Convert paths to a nested-dict tree, with True at leaves""" + tree = {} + + for path in paths: + tree_part = tree + path_names = path.strip('/').split('/') + for name in path_names[:-1]: + tree_part = tree_part.setdefault(name, {}) + if tree_part is True: + break # A previous path was a prefix of this one + else: + tree_part[path_names[-1]] = True + + return tree + + +def copy_except_tree(src_group: h5py.Group, dest_group: h5py.Group, except_tree): + for name in src_group: + except_tree_part = except_tree.get(name) + if except_tree_part is True: # Totally excluded + pass + elif except_tree_part is None: # Not excluded + src_group.copy(name, dest_group, name) + else: # Partially excluded + src_subgroup = src_group[name] + assert isinstance(src_subgroup, h5py.Group) + copy_except_tree( + src_subgroup, dest_group.require_group(name), except_tree_part + ) + + +def h5_copy_except_paths(src_group, dest_group, except_paths): + """Copy an HDF5 file except for a list of paths to ignore + + This tries to copy entire groups where possible, to minimise overhead. + """ + # If src_group/dest_group are file paths, open them with h5py. + if isinstance(src_group, (str, bytes, os.PathLike)): + with h5py.File(src_group, 'r') as src_file: + return h5_copy_except_paths(src_file, dest_group, except_paths) + if isinstance(dest_group, (str, bytes, os.PathLike)): + with h5py.File(dest_group, 'a') as dest_file: + return h5_copy_except_paths(src_group, dest_file, except_paths) + + copy_except_tree(src_group, dest_group, paths_to_tree(except_paths)) diff --git a/src/cal_tools/lpdlib.py b/src/cal_tools/lpdlib.py index 217c769fcc6faef919697b2b671a1c9e3a65c488..7ede9b52dcea17614e899586496922806d060340 100644 --- a/src/cal_tools/lpdlib.py +++ b/src/cal_tools/lpdlib.py @@ -1,5 +1,5 @@ import copy -from typing import Optional, Tuple +from typing import List, Optional, Tuple import h5py import numpy as np @@ -50,8 +50,6 @@ class LpdCorrections: :param infile: to be corrected h5py input file :param outfile: writeable h5py output file - :param max_cell: maximum number of memory cells to handle, e.g. 
if - calibration constants only exist for a subset of cells :param channel: module/channel to correct :param max_pulses: maximum pulse id to consider for preview histograms :param bins_gain_vs_signal: number of bins for gain vs signal histogram @@ -178,7 +176,8 @@ class LpdCorrections: self.create_output_datasets() self.initialized = True - def split_gain(self, d): + @staticmethod + def split_gain(d): """ Split gain information off 16-bit LPD data Gain information can be found in bits 12 and 13 (0-based) @@ -558,7 +557,7 @@ class LpdCorrections: self.hists_gain_vs_signal), (self.low_edges, self.high_edges, self.signal_edges)) - def initialize_from_db(self, dbparms: Tuple['DBParms', 'DBParms_timeout'], + def initialize_from_db(self, dbparms: List[Tuple['DBParms', 'DBParms_timeout']], karabo_id: str, karabo_da: str, only_dark: Optional[bool] = False): """ Initialize calibration constants from the calibration database @@ -750,11 +749,7 @@ class LpdCorrections: """ offsets = None - rel_gains = None - rel_gains_b = None - bpixels = None noises = None - flat_fields = None with h5py.File(filename, "r") as calfile: bpixels = calfile["{}/{}/data".format(qm, "BadPixelsCI")][()] bpix = calfile["{}/{}/data".format(qm, "BadPixelsFF")][()] diff --git a/src/cal_tools/mdc_config.py b/src/cal_tools/mdc_config.py index 1f5f9cf2dc383e76c5a1d6cd0c7a373b89e6ae83..3c491e0ef651fc096a51623ace6d2338109fcfea 100644 --- a/src/cal_tools/mdc_config.py +++ b/src/cal_tools/mdc_config.py @@ -1,11 +1,15 @@ -class MDC_config: +from pathlib import Path - mdconf = {} - mdconf['user-id'] = '' - mdconf['user-secret'] = '' - mdconf['user-email'] = 'calibration@example.com' - mdconf['token-url'] = 'https://in.xfel.eu/metadata/oauth/token' - mdconf['refresh-url'] = 'https://in.xfel.eu/metadata/oauth/token' - mdconf['auth-url'] = 'https://in.xfel.eu/metadata/oauth/authorize' - mdconf['scope'] = '' - mdconf['base-api-url'] = 'https://in.xfel.eu/metadata/api/' +from dynaconf import Dynaconf + +config_dir = Path(__file__).parent.resolve() + +mdc_config = Dynaconf( + envvar_prefix="CAL_CAL_TOOLS", + settings_files=[ + config_dir / "mdc_config.yaml", + config_dir / "mdc_config.secrets.yaml", + Path("~/.config/pycalibration/cal_tools/mdc_config.yaml").expanduser(), + ], + merge_enabled=True, +) diff --git a/src/cal_tools/mdc_config.yaml b/src/cal_tools/mdc_config.yaml new file mode 100644 index 0000000000000000000000000000000000000000..923407bfb14123905e58f336738bde582d97dea1 --- /dev/null +++ b/src/cal_tools/mdc_config.yaml @@ -0,0 +1,8 @@ +auth-url: https://in.xfel.eu/metadata/oauth/authorize +base-api-url: https://in.xfel.eu/metadata/api/ +refresh-url: https://in.xfel.eu/metadata/oauth/token +scope: '' +token-url: https://in.xfel.eu/metadata/oauth/token +user-email: calibration@example.com +user-id: '@note add this to secrets file' +user-secret: '@note add this to secrets file' diff --git a/src/cal_tools/metrology.py b/src/cal_tools/metrology.py index 618888b058408df520c1d4e904a332be7c73a00d..18330747dbb5189965350d4325a20c1cae545188 100644 --- a/src/cal_tools/metrology.py +++ b/src/cal_tools/metrology.py @@ -62,14 +62,14 @@ def getModulePosition(metrologyFile, moduleId): with h5py.File(metrologyFile, "r") as fh: # Check if the keys actually appear in the metrology file for key in h5Keys: - if not key in fh: + if key not in fh: raise ValueError("Invalid key '{}'".format(key)) # Extract the positions from the hdf5 groups corresponding # to a module, if the module has dataset 'Position'. 
positions = [ np.asarray(fh[key]['Position']) for key in h5Keys if 'Position' in fh[key] ] - if len(positions) == 0: + if not positions: # This is the case when requesting a quadrant; e.g. # getModulePosition('Q1'). Key is valid, but quadrant # has no location (yet). @@ -115,10 +115,7 @@ def translateToModuleBL(tilePositions): # In the clockwise order of LPD tiles, the 8th # tile in the list is the bottom left tile bottomLeft8th = np.asarray([0., moduleCoords[8][1]]) - # Translate coordinates to the bottom left corner - # of the bottom left tile - bottomLeft = moduleCoords - bottomLeft8th - return bottomLeft + return moduleCoords - bottomLeft8th def plotSupermoduleData(tileData, metrologyPositions, zoom=1., vmin=100., vmax=6000.): @@ -427,7 +424,7 @@ def positionFileList(filelist, datapath, geometry_file, quad_pos, nImages='all', indices += list(np.arange(first, first+count)) - if len(indices) == 0: + if not indices: continue indices = np.unique(np.sort(np.array(indices).astype(np.int))) indices = indices[indices < f[datapath.format(ch)].shape[0]] @@ -534,7 +531,7 @@ def matchedFileList(filelist, datapath, nImages='all', trainIds=None, nwa=False, indices += list(np.arange(first, first+count)) - if len(indices) == 0: + if not indices: continue indices = np.unique(np.sort(np.array(indices).astype(np.int))) indices = indices[indices < f[datapath.format(ch)].shape[0]] diff --git a/src/cal_tools/plotting.py b/src/cal_tools/plotting.py index d2434ffafbb9a0f54a299df1a424be036c810d22..a7907f9264e45c918f6b34847e66c17f7edb7a3c 100644 --- a/src/cal_tools/plotting.py +++ b/src/cal_tools/plotting.py @@ -283,7 +283,7 @@ def show_processed_modules(dinstance: str, constants: Optional[Dict[str, Any]], # Create a dict that contains the range of tiles, in the figure, # that belong to a module. 
- ranges = dict() + ranges = {} tile_count = 0 for quadrant in range(1, quadrants+1): for module in range(1, modules+1): diff --git a/src/cal_tools/tools.py b/src/cal_tools/tools.py index 3896dcdfc934b7cc7aed0b0a3c5da154d5b07bf5..9855f64cb90b9ef4a6dbd3d19ea39521cffd002a 100644 --- a/src/cal_tools/tools.py +++ b/src/cal_tools/tools.py @@ -23,7 +23,7 @@ from metadata_client.metadata_client import MetadataClient from notebook.notebookapp import list_running_servers from .ana_tools import save_dict_to_hdf5 -from .mdc_config import MDC_config +from .mdc_config import mdc_config def parse_runs(runs, return_type=str): @@ -141,8 +141,8 @@ def map_modules_from_files(filelist, file_inset, quadrants, modules_per_quad): total_file_size = 0 module_files = {} mod_ids = {} - for quadrant in range(0, quadrants): - for module in range(0, modules_per_quad): + for quadrant in range(quadrants): + for module in range(modules_per_quad): name = "Q{}M{}".format(quadrant + 1, module + 1) module_files[name] = Queue() num = quadrant * 4 + module @@ -201,8 +201,7 @@ def get_notebook_name(): params={'token': ss.get('token', '')}) for nn in json.loads(response.text): if nn['kernel']['id'] == kernel_id: - relative_path = nn['notebook']['path'] - return relative_path + return nn['notebook']['path'] except: return environ.get("CAL_NOTEBOOK_NAME", "Unknown Notebook") @@ -216,17 +215,18 @@ def get_run_info(proposal, run): :return: dictionary with run information """ - mdconf = MDC_config.mdconf - mdc = MetadataClient(client_id=mdconf['user-id'], - client_secret=mdconf['user-secret'], - user_email=mdconf['user-email'], - token_url=mdconf['token-url'], - refresh_url=mdconf['refresh-url'], - auth_url=mdconf['auth-url'], - scope=mdconf['scope'], - base_api_url=mdconf['base-api-url']) - - runs = mdc.get_proposal_runs(mdc, proposal_number=proposal, + mdc = MetadataClient( + client_id=mdc_config['user-id'], + client_secret=mdc_config['user-secret'], + user_email=mdc_config['user-email'], + token_url=mdc_config['token-url'], + refresh_url=mdc_config['refresh-url'], + auth_url=mdc_config['auth-url'], + scope=mdc_config['scope'], + base_api_url=mdc_config['base-api-url'], + ) + + runs = mdc.get_proposal_runs(proposal_number=proposal, run_number=run) run_id = runs['data']['runs'][0]['id'] @@ -333,26 +333,30 @@ def save_const_to_h5(db_module: str, karabo_id: str, metadata.calibration_constant_version.raw_data_location = file_loc - dpar = {} - for parm in metadata.detector_condition.parameters: - dpar[parm.name] = {'lower_deviation_value': parm.lower_deviation, - 'upper_deviation_value': parm.upper_deviation, - 'value': parm.value, - 'flg_logarithmic': parm.logarithmic} + dpar = { + parm.name: { + 'lower_deviation_value': parm.lower_deviation, + 'upper_deviation_value': parm.upper_deviation, + 'value': parm.value, + 'flg_logarithmic': parm.logarithmic, + } + for parm in metadata.detector_condition.parameters + } creation_time = metadata.calibration_constant_version.begin_at raw_data = metadata.calibration_constant_version.raw_data_location constant_name = metadata.calibration_constant.__class__.__name__ - data_to_store = {} - data_to_store['condition'] = dpar - data_to_store['db_module'] = db_module - data_to_store['karabo_id'] = karabo_id - data_to_store['constant'] = constant_name - data_to_store['data'] = data - data_to_store['creation_time'] = creation_time - data_to_store['file_loc'] = raw_data - data_to_store['report'] = report + data_to_store = { + 'condition': dpar, + 'db_module': db_module, + 'karabo_id': karabo_id, + 
'constant': constant_name, + 'data': data, + 'creation_time': creation_time, + 'file_loc': raw_data, + 'report': report, + } ofile = f"{out_folder}/const_{constant_name}_{db_module}.h5" if isfile(ofile): @@ -710,15 +714,12 @@ def get_constant_from_db_and_time(karabo_id: str, karabo_da: str, condition, empty_constant, cal_db_interface, creation_time, int(print_once), timeout, ntries) - if m: - if m.comm_db_success: - return data, m.calibration_constant_version.begin_at - else: - # retun none for injection time if communication with db failed. - # reasons (no constant or condition found, - # or network problem) - return data, None + if m and m.comm_db_success: + return data, m.calibration_constant_version.begin_at else: + # return None for injection time if communication with db failed. + # reasons (no constant or condition found, + # or network problem) return data, None diff --git a/src/xfel_calibrate/calibrate.py b/src/xfel_calibrate/calibrate.py index 17e4aa0ca81776f63bf7ef53cb8ae0e46c689b24..3f6a1101738074dddb2a70f31d3927c5a41c8449 100755 --- a/src/xfel_calibrate/calibrate.py +++ b/src/xfel_calibrate/calibrate.py @@ -28,6 +28,7 @@ from .finalize import tex_escape from .notebooks import notebooks from .settings import ( default_report_path, + finalize_time_limit, free_nodes_cmd, launcher_command, max_reserved, @@ -77,7 +78,7 @@ def make_initial_parser(**kwargs): ' report') parser.add_argument('--concurrency-par', type=str, - help='Name of cuncurrency parameter.' + help='Name of concurrency parameter.' 'If not given, it is taken from configuration.') parser.add_argument('--priority', type=int, default=2, @@ -157,10 +158,7 @@ def consolize_name(name): def deconsolize_args(args): """ Variable names have underscores """ - new_args = {} - for k, v in args.items(): - new_args[k.replace("-", "_")] = v - return new_args + return {k.replace("-", "_"): v for k, v in args.items()} def extract_title_author_version(nb): @@ -311,7 +309,7 @@ def balance_sequences(in_folder: str, run: int, sequences: List[int], sequence_files.extend(in_path.glob(f"*{k_da}-S*.h5")) # Extract sequences from input files. - seq_nums = set([int(sf.stem[-5:]) for sf in sequence_files]) + seq_nums = {int(sf.stem[-5:]) for sf in sequence_files} # Validate selected sequences with sequences in in_folder if sequences != [-1]: @@ -466,12 +464,8 @@ def add_args_from_nb(nb, parser, cvar=None, no_required=False): default = p.value if (not required) else None - if p.type == list or p.name == cvar: - if p.type is list: - ltype = type(p.value[0]) - else: - ltype = p.type - + if issubclass(p.type, list) or p.name == cvar: + ltype = type(p.value[0]) if issubclass(p.type, list) else p.type range_allowed = "RANGE ALLOWED" in p.comment.upper() if p.comment else False pars_group.add_argument(f"--{consolize_name(p.name)}", nargs='+', @@ -480,7 +474,7 @@ def add_args_from_nb(nb, parser, cvar=None, no_required=False): help=helpstr, required=required, action=make_intelli_list(ltype) if range_allowed else None) - elif p.type == bool: + elif issubclass(p.type, bool): # For a boolean, make --XYZ and --no-XYZ options. 
alt_group = pars_group.add_mutually_exclusive_group(required=required) alt_group.add_argument(f"--{consolize_name(p.name)}", @@ -528,14 +522,6 @@ def extend_params(nb, extend_func_name): fcc["source"] += "\n" + extension -def has_parm(parms, name): - """ Check if a parameter of `name` exists in parms """ - for p in parms: - if p.name == name: - return True - return False - - def get_par_attr(parms, key, attr, default=None): """ Return the type of parameter with name key @@ -557,15 +543,14 @@ def flatten_list(l): :param l: List or a string :return: Same string or string with first and last entry of a list """ - if isinstance(l, list): - if len(l) > 1: - return '{}-{}'.format(l[0], l[-1]) - elif len(l) == 1: - return '{}'.format(l[0]) - else: - return '' - else: + if not isinstance(l, list): return str(l) + if len(l) > 1: + return '{}-{}'.format(l[0], l[-1]) + elif len(l) == 1: + return '{}'.format(l[0]) + else: + return '' def set_figure_format(nb, enable_vector_format): @@ -621,6 +606,38 @@ def create_finalize_script(fmt_args, temp_path, job_list) -> str: return f_name +def run_finalize(fmt_args, temp_path, job_list, sequential=False): + finalize_script = create_finalize_script(fmt_args, temp_path, job_list) + + cmd = [] + if not sequential: + cmd = [ + 'sbatch', + '--parsable', + '--requeue', + '--output', f'{temp_path}/slurm-%j.out', + '--job-name', 'xfel-cal-finalize', + '--time', finalize_time_limit, + '--partition', 'exfel', + "--dependency=afterany:" + ":".join(str(j) for j in job_list), + ] + print(" ".join(cmd)) + + cmd += [ + os.path.join(PKG_DIR, "bin", "slurm_finalize.sh"), # path to helper sh + sys.executable, # Python with calibration machinery installed + temp_path, + finalize_script, + ] + + output = check_output(cmd).decode('utf8') + jobid = None + if not sequential: + jobid = output.partition(';')[0].strip() + print("Submitted finalize job: {}".format(jobid)) + return jobid + + def save_executed_command(run_tmp_path, version): """ Create a file with string used to execute `xfel_calibrate` @@ -662,12 +679,13 @@ def get_slurm_partition_or_reservation(args) -> List[str]: return ['--partition', sprof] -def get_launcher_command(args, temp_path, dep_jids=()) -> List[str]: +def get_launcher_command(args, temp_path, after_ok=(), after_any=()) -> List[str]: """ Return a slurm launcher command :param args: Command line arguments :param temp_path: Temporary path to run job - :param dep_jids: A list of dependent jobs + :param after_ok: A list of jobs which must succeed first + :param after_any: A list of jobs which must finish first, but may fail :return: List of commands and parameters to be used by subprocess """ @@ -683,11 +701,13 @@ def get_launcher_command(args, temp_path, dep_jids=()) -> List[str]: launcher_slurm.append("--mem={}G".format(args.get('slurm_mem', '500'))) - if len(dep_jids): - launcher_slurm.append( - "--dependency=afterok:" + ":".join(str(j) for j in dep_jids) - ) - + deps = [] + if after_ok: + deps.append("afterok:" + ":".join(str(j) for j in after_ok)) + if after_any: + deps.append("afterany:" + ":".join(str(j) for j in after_any)) + if deps: + launcher_slurm.append("--dependency=" + ",".join(deps)) return launcher_slurm @@ -709,8 +729,8 @@ def remove_duplications(l) -> list: def concurrent_run( temp_path: str, nb, nb_path: Path, args: dict, cparm=None, cval=None, - finalize_script: Optional[str] = None, cluster_cores=8, - sequential=False, dep_jids=(), + cluster_cores=8, + sequential=False, after_ok=(), after_any=(), show_title=True, user_venv: Optional[Path] = 
None, ) -> Optional[str]: """ Launch a concurrent job on the cluster via Slurm @@ -726,7 +746,7 @@ def concurrent_run( # first convert the notebook parms = extract_parameters(nb, lang='python') - if has_parm(parms, "cluster_profile"): + if any(p.name == "cluster_profile" for p in parms): cluster_profile = f"{args['cluster_profile']}_{suffix}" else: # Don't start ipcluster if there's no cluster_profile parameter @@ -744,10 +764,10 @@ def concurrent_run( nbformat.write(new_nb, nbpath) # then run an sbatch job - srun_base = [] + cmd = [] if not sequential: - srun_base = get_launcher_command(args, temp_path, dep_jids) - print(" ".join(srun_base)) + cmd = get_launcher_command(args, temp_path, after_ok, after_any) + print(" ".join(cmd)) if user_venv: print(f"Running job in user venv at {user_venv}\n") @@ -755,7 +775,7 @@ def concurrent_run( else: python = python_path # From settings.py, default is sys.executable - srun_base += [ + cmd += [ os.path.join(PKG_DIR, "bin", "slurm_calibrate.sh"), # path to helper sh os.path.abspath(nbpath), # path to notebook python, # path to python to run notebook (& ipcluster) @@ -763,12 +783,10 @@ def concurrent_run( nb_path.stem.upper(), args["detector"].upper(), args["type"].upper(), - finalize_script or '', str(cluster_cores), - sys.executable, # Python for calib machinery (nbconvert, finalize) ] - output = check_output(srun_base).decode('utf8') + output = check_output(cmd).decode('utf8') jobid = None if not sequential: jobid = output.partition(';')[0].strip() @@ -810,7 +828,7 @@ def make_par_table(parms, run_tmp_path: str): if len(value) > max_len[1]: len_parms[1] = max_len[1] value = split_len(value, max_len[1]) - if p.type is str: + if issubclass(p.type, str): value = "``{}''".format(value) comment = tex_escape(str(p.comment)[1:]) l_parms.append([name, value, comment]) @@ -842,7 +860,7 @@ def make_par_table(parms, run_tmp_path: str): finfile.write(textwrap.dedent(tmpl.render(p=col_type, lines=l_parms))) -def make_pipeline_yaml(parms, version, report_path, output_dir): +def make_pipeline_yaml(parms, version, concurrency, report_path, output_dir): """Adds information from arguments to metadata file""" metadata = cal_tools.tools.CalibrationMetadata(output_dir) @@ -854,6 +872,7 @@ def make_pipeline_yaml(parms, version, report_path, output_dir): metadata["pycalibration-version"] = version metadata["report-path"] = f"{report_path}.pdf" + metadata["concurrency"] = concurrency metadata.save() @@ -881,10 +900,18 @@ def run(): pre_notebooks = nb_info.get("pre_notebooks", []) notebook = nb_info["notebook"] dep_notebooks = nb_info.get("dep_notebooks", []) - concurrency = nb_info.get("concurrency", None) + concurrency = nb_info.get("concurrency", {'parameter': None}) + + concurrency_par = args["concurrency_par"] or concurrency['parameter'] + if concurrency_par == concurrency['parameter']: + # Use the defaults from notebook.py to split the work into several jobs + concurrency_defval = concurrency.get('default concurrency', None) + concurrency_func = concurrency.get('use function', None) + else: + # --concurrency-par specified something different from notebook.py: + # don't use the associated settings from there. 
+    concurrency_defval = concurrency_func = None -    if args["concurrency_par"] is not None: -        concurrency["parameter"] = args["concurrency_par"] notebook_path = Path(PKG_DIR, notebook) nb = nbformat.read(notebook_path, as_version=4) @@ -910,9 +937,8 @@ def run(): run_uuid = f"t{datetime.now().strftime('%y%m%d_%H%M%S')}" # check if concurrency parameter is given and we run concurrently - if not has_parm(parms, concurrency["parameter"]) and concurrency["parameter"] is not None: - msg = "Notebook cannot be run concurrently: no {} parameter".format( - concurrency["parameter"]) + if not any(p.name == concurrency_par for p in parms) and concurrency_par is not None: + msg = f"Notebook cannot be run concurrently: no {concurrency_par} parameter" warnings.warn(msg, RuntimeWarning) # If not explicitly specified, use a new profile for ipcluster @@ -953,7 +979,12 @@ def run(): report_to = out_path / report_to # Write metadata about calibration job to output folder - make_pipeline_yaml(parms, version, report_to, out_path) + concurr_details = { 'parameter': concurrency_par, 'default': concurrency_defval, 'function': concurrency_func, } + make_pipeline_yaml(parms, version, concurr_details, report_to, out_path) folder = get_par_attr(parms, 'in_folder', 'value', '') @@ -974,13 +1005,12 @@ def run(): 'request_time': request_time, 'submission_time': submission_time } - finalize = None # Script created just before we submit the last job user_venv = nb_info.get("user", {}).get("venv") if user_venv: user_venv = Path(user_venv.format(**args)) - joblist = [] + pre_jobs = [] cluster_cores = concurrency.get("cluster cores", 8) # Check if there are pre-notebooks for pre_notebook in pre_notebooks: @@ -992,106 +1022,95 @@ def run(): cluster_cores=cluster_cores, sequential=sequential, user_venv=user_venv ) - joblist.append(jobid) + pre_jobs.append(jobid) - if concurrency.get("parameter", None) is None: - if not dep_notebooks: - finalize = create_finalize_script(fmt_args, run_tmp_path, joblist) + main_jobs = [] + if concurrency_par is None: jobid = concurrent_run(run_tmp_path, nb, notebook_path, args, - finalize_script=finalize, cluster_cores=cluster_cores, sequential=sequential, - dep_jids=joblist, user_venv=user_venv + after_ok=pre_jobs, user_venv=user_venv ) - joblist.append(jobid) + main_jobs.append(jobid) else: - cvar = concurrency["parameter"] - cvals = args.get(cvar, None) + cvals = args.get(concurrency_par, None) - con_func = concurrency.get("use function", None) # Consider [-1] as None - if cvals is None or cvals == [-1]: - defcval = concurrency.get("default concurrency", None) - if defcval is not None: - print(f"Concurrency parameter '{cvar}' " - f"is taken from notebooks.py") - if not isinstance(defcval, (list, tuple)): - cvals = range(defcval) - else: - cvals = defcval + if (cvals is None or cvals == [-1]) and concurrency_defval is not None: + print(f"Concurrency parameter '{concurrency_par}' " + f"is taken from notebooks.py") + cvals = concurrency_defval if isinstance(concurrency_defval, (list, tuple)) else range(concurrency_defval) if cvals is None: - defcval = get_par_attr(parms, cvar, 'value') + defcval = get_par_attr(parms, concurrency_par, 'value') if defcval is not None: - print(f"Concurrency parameter '{cvar}' " + print(f"Concurrency parameter '{concurrency_par}' " f"is taken from '{notebook}'") - if not isinstance(defcval, (list, tuple)): - cvals = [defcval] - else: - cvals = defcval + cvals = defcval if isinstance(defcval, (list, tuple)) else [defcval] - if con_func: - func =
get_notebook_function(nb, con_func) + if concurrency_func: + func = get_notebook_function(nb, concurrency_func) if func is None: - warnings.warn(f"Didn't find concurrency function {con_func} in notebook", - RuntimeWarning) + warnings.warn( + f"Didn't find concurrency function {concurrency_func} in notebook", + RuntimeWarning + ) else: df = {} exec(func, df) - f = df[con_func] + f = df[concurrency_func] import inspect sig = inspect.signature(f) - callargs = [] if cvals: # in case default needs to be used for function call - args[cvar] = cvals - for arg in sig.parameters: - callargs.append(args[arg]) + args[concurrency_par] = cvals + callargs = [args[arg] for arg in sig.parameters] cvals = f(*callargs) print(f"Split concurrency into {cvals}") # get expected type - cvtype = get_par_attr(parms, cvar, 'type', list) + cvtype = get_par_attr(parms, concurrency_par, 'type', list) cvals = remove_duplications(cvals) - jlist = [] for cnum, cval in enumerate(cvals): show_title = cnum == 0 - # Job is not final if there are dependent notebooks - if (not dep_notebooks) and cnum == (len(cvals) - 1): - finalize = create_finalize_script( - fmt_args, run_tmp_path, joblist + jlist - ) cval = [cval, ] if not isinstance(cval, list) and cvtype is list else cval jobid = concurrent_run(run_tmp_path, nb, notebook_path, args, - cvar, cval, finalize_script=finalize, + concurrency_par, cval, cluster_cores=cluster_cores, sequential=sequential, show_title=show_title, - dep_jids=joblist, + after_ok=pre_jobs, ) - jlist.append(jobid) - joblist.extend(jlist) + main_jobs.append(jobid) # Run dependent notebooks (e.g. summaries after correction) + dep_jobs = [] for i, dep_notebook in enumerate(dep_notebooks): dep_notebook_path = Path(PKG_DIR, dep_notebook) dep_nb = nbformat.read(dep_notebook_path, as_version=4) - if i == len(dep_notebooks) - 1: - finalize = create_finalize_script(fmt_args, run_tmp_path, joblist) jobid = concurrent_run(run_tmp_path, dep_nb, dep_notebook_path, args, - dep_jids=joblist, - finalize_script=finalize, + after_ok=pre_jobs, + after_any=main_jobs, cluster_cores=cluster_cores, sequential=sequential, ) - joblist.append(jobid) + dep_jobs.append(jobid) + + joblist = pre_jobs + main_jobs + dep_jobs + + joblist.append(run_finalize( + fmt_args=fmt_args, + temp_path=run_tmp_path, + job_list=joblist, + sequential=sequential, + )) - if not all([j is None for j in joblist]): + if any(j is not None for j in joblist): print("Submitted the following SLURM jobs: {}".format(",".join(joblist))) diff --git a/src/xfel_calibrate/finalize.py b/src/xfel_calibrate/finalize.py index 851296ae8498d170ca192240c92e24bd57a87208..8da388037643a3a5617c7b61ba4322fd88c6d7a3 100644 --- a/src/xfel_calibrate/finalize.py +++ b/src/xfel_calibrate/finalize.py @@ -397,7 +397,7 @@ def finalize(joblist, finaljob, run_path, out_path, project, calibration, for job in joblist: if str(job) in line: found_jobs.add(job) - if len(found_jobs) == 0: + if not found_jobs: break sleep(10) diff --git a/src/xfel_calibrate/settings.py b/src/xfel_calibrate/settings.py index 0697bb580f48ccfdc41f6d7b89bb30216f6a7749..989cd1022a47f85cb4d72af7b05754f416973ec8 100644 --- a/src/xfel_calibrate/settings.py +++ b/src/xfel_calibrate/settings.py @@ -26,3 +26,6 @@ max_reserved = 8 # is giving xcal priority by default. 
reservation = "" reservation_char = "darks" + +# Time limit for the finalize job (creates PDF report & moves files) +finalize_time_limit = "30:00" diff --git a/tests/test_agipdutils_ff.py b/tests/test_agipdutils_ff.py index 19947eb35494366794801bf288526bda39bf42cf..dbc06d3ad7f53af4a2ccb0d3d3fa3a8c022291b1 100644 --- a/tests/test_agipdutils_ff.py +++ b/tests/test_agipdutils_ff.py @@ -134,7 +134,7 @@ def test_set_par_limits(): set_par_limits(parameters, peak_range, peak_norm_range, peak_width_range) assert parameters.keys() == expected.keys() - for key in parameters.keys(): + for key in parameters: if isinstance(parameters[key], np.ndarray): assert np.all(parameters[key] == expected[key]) else: diff --git a/tests/test_webservice.py b/tests/test_webservice.py index 4da6d42528e5012946a2d78be94dc1be43fb2469..47c04b97d23f4ddc7454e570d8480960a04de58c 100644 --- a/tests/test_webservice.py +++ b/tests/test_webservice.py @@ -1,6 +1,8 @@ +import os import sys from pathlib import Path from unittest import mock +from webservice.messages import MigrationError import pytest from testpath import MockCommand @@ -76,14 +78,59 @@ def test_parse_config(): @pytest.mark.asyncio -async def test_wait_on_transfer(tmp_path): - mock_getfattr = MockCommand( - 'getfattr', - content="""#!{}\nprint('user.status="dCache"')""".format(sys.executable) - ) - with mock_getfattr: - res = await wait_on_transfer(str(tmp_path), max_tries=1) - assert res is True +@pytest.mark.parametrize( + "xattr_list, xattr_get, expected_result", + [ + (["user.status"], b"offline", True), + (["user.status"], b"tape", True), + (["user.status"], b"dCache", True), + ], +) +async def test_wait_on_transfer(xattr_list, xattr_get, expected_result, tmp_path): + with mock.patch.object(os, "listxattr", lambda path: xattr_list): + with mock.patch.object(os, "getxattr", lambda path, attr: xattr_get): + res = await wait_on_transfer( + str(tmp_path), + max_tries_completion=1, + max_tries_attributes=1, + sleep_attributes=1, + ) + assert res is expected_result + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "xattr_list, xattr_get, exception_match", + [ + ([], b"", r"FAILED:.*user.status.*"), + (["user.status"], b"notmigrated2d2", r"FAILED:.*notmigratedr2d2.*"), + (["user.status"], b"foobar", r"FAILED:.*unknown.*"), + ], +) +async def test_wait_on_transfer_exceptions( + xattr_list, xattr_get, exception_match, tmp_path +): + with mock.patch.object(os, "listxattr", lambda path: xattr_list): + with mock.patch.object(os, "getxattr", lambda path, attr: xattr_get): + with pytest.raises(MigrationError, match=exception_match): + await wait_on_transfer( + str(tmp_path), + max_tries_completion=1, + max_tries_attributes=1, + sleep_attributes=1, + ) + + +@pytest.mark.asyncio +async def test_wait_on_transfer_timeout(tmp_path): + with mock.patch.object(os, "listxattr", lambda path: ["user.status"]): + with mock.patch.object(os, "getxattr", lambda path, attr: b"migration_in_progress"): + with pytest.raises(MigrationError, match=r"FAILED:.*progress.*"): + await wait_on_transfer( + str(tmp_path), + max_tries_completion=1, + max_tries_attributes=1, + sleep_attributes=1, + ) @pytest.mark.asyncio diff --git a/tests/test_xfel_calibrate/conftest.py b/tests/test_xfel_calibrate/conftest.py index 61fb9c6f1fb24e61c59bc8fcfe6c66ad321c36ee..2920104b43ea83046f5b428a35fc431e0005ae48 100644 --- a/tests/test_xfel_calibrate/conftest.py +++ b/tests/test_xfel_calibrate/conftest.py @@ -37,7 +37,7 @@ class FakeProcessCalibrate(FakeProcess): self.register_subprocess(settings.free_nodes_cmd, 
stdout=["1"]) self.register_subprocess(settings.preempt_nodes_cmd, stdout=["1"]) self.register_subprocess( - ["sbatch", self.any()], stdout=["Submitted batch job 000000"] + ["sbatch", self.any()], stdout=["000000"] ) # For the version insertion... diff --git a/tests/test_xfel_calibrate/test_cli.py b/tests/test_xfel_calibrate/test_cli.py index 39697e8e7e82c1ec9e26952d67769c1e2e4285a2..ed02de5e71744f9340447e6581dbed04acaf1134 100644 --- a/tests/test_xfel_calibrate/test_cli.py +++ b/tests/test_xfel_calibrate/test_cli.py @@ -209,7 +209,7 @@ class TestTutorialNotebook: today = date.today() expected_equals = { - "joblist": [], + "joblist": ["000000"], "project": "Tutorial Calculation", "calibration": "Tutorial Calculation", "author": "Astrid Muennich", diff --git a/webservice/__init__.py b/webservice/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/webservice/config/__init__.py b/webservice/config/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..8eb56e99b0360810fa654873616da207e10e3f91 --- /dev/null +++ b/webservice/config/__init__.py @@ -0,0 +1,33 @@ +from pathlib import Path + +from dynaconf import Dynaconf + +config_dir = Path(__file__).parent.resolve().expanduser() +webservice_dir = config_dir.parent + +webservice = Dynaconf( + envvar_prefix="CAL_WEBSERVICE", + settings_files=[ + config_dir / "webservice.yaml", + config_dir / "webservice.secrets.yaml", + Path("~/.config/pycalibration/webservice/webservice.yaml").expanduser(), + ], + environments=False, + merge_enabled=True, + webservice_dir=webservice_dir, +) + +serve_overview = Dynaconf( + envvar_prefix="CAL_SERVE_OVERVIEW", + settings_files=[ + config_dir / "serve_overview.yaml", + config_dir / "serve_overview.secrets.yaml", + Path("~/.config/pycalibration/webservice/serve_overview.yaml").expanduser(), + ], + environments=False, + merge_enabled=True, + webservice_dir=webservice_dir, +) + +# `envvar_prefix` = export envvars with `export DYNACONF_FOO=bar`. +# `settings_files` = Load this files in the order. 
diff --git a/webservice/config/serve_overview.yaml b/webservice/config/serve_overview.yaml new file mode 100644 index 0000000000000000000000000000000000000000..0e9262ae5fde526454a7844655f3b1ee63bc58f6 --- /dev/null +++ b/webservice/config/serve_overview.yaml @@ -0,0 +1,33 @@ +templates: + main-doc: "@format {this.webservice_dir}/templates/main_doc.html" + maxwell-status: "@format {this.webservice_dir}/templates/maxwell_status.html" + log-output: "@format {this.webservice_dir}/templates/log_output.html" + last-characterizations: "@format {this.webservice_dir}/templates/last_characterizations.html" + last-correction: "@format {this.webservice_dir}/templates/last_correction.html" + running-jobs: "@format {this.webservice_dir}/templates/running_jobs.html" + dark-overview: "@format {this.webservice_dir}/templates/dark_overview.html" + css: "@format {this.webservice_dir}/templates/serve_overview.css" + +shell-commands: + nodes-avail-res: "sinfo --nodes=`sinfo -p exfel -t idle -N --noheader -T | grep {} | awk '{{print $6}}'` --noheader -p exfel -o %A" + total-jobs: "sinfo -p exfel -o %A --noheader" + upex-jobs: "sinfo -T --noheader | awk '{print $1}'" + upex-prefix: "upex_" + tail-log: "tail -5000 web.log" + cat-log: "cat web.log" + +run-candidates: + - "--run-high" + - "--run-med" + - "--run-low" + - "--run" + +server-config: + port: 8008 + host: max-exfl016.desy.de + dark-timeout: 30 + n-calib: 10 + +web-service: + job-db: "@format {this.webservice_dir}/webservice_jobs.sqlite" + cal-config: "@format {env[HOME]}/calibration_config/default.yaml" diff --git a/webservice/webservice.yaml b/webservice/config/webservice.yaml similarity index 68% rename from webservice/webservice.yaml rename to webservice/config/webservice.yaml index 377315d99b6d7361850480913979a67190eadda7..26d74e07c8bc2643b9f179afe7c2deec48e8946d 100644 --- a/webservice/webservice.yaml +++ b/webservice/config/webservice.yaml @@ -1,26 +1,25 @@ config-repo: - url: - local-path: /home/xcal/calibration_config/ + url: "@note add this to secrets file" + local-path: "@format {env[HOME]}/calibration_config" web-service: port: 5555 bind-to: tcp://* allowed-ips: - job-db: ./webservice_jobs.sqlite + job-db: "@format {this.webservice_dir}/webservice_jobs.sqlite" job-update-interval: 60 job-timeout: 3600 metadata-client: - user-id: - user-secret: - user-email: - metadata-web-app-url: 'https://in.xfel.eu/metadata' - metadata-web-app-url: 'https://in.xfel.eu/metadata' - token-url: 'https://in.xfel.eu/metadata/oauth/token' - refresh-url: 'https://in.xfel.eu/metadata/oauth/token' - auth-url: 'https://in.xfel.eu/metadata/oauth/authorize' - scope: '' - base-api-url: 'https://in.xfel.eu/metadata/api/' + user-id: "@note add this to secrets file" + user-secret: "@note add this to secrets file" + user-email: "@note add this to secrets file" + metadata-web-app-url: "https://in.xfel.eu/metadata" + token-url: "https://in.xfel.eu/metadata/oauth/token" + refresh-url: "https://in.xfel.eu/metadata/oauth/token" + auth-url: "https://in.xfel.eu/metadata/oauth/authorize" + scope: "" + base-api-url: "https://in.xfel.eu/metadata/api/" kafka: brokers: @@ -33,10 +32,10 @@ correct: in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw out-folder: /gpfs/exfel/d/proc/{instrument}/{cycle}/p{proposal}/{run} sched-prio: 80 - cmd : > + cmd : >- python -m xfel_calibrate.calibrate {detector} CORRECT --slurm-scheduling {sched_prio} - --slurm-mem 750 + --slurm-partition upex-middle --request-time {request_time} --slurm-name 
{action}_{instrument}_{detector}_{cycle}_p{proposal}_{runs} --report-to /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/usr/Reports/{runs}/{det_instance}_{action}_{proposal}_{runs}_{time_stamp} @@ -47,10 +46,11 @@ dark: in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw out-folder: /gpfs/exfel/u/usr/{instrument}/{cycle}/p{proposal}/dark/runs_{runs} sched-prio: 10 - cmd: > + cmd: >- python -m xfel_calibrate.calibrate {detector} DARK --concurrency-par karabo_da --slurm-scheduling {sched_prio} + --slurm-partition upex-high --request-time {request_time} --slurm-name {action}_{instrument}_{detector}_{cycle}_p{proposal}_{runs} --report-to /gpfs/exfel/d/cal/caldb_store/xfel/reports/{instrument}/{det_instance}/{action}/{action}_{proposal}_{runs}_{time_stamp} diff --git a/webservice/listen_kafka.py b/webservice/listen_kafka.py index b0dcbb42a72841d96f6e6135bdaca11f45a8a929..4fed88fce43ee721b3ed50f2298e28b070f6c790 100644 --- a/webservice/listen_kafka.py +++ b/webservice/listen_kafka.py @@ -1,15 +1,10 @@ """Print Kafka events sent by the webservice. """ import json -import os.path as osp -import yaml from kafka import KafkaConsumer -conf_file = osp.join(osp.dirname(__file__), 'webservice.yaml') - -with open(conf_file, "r") as f: - config = yaml.safe_load(f) +from .config import webservice as config topic = config['kafka']['topic'] brokers = config['kafka']['brokers'] diff --git a/webservice/messages.py b/webservice/messages.py index fe1b1a71ce73b293a12fb09c78041c90a053029c..80bc3ddce070d8422f25237a2f53ee874bc93b99 100644 --- a/webservice/messages.py +++ b/webservice/messages.py @@ -16,6 +16,37 @@ class Errors: OTHER_ERROR = "FAILED: Error {}, please contact det-support@xfel.eu" +class MigrationError(Exception): + @classmethod + def no_user_status_xattr(cls, run_path: str): + return cls( + f"FAILED: migration issue for run `{run_path}`: `user.status` xattr not " + f"present after 5 minutes, migration may have failed, manually restarting " + f"migration may help" + ) + + @classmethod + def migration_failed(cls, run_path: str): + return cls( + f"FAILED: migration issue for run `{run_path}`: migration marked as failed " + f"(`run user.status` set to `notmigratedr2d2`), manually restarting " + f"migration may help" + ) + + @classmethod + def unknown_user_status(cls, run_path: str, status: str): + return cls( + f"FAILED: migration issue for run `{run_path}`: run has an unknown " + f"`user.status` xattr `{status}`, manually restarting migration may help" + ) + + @classmethod + def timeout(cls, run_path: str, time: str): + return cls( + f"FAILED: migration issue for run `{run_path}`: migration still marked as " + f"in progress after {time}, manually restarting migration may help" + ) + class MDC: MIGRATION_TIMEOUT = "Timeout waiting for migration.
Contact it-support@xfel.eu" NOTHING_TO_DO = "Nothing to calibrate for this run, copied raw data only" diff --git a/webservice/serve_overview.py b/webservice/serve_overview.py index 71031e7bc758f2547de3b36d8647620f4c5c4028..c1e0a4f903beec6ab633494638cb13eebd59a3fe 100644 --- a/webservice/serve_overview.py +++ b/webservice/serve_overview.py @@ -5,13 +5,16 @@ import sqlite3 from collections import OrderedDict from datetime import datetime, timezone from http.server import BaseHTTPRequestHandler, HTTPServer +from pathlib import Path from subprocess import check_output -from uuid import uuid4 +from typing import Optional import yaml from jinja2 import Template -from xfel_calibrate.settings import free_nodes_cmd, preempt_nodes_cmd, reservation +from xfel_calibrate.settings import free_nodes_cmd, preempt_nodes_cmd, reservation # noqa: E501 + +from .config import serve_overview as config class LimitedSizeDict(OrderedDict): @@ -30,7 +33,6 @@ class LimitedSizeDict(OrderedDict): self.popitem(last=False) -config = None pdf_queue = LimitedSizeDict(size_limit=50) @@ -40,7 +42,6 @@ class RequestHandler(BaseHTTPRequestHandler): def init_config(self): - global config global cal_config self.nodes_avail_res_cmd = config["shell-commands"]["nodes-avail-res"] @@ -344,11 +345,10 @@ class RequestHandler(BaseHTTPRequestHandler): return -def run(configfile, port=8008): - print('reading config file') - with open(configfile, "r") as cf: - global config - config = yaml.load(cf.read(), Loader=yaml.FullLoader) +def run(config_file: Optional[str] = None): + if config_file is not None: + config.configure(includes_for_dynaconf=[Path(config_file).absolute()]) + with open(config["web-service"]["cal-config"], "r") as cf: global cal_config cal_config = yaml.load(cf.read(), Loader=yaml.FullLoader) @@ -362,7 +362,7 @@ def run(configfile, port=8008): parser = argparse.ArgumentParser( description='Start the overview server') -parser.add_argument('--config', type=str, default="serve_overview.yaml") +parser.add_argument('--config', type=str, default=None,) if __name__ == "__main__": args = vars(parser.parse_args()) run(args["config"]) diff --git a/webservice/serve_overview.yaml b/webservice/serve_overview.yaml deleted file mode 100644 index d9de4339c3deb1f9c1a2a99f611eed24eff4d658..0000000000000000000000000000000000000000 --- a/webservice/serve_overview.yaml +++ /dev/null @@ -1,33 +0,0 @@ -templates: - main-doc: ./templates/main_doc.html - maxwell-status: ./templates/maxwell_status.html - log-output: ./templates/log_output.html - last-characterizations: ./templates/last_characterizations.html - last-correction: ./templates/last_correction.html - running-jobs: ./templates/running_jobs.html - dark-overview: ./templates/dark_overview.html - css: ./templates/serve_overview.css - -shell-commands: - nodes-avail-res: "sinfo --nodes=`sinfo -p exfel -t idle -N --noheader -T | grep {} | awk '{{print $6}}'` --noheader -p exfel -o %A" - total-jobs: "sinfo -p exfel -o %A --noheader" - upex-jobs: "sinfo -T --noheader | awk '{print $1}'" - upex-prefix: "upex_" - tail-log: "tail -5000 web.log" - cat-log: "cat web.log" - -run-candidates: - - "--run-high" - - "--run-med" - - "--run-low" - - "--run" - -server-config: - port: 8008 - host: max-exfl016.desy.de - dark-timeout: 30 - n-calib: 10 - -web-service: - job-db: ./webservice_jobs.sqlite - cal-config: /home/xcal/calibration_config/default.yaml diff --git a/webservice/update_mdc.py b/webservice/update_mdc.py index 38231ee4c9074fab44228b01c421bf75e2358a30..fd68354adaeb3ed3565b032709acf6c3ccb119ed 
100644 --- a/webservice/update_mdc.py +++ b/webservice/update_mdc.py @@ -1,13 +1,15 @@ import argparse -import os.path as osp +from pathlib import Path -import yaml from metadata_client.metadata_client import MetadataClient +from .config import webservice as config + parser = argparse.ArgumentParser( description='Update run status at MDC for a given run id.') +# TODO: unify configuration argument names across the project parser.add_argument('--conf-file', type=str, help='Path to webservice config', - default=osp.join(osp.dirname(__file__), 'webservice.yaml')) + default=None) parser.add_argument('--flg', type=str, choices=["NA", "R", "A"], required=True, help='Status flag for MDC request: NA - not available, R - running, A - available.') # noqa parser.add_argument('--rid', type=int, help='Run id from MDC') @@ -20,8 +22,8 @@ rid = args['rid'] flg = args['flg'] msg = args['msg'] -with open(conf_file, "r") as f: - config = yaml.load(f.read(), Loader=yaml.FullLoader) +if conf_file is not None: + config.configure(includes_for_dynaconf=[Path(conf_file).absolute()]) mdconf = config['metadata-client'] client_conn = MetadataClient(client_id=mdconf['user-id'], diff --git a/webservice/webservice.py b/webservice/webservice.py index 836829118b8d9c14eeaa376711d22e6192050829..293504c3cac4623d7091c13a232bba24f8259ef8 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -9,6 +9,7 @@ import json import locale import logging import os +import re import sqlite3 import sys import time @@ -29,10 +30,12 @@ from kafka import KafkaProducer from kafka.errors import KafkaError from metadata_client.metadata_client import MetadataClient +from .config import webservice as config + try: - from .messages import MDC, Errors, Success + from .messages import MDC, Errors, Success, MigrationError except ImportError: - from messages import MDC, Errors, Success + from messages import MDC, Errors, Success, MigrationError def init_job_db(config): @@ -445,6 +448,12 @@ def update_job_db(config): time.sleep(time_interval) +# Do not copy over files of big detectors when they are not being +# corrected. +copy_blocklist_pattern = re.compile( + r'\S*RAW-R\d{4}-(AGIPD|LPD|DSSC)\d{2}-S\d{5}.h5$') + + async def copy_untouched_files(file_list): """ Copy those files which are not touched by the calibration to the output directory. 
@@ -454,6 +463,9 @@ async def copy_untouched_files(file_list): Copying is done via an asyncio subprocess call """ for f in file_list: + if copy_blocklist_pattern.match(f): + continue + of = f.replace("raw", "proc").replace("RAW", "CORR") cmd = ["rsync", "-av", f, of] await asyncio.subprocess.create_subprocess_shell(" ".join(cmd)) @@ -511,41 +523,82 @@ async def run_action(job_db, cmd, mode, proposal, run, rid) -> str: return message -async def wait_on_transfer(rpath, max_tries=300) -> bool: +async def wait_on_transfer( + run_path: str, + max_tries_completion: int = 300, + max_tries_attributes: int = 5, + sleep_completion: int = 10, + sleep_attributes: int = 60, +) -> bool: """ Wait on data files to be transferred to Maxwell - :param rpath: Folder, which contains data files migrated to Maxwell - :param max_tries: Maximum number of checks if files are transferred + :param run_path: Folder, which contains data files migrated to Maxwell + :param max_tries_completion: Maximum number of tries to check for migration completed attribute + :param max_tries_attributes: Maximum number of tries to check for migration attributes being present + :param sleep_completion: Sleep time between checks for migration completed attribute + :param sleep_attributes: Sleep time between checks for migration attributes being present :return: True if files are transferred """ # TODO: Make use of MyMDC to request whether the run has been copied. # It is not sufficient to know that the files are on disk, but also to # check the copy is finished (ie. that the files are complete). - if 'pnfs' in os.path.realpath(rpath): + if 'pnfs' in os.path.realpath(run_path): return True - tries = 0 + + tries_for_completion = 0 + tries_for_attributes = 0 # FIXME: if not kafka, then do event-driven, no sleep # wait until folder gets created - while not os.path.exists(rpath): - if tries > max_tries: + while not os.path.exists(run_path): + if tries_for_completion > max_tries_completion: return False - tries += 1 - await asyncio.sleep(10) + tries_for_completion += 1 + await asyncio.sleep(sleep_completion) # FIXME: if not kafka, then do event-driven, no sleep # wait until files are migrated while True: - retcode, stdout = await run_proc_async([ - "getfattr", "-n", "user.status", rpath - ]) - if retcode == 0 and 'status="online"' not in stdout.decode().lower(): + # TODO: add test case for migration issues/missing attribute handling + if "user.status" not in os.listxattr(run_path): + if tries_for_attributes >= max_tries_attributes: + logging.critical( + "`status` attribute missing after max tries for migration reached. " + "Migration may have failed, try triggering migration manually again." + ) + # TODO: automatically re-trigger migration...? 
+ raise MigrationError.no_user_status_xattr(run_path) + + tries_for_attributes += 1 + logging.warning( + f"`status` attribute missing, migration may have failed, on attempt " + f"{tries_for_attributes}/{max_tries_attributes}" + ) + + # Wait for a longer time if the attributes are missing + await asyncio.sleep(sleep_attributes) + continue + + user_status = os.getxattr(run_path, "user.status").decode().lower() + logging.debug(f"{run_path}: {user_status=}") # TODO: use `trace` instead of `debug` w/ loguru + if user_status in ["dcache", "tape", "offline"]: return True - if tries > max_tries: - return False - tries += 1 - await asyncio.sleep(10) + elif user_status == "notmigrated2d2": + logging.critical(f"Migration failed for {run_path}") + raise MigrationError.migration_failed(run_path) + elif user_status != "migration_in_progress": + logging.critical(f"Unknown status: {user_status}") + if tries_for_completion > max_tries_completion: + raise MigrationError.unknown_user_status(run_path, user_status) + + if tries_for_completion > max_tries_completion: + raise MigrationError.timeout( + run_path, f"{tries_for_completion*sleep_completion}s" + ) + + tries_for_completion += 1 + await asyncio.sleep(sleep_completion) async def wait_transfers( @@ -854,6 +907,10 @@ class ActionsServer: detectors[karabo_id] = thisconf copy_file_list = copy_file_list.difference(corr_file_list) asyncio.ensure_future(copy_untouched_files(copy_file_list)) + except MigrationError as e: + logging.error("Migration issue", exc_info=e) + await update_mdc_status(self.mdc, 'correct', rid, str(e)) + return + except Exception as corr_e: logging.error("Error during correction", exc_info=corr_e) await update_mdc_status(self.mdc, 'correct', rid, @@ -936,15 +993,19 @@ class ActionsServer: async def _continue(): """Runs in the background after we reply to the 'dark_request' request""" await update_mdc_status(self.mdc, 'dark_request', rid, queued_msg) - - transfer_complete = await wait_transfers( - runs, in_folder, proposal - ) - if not transfer_complete: - # Timed out - await update_mdc_status( - self.mdc, 'dark_request', rid, MDC.MIGRATION_TIMEOUT + try: + transfer_complete = await wait_transfers( + runs, in_folder, proposal ) + if not transfer_complete: + # Timed out + await update_mdc_status( + self.mdc, 'dark_request', rid, MDC.MIGRATION_TIMEOUT + ) + return + except MigrationError as e: + logging.error("Migration issue", exc_info=e) + await update_mdc_status(self.mdc, 'dark_request', rid, str(e)) return # Notebooks require one or three runs, depending on the @@ -1113,11 +1174,11 @@ def main(argv: Optional[List[str]] = None): parser = argparse.ArgumentParser( description='Start the calibration webservice' ) - parser.add_argument('--config-file', type=str, default='./webservice.yaml') - parser.add_argument('--mode', type=str, default="sim", choices=['sim', 'prod']) + parser.add_argument('--config-file', type=str, default=None) + parser.add_argument('--mode', type=str, default="sim", choices=['sim', 'prod']) # noqa parser.add_argument('--log-file', type=str, default='./web.log') parser.add_argument( - '--log-level', type=str, default="INFO", choices=['INFO', 'DEBUG', 'ERROR'] + '--log-level', type=str, default="INFO", choices=['INFO', 'DEBUG', 'ERROR'] # noqa ) args = parser.parse_args(argv) @@ -1126,10 +1187,10 @@ def main(argv: Optional[List[str]] = None): log_level = args.log_level mode = args.mode - with open(config_file, "r") as f: - config = yaml.safe_load(f.read()) + if config_file is not None: +
config.configure(includes_for_dynaconf=[Path(config_file).absolute()]) - fmt = '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] %(message)s' + fmt = '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] %(message)s' # noqa logging.basicConfig( filename=log_file, level=getattr(logging, log_level),