Skip to content
Snippets Groups Projects
Commit 6db6d922 authored by Thomas Kluyver's avatar Thomas Kluyver
Browse files

Merge branch 'feat/webservice-repeat' into 'master'

[webservice] Support requests to repeat correction from myMdC

See merge request detectors/pycalibration!695
parents 3a015976 d97fe8e9
No related branches found
No related tags found
1 merge request!695[webservice] Support requests to repeat correction from myMdC
......@@ -110,7 +110,7 @@ def main(argv=None):
"This requires the pyenv command to be available."
)
ap.add_argument("--out-folder", help="Directory to put output data")
ap.add_argument("--report-path", help="Location to save PDF report")
ap.add_argument("--report-to", help="Location to save PDF report")
ap.add_argument("--slurm-partition", help="Submit jobs in this Slurm partition")
ap.add_argument("--slurm-mem", type=int, help="Requested node RAM in GB")
ap.add_argument('--no-cluster-job', action="store_true",
......@@ -140,10 +140,10 @@ def main(argv=None):
update_notebooks_params(working_dir, params_to_set)
if args.report_path:
report_path = args.report_path
report_to = args.report_to
else: # Default to saving report in output folder
report_path = str(Path(out_folder, f'xfel-calibrate-repeat-{run_uuid}'))
cal_metadata['report-path'] = report_path
report_to = str(Path(out_folder, f'xfel-calibrate-repeat-{run_uuid}'))
cal_metadata['report-path'] = f'{report_to}.pdf'
cal_metadata.save()
......@@ -167,7 +167,7 @@ def main(argv=None):
fmt_args = {'run_path': working_dir,
'out_path': out_folder,
'version': get_pycalib_version(),
'report_to': report_path,
'report_to': report_to,
'in_folder': parameters['in-folder'],
'request_time': start_time.strftime('%Y-%m-%dT%H:%M:%S'),
'submission_time': start_time.strftime('%Y-%m-%dT%H:%M:%S'),
......
......@@ -31,6 +31,7 @@ kafka:
correct:
in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw
out-folder: /gpfs/exfel/d/proc/{instrument}/{cycle}/p{proposal}/{run}
reports-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/usr/Reports/{runs}/
commissioning-penalty: 1250
job-penalty: 2
cmd : >-
......
"""Send a request to repeat previous corrections.
The repeat mechanism is meant for if corrected data has been deleted,
but this script can also be used for testing.
"""
import argparse
from glob import glob
import zmq
parser = argparse.ArgumentParser(description='Request repeat correction.')
parser.add_argument('proposal', type=int, help='The proposal number')
parser.add_argument('run', type=int, help='The run number')
parser.add_argument('--mymdc-id', type=int, default=0, help="Run ID in myMdC")
parser.add_argument('--endpoint', default='tcp://max-exfl016:5555',
help="The ZMQ endpoint to connect to (max-exfl017 for testing)")
args = parser.parse_args()
prop_dir = glob('/gpfs/exfel/exp/*/*/p{:06d}'.format(args.proposal))[0]
instrument, cycle = prop_dir.split('/')[4:6]
con = zmq.Context()
socket = con.socket(zmq.REQ)
con = socket.connect(args.endpoint)
parm_list = ["recorrect", str(args.mymdc_id), instrument, cycle,
f'{args.proposal:06d}', str(args.run)]
socket.send(repr(parm_list).encode())
resp = socket.recv()
print(resp.decode())
......@@ -14,6 +14,7 @@ import sys
import urllib.parse
from asyncio import get_event_loop, shield
from datetime import datetime, timezone
from getpass import getuser
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
......@@ -786,6 +787,7 @@ class ActionsServer:
accepted_actions = {
'correct',
'recorrect',
'dark_request',
'query-rid',
'upload-yaml',
......@@ -933,6 +935,104 @@ class ActionsServer:
return queued_msg.encode()
async def handle_recorrect(self, rid, instrument, cycle, proposal, runnr):
    """Repeat previously-run corrections for one run, on request from myMdC.

    Looks for saved ``calibration_metadata.yml`` files under the run's
    reports folder, picks the most recent correction per detector
    (karabo_id), and re-launches each via ``xfel_calibrate.repeat``.

    Replies immediately over ZMQ with a queued/error message (bytes);
    the actual job submission happens in a background task scheduled
    after the reply is sent.
    """
    request_time = datetime.now()
    try:
        # Record the incoming request; the DB connection is used as a
        # context manager so the INSERT is committed (or rolled back).
        with self.job_db:
            cur = self.job_db.execute(
                "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)",
                (rid, proposal, int(runnr), request_time.strftime('%Y-%m-%dT%H:%M:%S'))
            )
            req_id = cur.lastrowid

        reports_dir = Path(self.config['correct']['reports-folder'].format(
            instrument=instrument, cycle=cycle, proposal=proposal, runs=f"r{runnr}"
        ))

        # Map karabo_id -> [(metadata_dir, prior request time), ...] for
        # every prior correction we can find for this run.
        mddirs_by_krb_id = {}
        for calmeta_pth in sorted(reports_dir.glob('*/calibration_metadata.yml')):
            with calmeta_pth.open('r', encoding='utf-8') as f:
                calmeta = yaml.safe_load(f)

            try:
                prior_request_time = calmeta["runtime-summary"]\
                    ["pipeline-steps"]["request-time"]
                karabo_id = calmeta["calibration-configurations"]["karabo-id"]
            except KeyError:
                # Metadata from an old/partial run may lack these keys;
                # skip it rather than failing the whole request.
                logging.warning("Did not find expected metadata in %s",
                                calmeta_pth, exc_info=True)
            else:
                mddirs_by_krb_id.setdefault(karabo_id, []).append(
                    (calmeta_pth.parent, prior_request_time)
                )

        logging.info("Found %d corrections to re-run for p%s r%s: %s",
                     len(mddirs_by_krb_id), proposal, runnr, list(mddirs_by_krb_id))
    except Exception as e:
        # Any failure up to here is reported synchronously to the caller
        # and asynchronously to myMdC.
        msg = Errors.JOB_LAUNCH_FAILED.format('correct', e)
        logging.error(msg, exc_info=e)
        asyncio.ensure_future(
            update_mdc_status(self.mdc, 'correct', rid, msg)
        )
        return msg.encode()

    queued_msg = Success.QUEUED.format(proposal, [runnr])
    logging.debug(queued_msg)

    async def _continue():
        """Runs in the background after we reply to the ZMQ request"""
        if len(mddirs_by_krb_id) == 0:
            # No prior corrections found - report "nothing to do" with
            # the raw data path, mirroring the normal correct action.
            in_folder = self.config['correct']['in-folder'].format(
                instrument=instrument, cycle=cycle, proposal=proposal)
            rpath = os.path.join(in_folder, f"r{int(runnr):04d}/")
            msg = Errors.NOTHING_TO_DO.format(rpath)
            logging.warning(msg)
            await update_mdc_status(self.mdc, 'correct', rid, msg)
            return

        await update_mdc_status(self.mdc, 'correct', rid, queued_msg)

        ret = []
        for karabo_id, mddirs in mddirs_by_krb_id.items():
            # Select the latest metadata directory - with the highest request
            # time - to re-run for each detector (karabo_id)
            mddir, _ = max(mddirs, key=lambda p: p[1])
            logging.info("Repeating correction for %s from %s", karabo_id, mddir)

            cmd = [
                'python', '-m', 'xfel_calibrate.repeat', str(mddir),
                '--env-cache',
                f'/gpfs/exfel/data/scratch/{getuser()}/calib-repeat-envs',
                '--report-to',
                f'{reports_dir}/{karabo_id}_RECORRECT_{request_time:%y%m%d_%H%M%S}'
            ]

            # Record the execution before launching, so the launcher can
            # associate Slurm jobs with this exec_id.
            with self.job_db:
                cur = self.job_db.execute(
                    "INSERT INTO executions VALUES (NULL, ?, ?, NULL, ?, NULL)",
                    (req_id, shlex.join(cmd), karabo_id)
                )
                exec_id = cur.lastrowid

            ret.append(await run_action(
                self.job_db, cmd, self.mode,
                proposal, runnr, exec_id
            ))

        await update_mdc_status(self.mdc, 'correct', rid, ", ".join(ret))
        # Tell myMdC when (re)calibration began for this run.
        await get_event_loop().run_in_executor(
            None, self.mdc.update_run_api,
            rid, {'cal_last_begin_at': datetime.now(tz=timezone.utc).isoformat()}
        )
    # END of part to run after sending reply

    asyncio.ensure_future(_continue())
    return queued_msg.encode()
async def handle_dark_request(
self, rid, _sase, instrument, cycle, proposal, karabo_id,
operation_mode, *extra
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment