Skip to content
Snippets Groups Projects
Commit ead040bf authored by Thomas Kluyver's avatar Thomas Kluyver
Browse files

Initial skeleton for repeat action in webservice

parent a91966a3
No related branches found
No related tags found
1 merge request!695[webservice] Support requests to repeat correction from myMdC
...@@ -31,6 +31,7 @@ kafka: ...@@ -31,6 +31,7 @@ kafka:
correct: correct:
in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw
out-folder: /gpfs/exfel/d/proc/{instrument}/{cycle}/p{proposal}/{run} out-folder: /gpfs/exfel/d/proc/{instrument}/{cycle}/p{proposal}/{run}
reports-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/usr/Reports/{runs}/
commissioning-penalty: 1250 commissioning-penalty: 1250
job-penalty: 2 job-penalty: 2
cmd : >- cmd : >-
......
...@@ -786,6 +786,7 @@ class ActionsServer: ...@@ -786,6 +786,7 @@ class ActionsServer:
accepted_actions = { accepted_actions = {
'correct', 'correct',
'repeat',
'dark_request', 'dark_request',
'query-rid', 'query-rid',
'upload-yaml', 'upload-yaml',
...@@ -933,6 +934,43 @@ class ActionsServer: ...@@ -933,6 +934,43 @@ class ActionsServer:
return queued_msg.encode() return queued_msg.encode()
async def handle_repeat(self, rid, instrument, cycle, proposal, runnr):
reports_dir = Path(self.config['correct']['reports-folder'].format(
instrument=instrument, cycle=cycle, proposal=proposal, runs=runnr
))
mddirs_by_krb_id = {}
for calmeta_pth in sorted(reports_dir.glob('*/calibration_metadata.yml')):
with calmeta_pth.open('r', encoding='utf-8') as f:
calmeta = yaml.safe_load(f)
try:
request_time = calmeta["runtime-summary"]["pipeline-steps"]["request-time"]
karabo_id = calmeta["calibration-configurations"]["karabo-id"]
except KeyError:
logging.warning("Did not find expected metadata in %s",
calmeta_pth, exc_info=True)
else:
mddirs_by_krb_id.setdefault(karabo_id, []).append(
(calmeta_pth.parent, request_time)
)
logging.info("Found %d corrections to re-run for p%s r%s: %s",
len(mddirs_by_krb_id), proposal, runnr, list(mddirs_by_krb_id))
for karabo_id, mddirs in mddirs_by_krb_id.items():
# Select the latest metadata directory - with the highest request
# time - to re-run for each detector (karabo_id)
mddir, _ = max(mddirs, key=lambda p: p[1])
logging.info("Repeating correction for %s from %s", karabo_id, mddir)
cmd = ['python', '-m', 'xfel_calibrate.repeat', mddir]
await run_action(
self.job_db, cmd, self.mode,
proposal, runnr, rid
)
async def handle_dark_request( async def handle_dark_request(
self, rid, _sase, instrument, cycle, proposal, karabo_id, self, rid, _sase, instrument, cycle, proposal, karabo_id,
operation_mode, *extra operation_mode, *extra
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment