Skip to content
Snippets Groups Projects
Commit 7a0ed2b5 authored by Steffen Hauf's avatar Steffen Hauf
Browse files

Add dark run handling

parent 10ea22b7
No related branches found
No related tags found
1 merge request!47Fixes from webservices
...@@ -16,10 +16,9 @@ import zmq.asyncio ...@@ -16,10 +16,9 @@ import zmq.asyncio
import zmq.auth.thread import zmq.auth.thread
from dateutil import parser as timeparser from dateutil import parser as timeparser
from git import Repo, InvalidGitRepositoryError from git import Repo, InvalidGitRepositoryError
from messages import Errors, Success
from metadata_client.metadata_client import MetadataClient from metadata_client.metadata_client import MetadataClient
from messages import Errors, MDC, Success
async def init_job_db(config): async def init_job_db(config):
""" Initialize the sqlite job database """ Initialize the sqlite job database
...@@ -337,73 +336,158 @@ async def server_runner(config, mode): ...@@ -337,73 +336,158 @@ async def server_runner(config, mode):
action, payload = response[0], response[1:] action, payload = response[0], response[1:]
if action not in ["correct", if action not in ["correct", 'dark',
'upload-yaml']: # only handle known actions 'upload-yaml']: # only handle known actions
logging.warn(Errors.UNKNOWN_ACTION.format(action)) logging.warn(Errors.UNKNOWN_ACTION.format(action))
socket.send(Errors.UNKNOWN_ACTION.format(action).encode()) socket.send(Errors.UNKNOWN_ACTION.format(action).encode())
continue continue
async def do_action(): async def do_action():
if action == 'correct': in_folder = None
rid, sase, instrument, cycle, proposal, runnr = payload out_folder = None
run_mapping = {}
if action in ['dark', 'correct']:
wait_runs = []
if action == 'correct':
rid, sase, instrument, cycle, proposal, runnr = payload
wait_runs = [runnr]
if action == 'dark':
rid, sase, instrument, cycle, proposal = payload[:5]
runs = payload[5:] # can be many
for i, run in enumerate(runs):
erun = eval(run)
if isinstance(erun, (list, tuple)):
typ, runnr = erun
run_mapping[typ] = runnr
wait_runs.append(runnr)
else:
run_mapping['no_mapping_{}'.format(i)] = erun
wait_runs.append(erun)
specific_conf_file = "{}/{}/{}.yaml".format( specific_conf_file = "{}/{}/{}.yaml".format(
config['config-repo']['local-path'], cycle, proposal) config['config-repo']['local-path'], cycle, proposal)
if os.path.exists(specific_conf_file): if os.path.exists(specific_conf_file):
with open(specific_conf_file, "r") as f: with open(specific_conf_file, "r") as f:
pconf = yaml.load(f.read()) pconf = yaml.load(f.read())[action]
else: else:
print("Using default file, as {} does not exist".format(
specific_conf_file))
default_file = "{}/default.yaml".format( default_file = "{}/default.yaml".format(
config['config-repo']['local-path']) config['config-repo']['local-path'])
with open(default_file, "r") as f: with open(default_file, "r") as f:
pconf = yaml.load(f.read()) pconf = yaml.load(f.read())[action]
in_folder = config['correct']['in-folder'].format( in_folder = config['correct']['in-folder'].format(
instrument=instrument, cycle=cycle, proposal=proposal) instrument=instrument, cycle=cycle, proposal=proposal)
out_folder = config['correct']['out-folder'].format( out_folder = config['correct']['out-folder'].format(
instrument=instrument, cycle=cycle, proposal=proposal) instrument=instrument, cycle=cycle, proposal=proposal)
detectors = {}
rpath = "{}/r{:04d}/".format(in_folder, int(runnr)) msg = "Queued proposal {}, run {} for offline calibration".format(
proposal, ", ".join(wait_runs))
async def wait_on_transfer():
rstr = None
ret = None
max_tries = 60 # 600s
tries = 0
while not os.path.exists(rpath):
await asyncio.sleep(10)
# await asyncio.sleep(1)
while (rstr is None or 'status="online"' in rstr
or ret.returncode != 0):
ret = subprocess.run(
["getfattr", "-n", "user.status", rpath],
stdout=subprocess.PIPE)
rstr = ret.stdout.decode()
await asyncio.sleep(10)
if tries > max_tries:
return False
tries += 1
return ret.returncode == 0
msg = Success.QUEUED.format(proposal, runnr)
socket.send(msg.encode()) socket.send(msg.encode())
transfer_complete = await wait_on_transfer() all_transfers = []
if not transfer_complete: for runnr in wait_runs:
logging.error( rpath = "{}/r{:04d}/".format(in_folder, int(runnr))
Errors.TRANSFER_EVAL_FAILED.format(proposal, runnr))
msg = MDC.MIGRATION_TIMEOUT async def wait_on_transfer():
response = mdc.update_run_api(rid, rstr = None
{'flg_cal_data_status': 'NA', ret = None
'cal_pipeline_reply': msg}) max_tries = 60 # 600s
if response.status_code != 200: tries = 0
logging.error(Errors.MDC_RESPONSE.format(response)) while not os.path.exists(rpath):
await asyncio.sleep(10)
# await asyncio.sleep(1)
while rstr is None or 'status="online"' in rstr or ret.returncode != 0:
print(" ".join(
["getfattr", "-n", "user.status", rpath]))
ret = subprocess.run(
["getfattr", "-n", "user.status", rpath],
stdout=subprocess.PIPE)
rstr = ret.stdout.decode()
await asyncio.sleep(10)
if tries > max_tries:
return False
tries += 1
return ret.returncode == 0
transfer_complete = await wait_on_transfer()
all_transfers.append(transfer_complete)
if not transfer_complete:
logging.error(
Errors.TRANSFER_EVAL_FAILED.format(proposal,
runnr))
msg = "Timeout waiting for migration. Contact det-support@xfel.eu"
response = mdc.update_run_api(rid, {
'flg_cal_data_status': 'NA',
'cal_pipeline_reply': msg})
if response.status_code != 200:
logging.error(Errors.MDC_RESPONSE.format(response))
if not all(all_transfers):
logging.error(Errors.TRANSFER_EVAL_FAILED.format(proposal,
",".join(
wait_runs)))
return
if action == 'dark':
status = []
detectors = {}
for runnr in wait_runs:
rpath = "{}/r{:04d}/".format(in_folder, int(runnr))
if len(detectors) == 0:
logging.warn(Errors.NOTHING_TO_DO.format(rpath))
msg = "Nothing to characterize for these runs"
response = mdc.update_run_api(rid, {
'flg_cal_data_status': 'NA',
'cal_pipeline_reply': msg})
if response.status_code != 200:
logging.error(Errors.MDC_RESPONSE.format(response))
for detector, dconfig in pconf[instrument].items():
# check if we find files according to mapping in raw run folder
fl = glob.glob(
"{}/RAW-*{}*.h5".format(rpath, dconfig["inset"]))
if len(fl):
corr_file_list = corr_file_list.union(set(fl))
thisconf = copy.copy(dconfig)
thisconf["in-folder"] = in_folder
thisconf["out-folder"] = out_folder
del thisconf[
"inset"] # don't need this for xfel-calibrate
detectors[detector] = thisconf
for detector, dconfig in detectors.items():
cmd = ["python", "-m", "xfel_calibrate.calibrate",
detector, "DARK"]
run_config = []
for typ, run in run_mapping.items():
if "no_mapping" in type:
run_config.append(run)
else:
dconfig[typ] = run
if len(run_config):
dconfig["runs"] = ",".join(run_config)
for key, value in dconfig.items():
if not isinstance(value, bool):
cmd += ["--{}".format(key), str(value)]
else:
cmd += ["--{}".format(key)]
ret = await run_correction(job_db, cmd, mode, proposal,
runnr,
dconfig["in-folder"], rid)
status.append(ret)
if action == 'correct':
rpath = "{}/r{:04d}/".format(in_folder, int(wait_runs[0]))
corr_file_list = set() corr_file_list = set()
copy_file_list = set(glob.glob("{}/*.h5".format(rpath))) copy_file_list = set(glob.glob("{}/*.h5".format(rpath)))
for detector, dconfig in pconf[instrument].items(): for detector, dconfig in pconf[instrument].items():
# check if we find files according to # check if we find files according to mapping in raw run folder
# mapping in raw run folder
fl = glob.glob( fl = glob.glob(
"{}/RAW-*{}*.h5".format(rpath, dconfig["inset"])) "{}/RAW-*{}*.h5".format(rpath, dconfig["inset"]))
if len(fl): if len(fl):
...@@ -415,14 +499,14 @@ async def server_runner(config, mode): ...@@ -415,14 +499,14 @@ async def server_runner(config, mode):
del thisconf[ del thisconf[
"inset"] # don't need this for xfel-calibrate "inset"] # don't need this for xfel-calibrate
detectors[detector] = thisconf detectors[detector] = thisconf
copy_file_list = copy_file_list.difference(corr_file_list) copy_file_list = copy_file_list.difference(corr_file_list)
asyncio.ensure_future( asyncio.ensure_future(
copy_untouched_files(copy_file_list, out_folder, runnr)) copy_untouched_files(copy_file_list, out_folder, runnr))
if len(detectors) == 0: if len(detectors) == 0:
logging.warn(Errors.NOTHING_TO_DO.format(rpath)) logging.warn(Errors.NOTHING_TO_DO.format(rpath))
msg = MDC.NOTHING_TO_DO msg = "Nothing to calibrate for this run, copied raw data only"
response = mdc.update_run_api(rid, response = mdc.update_run_api(rid,
{'flg_cal_data_status': 'NA', {'flg_cal_data_status': 'NA',
'cal_pipeline_reply': msg}) 'cal_pipeline_reply': msg})
...@@ -430,6 +514,7 @@ async def server_runner(config, mode): ...@@ -430,6 +514,7 @@ async def server_runner(config, mode):
logging.error(Errors.MDC_RESPONSE.format(response)) logging.error(Errors.MDC_RESPONSE.format(response))
return return
status = [] status = []
detectors = {}
for detector, dconfig in detectors.items(): for detector, dconfig in detectors.items():
cmd = ["python", "-m", "xfel_calibrate.calibrate", cmd = ["python", "-m", "xfel_calibrate.calibrate",
detector, "CORRECT"] detector, "CORRECT"]
...@@ -439,7 +524,8 @@ async def server_runner(config, mode): ...@@ -439,7 +524,8 @@ async def server_runner(config, mode):
else: else:
cmd += ["--{}".format(key)] cmd += ["--{}".format(key)]
ret = await run_correction(job_db, cmd, mode, proposal, ret = await run_correction(job_db, cmd, mode, proposal,
runnr, rid) runnr,
dconfig["in-folder"], rid)
status.append(ret) status.append(ret)
if action == 'upload-yaml': if action == 'upload-yaml':
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment