Skip to content
Snippets Groups Projects
Commit 4b023e7c authored by Thomas Kluyver's avatar Thomas Kluyver
Browse files

Merge branch 'calparrot' into 'master'

Store CalCat requests/responses for reproducibility

See merge request detectors/pycalibration!767
parents 2a167024 fefce33e
No related branches found
No related tags found
1 merge request!767Store CalCat requests/responses for reproducibility
......@@ -43,9 +43,14 @@ then
sleep 15
fi
echo "Running notebook"
${python_path} -m princess ${nb_path} --save
if [ "$caltype" == "CORRECT" ]
then
# calparrot stores and repeats calcat queries
${python_path} -m calparrot -- ${python_path} -m princess ${nb_path} --save
else
${python_path} -m princess ${nb_path} --save
fi
# stop the cluster if requested
if [ "${ipcluster_profile}" != "NO_CLUSTER" ]
......
%% Cell type:markdown id: tags:
# LPD Retrieving Constants Pre-correction #
Author: European XFEL Detector Group, Version: 1.0
The following notebook provides a constants metadata in a YAML file to use while correcting LPD images.
%% Cell type:code id: tags:
``` python
# Input parameters
in_folder = "/gpfs/exfel/exp/FXE/202201/p003073/raw/" # the folder to read data from, required
out_folder = "/gpfs/exfel/data/scratch/ahmedk/test/remove/LPD_test" # the folder to output to, required
metadata_folder = '' # Directory containing calibration_metadata.yml when run by xfel-calibrate.
modules = [-1] # Modules indices to correct, use [-1] for all, only used when karabo_da is empty
karabo_da = [''] # Data aggregators names to correct, use [''] for all
run = 10 # run to process, required
# Source parameters
karabo_id = 'FXE_DET_LPD1M-1' # Karabo domain for detector.
input_source = '{karabo_id}/DET/{module_index}CH0:xtdf' # Input fast data source.
# CalCat parameters
creation_time = "" # The timestamp to use with Calibration DB. Required Format: "YYYY-MM-DD hh:mm:ss" e.g. 2019-07-04 11:02:41
# Operating conditions
mem_cells = 512 # Memory cells, LPD constants are always taken with 512 cells.
bias_voltage = 250.0 # Detector bias voltage.
capacitor = '5pF' # Capacitor setting: 5pF or 50pF
photon_energy = 9.2 # Photon energy in keV.
category = 0 # Whom to blame.
use_cell_order = False # Whether to use memory cell order as a detector condition (not stored for older constants)
```
%% Cell type:code id: tags:
``` python
from pathlib import Path
from time import perf_counter
import numpy as np
from calibration_client import CalibrationClient
from calibration_client.modules import CalibrationConstantVersion
import extra_data as xd
from cal_tools.lpdlib import get_mem_cell_order
from cal_tools.tools import (
CalibrationMetadata,
calcat_creation_time,
save_constant_metadata,
)
from cal_tools.restful_config import restful_config
```
%% Cell type:code id: tags:
``` python
out_folder = Path(out_folder)
out_folder.mkdir(exist_ok=True)
metadata = CalibrationMetadata(metadata_folder or out_folder)
# Constant paths & timestamps are saved under retrieved-constants in calibration_metadata.yml
retrieved_constants = metadata.setdefault("retrieved-constants", {})
creation_time = calcat_creation_time(in_folder, run, creation_time)
print(f'Using {creation_time.isoformat()} as creation time')
# Pick all modules/aggregators or those selected.
if not karabo_da or karabo_da == ['']:
if not modules or modules == [-1]:
modules = list(range(16))
karabo_da = [f'LPD{i:02d}' for i in modules]
# List of detector sources.
det_inp_sources = [input_source.format(karabo_id=karabo_id, module_index=int(da[-2:])) for da in karabo_da]
```
%% Cell type:code id: tags:
``` python
# Connect to CalCat.
calcat_config = restful_config['calcat']
client = CalibrationClient(
base_api_url=calcat_config['base-api-url'],
use_oauth2=calcat_config['use-oauth2'],
client_id=calcat_config['user-id'],
client_secret=calcat_config['user-secret'],
user_email=calcat_config['user-email'],
token_url=calcat_config['token-url'],
refresh_url=calcat_config['refresh-url'],
auth_url=calcat_config['auth-url'],
scope='')
```
%% Cell type:code id: tags:
``` python
dark_calibrations = {
1: 'Offset',
14: 'BadPixelsDark',
}
base_condition = [
dict(parameter_id=1, value=bias_voltage), # Sensor bias voltage
dict(parameter_id=7, value=mem_cells), # Memory cells
dict(parameter_id=15, value=capacitor), # Feedback capacitor
dict(parameter_id=13, value=256), # Pixels X
dict(parameter_id=14, value=256), # Pixels Y
]
if use_cell_order:
# Read the order of memory cells used
raw_data = xd.RunDirectory(Path(in_folder, f'r{run:04d}'))
cell_ids_pattern_s = get_mem_cell_order(raw_data, det_inp_sources)
print("Memory cells order:", cell_ids_pattern_s)
dark_condition = base_condition + [
dict(parameter_id=30, value=cell_ids_pattern_s), # Memory cell order
]
else:
dark_condition = base_condition.copy()
illuminated_calibrations = {
20: 'BadPixelsFF',
42: 'GainAmpMap',
43: 'FFMap',
44: 'RelativeGain',
}
illuminated_condition = base_condition + [
dict(parameter_id=3, value=photon_energy), # Source energy
dict(parameter_id=25, value=category) # category
]
```
%% Cell type:code id: tags:
``` python
const_data = {}
print('Querying calibration database', end='', flush=True)
start = perf_counter()
for k_da in karabo_da:
pdu = None
if k_da in retrieved_constants:
print(f"Constant for {k_da} already in {metadata.filename}, won't query again.") # noqa
continue
retrieved_constants[k_da] = dict()
const_mdata = retrieved_constants[k_da]["constants"] = dict()
for calibrations, condition in [
(dark_calibrations, dark_condition),
(illuminated_calibrations, illuminated_condition)
]:
resp = CalibrationConstantVersion.get_closest_by_time_by_detector_conditions(
client, karabo_id, list(calibrations.keys()),
{'parameters_conditions_attributes': condition},
karabo_da=k_da, event_at=creation_time.isoformat())
if not resp["success"]:
print(f"ERROR: Constants {list(calibrations.values())} "
f"were not retrieved, {resp['app_info']}")
for cname in calibrations.values():
const_mdata[cname] = dict()
const_mdata[cname]["file-path"] = None
const_mdata[cname]["dataset-name"] = None
const_mdata[cname]["creation-time"] = None
continue
for ccv in resp["data"]:
cc = ccv['calibration_constant']
cname = calibrations[cc['calibration_id']]
const_mdata[cname] = dict()
const_mdata[cname]["file-path"] = str(Path(ccv['path_to_file']) / ccv['file_name'])
const_mdata[cname]["dataset-name"] = ccv['data_set_name']
const_mdata[cname]["creation-time"] = ccv['begin_at']
pdu = ccv['physical_detector_unit']['physical_name']
print('.', end='', flush=True)
retrieved_constants[k_da]["physical-detector-unit"] = pdu
metadata.save()
total_time = perf_counter() - start
print(f'{total_time:.1f}s')
print(f"Stored retrieved constants in {metadata.filename}")
```
......
......@@ -110,6 +110,7 @@ if "readthedocs.org" not in sys.executable:
install_requires += [
"iCalibrationDB @ git+ssh://git@git.xfel.eu:10022/detectors/cal_db_interactive.git@2.4.0", # noqa
"XFELDetectorAnalysis @ git+ssh://git@git.xfel.eu:10022/karaboDevices/pyDetLib.git@2.7.0", # noqa
"CalParrot @ git+ssh://git@git.xfel.eu:10022/calibration/calparrot.git@0.1", # noqa
]
setup(
......
......@@ -385,11 +385,14 @@ class JobArgs:
"""Run this job in a local process, return exit status"""
return call(self.format_cmd(python), cwd=work_dir)
def submit_job(self, work_dir, python, slurm_opts, after_ok=(), after_any=()):
def submit_job(
self, work_dir, python, slurm_opts, after_ok=(), after_any=(), env=None
):
"""Submit this job to Slurm, return its job ID"""
cmd = slurm_opts.get_launcher_command(work_dir, after_ok, after_any)
cmd += self.format_cmd(python)
output = check_output(cmd, cwd=work_dir).decode('utf-8')
# sbatch propagates environment variables into the job by default
output = check_output(cmd, cwd=work_dir, env=env).decode('utf-8')
return output.partition(';')[0].strip() # job ID
......@@ -440,7 +443,7 @@ class JobChain:
'steps': [step.to_dict() for step in self.steps]
}, f, indent=2)
def submit_jobs(self, slurm_opts: SlurmOptions):
def submit_jobs(self, slurm_opts: SlurmOptions, env=None):
"""Submit these jobs to Slurm, return a list of job IDs
Slurm dependencies are used to manage the sequence of jobs.
......@@ -453,7 +456,9 @@ class JobChain:
step_job_ids = []
kw = {('after_any' if step.after_error else 'after_ok'): dep_job_ids}
for job_desc in step.jobs:
jid = job_desc.submit_job(self.work_dir, self.python, slurm_opts, **kw)
jid = job_desc.submit_job(
self.work_dir, self.python, slurm_opts, env=env, **kw
)
step_job_ids.append(jid)
dep_job_ids = step_job_ids
all_job_ids.extend(step_job_ids)
......
......@@ -135,12 +135,12 @@ def main(argv=None):
out_folder = parameters['out-folder']
params_to_set = {'metadata_folder': "."}
if args.out_folder:
out_folder = parameters['out-folder'] = args.out_folder
out_folder = parameters['out-folder'] = os.path.abspath(args.out_folder)
params_to_set['out_folder'] = out_folder
update_notebooks_params(cal_work_dir, params_to_set)
if args.report_to:
report_to = args.report_to
report_to = os.path.abspath(args.report_to)
else: # Default to saving report in output folder
report_to = str(Path(out_folder, f'xfel-calibrate-repeat-{run_uuid}'))
cal_metadata['report-path'] = f'{report_to}.pdf'
......@@ -159,12 +159,17 @@ def main(argv=None):
job_chain.run_direct()
joblist = []
else:
# The queries to look up constants should all be the same as those
# from the previous calibration - tell CalParrot to warn if not.
env = os.environ.copy()
env['CALPARROT_NEW_QUERY'] = 'warn'
joblist = job_chain.submit_jobs(SlurmOptions(
partition=args.slurm_partition,
mem=args.slurm_mem,
))
), env=env)
fmt_args = {'run_path': cal_work_dir,
fmt_args = {'cal_work_dir': cal_work_dir,
'out_path': out_folder,
'version': get_pycalib_version(),
'report_to': report_to,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment