diff --git a/webservice/webservice.py b/webservice/webservice.py
index f2a63031ab55ec980e9d7ea4e9c6127665e8f26d..dfb7d15333ea650b4227f6ce47498e9ea750a1f4 100644
--- a/webservice/webservice.py
+++ b/webservice/webservice.py
@@ -16,10 +16,9 @@
 import zmq.asyncio
 import zmq.auth.thread
 from dateutil import parser as timeparser
 from git import Repo, InvalidGitRepositoryError
+from messages import Errors, Success
 from metadata_client.metadata_client import MetadataClient
-from messages import Errors, MDC, Success
-
 
 async def init_job_db(config):
     """ Initialize the sqlite job database
@@ -337,73 +336,158 @@ async def server_runner(config, mode):
         action, payload = response[0], response[1:]
 
-        if action not in ["correct",
+        if action not in ["correct", 'dark',
                           'upload-yaml']:  # only handle known actions
             logging.warn(Errors.UNKNOWN_ACTION.format(action))
             socket.send(Errors.UNKNOWN_ACTION.format(action).encode())
             continue
 
         async def do_action():
-            if action == 'correct':
-                rid, sase, instrument, cycle, proposal, runnr = payload
+            in_folder = None
+            out_folder = None
+            run_mapping = {}
+
+            if action in ['dark', 'correct']:
+                wait_runs = []
+
+                if action == 'correct':
+                    rid, sase, instrument, cycle, proposal, runnr = payload
+                    wait_runs = [runnr]
+                if action == 'dark':
+                    rid, sase, instrument, cycle, proposal = payload[:5]
+                    runs = payload[5:]  # can be many
+                    for i, run in enumerate(runs):
+                        erun = eval(run)
+                        if isinstance(erun, (list, tuple)):
+                            typ, runnr = erun
+                            run_mapping[typ] = runnr
+                            wait_runs.append(runnr)
+                        else:
+                            run_mapping['no_mapping_{}'.format(i)] = erun
+                            wait_runs.append(erun)
+
                 specific_conf_file = "{}/{}/{}.yaml".format(
                     config['config-repo']['local-path'], cycle, proposal)
                 if os.path.exists(specific_conf_file):
                     with open(specific_conf_file, "r") as f:
-                        pconf = yaml.load(f.read())
+                        pconf = yaml.load(f.read())[action]
                 else:
+                    print("Using default file, as {} does not exist".format(
+                        specific_conf_file))
                     default_file = "{}/default.yaml".format(
                         config['config-repo']['local-path'])
                     with open(default_file, "r") as f:
-                        pconf = yaml.load(f.read())
+                        pconf = yaml.load(f.read())[action]
 
                 in_folder = config['correct']['in-folder'].format(
                     instrument=instrument, cycle=cycle, proposal=proposal)
                 out_folder = config['correct']['out-folder'].format(
                     instrument=instrument, cycle=cycle, proposal=proposal)
-                detectors = {}
-                rpath = "{}/r{:04d}/".format(in_folder, int(runnr))
-
-                async def wait_on_transfer():
-                    rstr = None
-                    ret = None
-                    max_tries = 60  # 600s
-                    tries = 0
-                    while not os.path.exists(rpath):
-                        await asyncio.sleep(10)
-                    # await asyncio.sleep(1)
-                    while (rstr is None or 'status="online"' in rstr
-                           or ret.returncode != 0):
-                        ret = subprocess.run(
-                            ["getfattr", "-n", "user.status", rpath],
-                            stdout=subprocess.PIPE)
-                        rstr = ret.stdout.decode()
-                        await asyncio.sleep(10)
-                        if tries > max_tries:
-                            return False
-                        tries += 1
-
-                    return ret.returncode == 0
-
-                msg = Success.QUEUED.format(proposal, runnr)
+
+                msg = "Queued proposal {}, run {} for offline calibration".format(
+                    proposal, ", ".join(wait_runs))
                 socket.send(msg.encode())
-                transfer_complete = await wait_on_transfer()
-                if not transfer_complete:
-                    logging.error(
-                        Errors.TRANSFER_EVAL_FAILED.format(proposal, runnr))
-                    msg = MDC.MIGRATION_TIMEOUT
-                    response = mdc.update_run_api(rid,
-                                                  {'flg_cal_data_status': 'NA',
-                                                   'cal_pipeline_reply': msg})
-                    if response.status_code != 200:
-                        logging.error(Errors.MDC_RESPONSE.format(response))
+                all_transfers = []
+                for runnr in wait_runs:
+                    rpath = "{}/r{:04d}/".format(in_folder, int(runnr))
+
+                    async def wait_on_transfer():
+                        rstr = None
+                        ret = None
+                        max_tries = 60  # 600s
+                        tries = 0
+                        while not os.path.exists(rpath):
+                            await asyncio.sleep(10)
+                        # await asyncio.sleep(1)
+                        while rstr is None or 'status="online"' in rstr or ret.returncode != 0:
+                            print(" ".join(
+                                ["getfattr", "-n", "user.status", rpath]))
+                            ret = subprocess.run(
+                                ["getfattr", "-n", "user.status", rpath],
+                                stdout=subprocess.PIPE)
+                            rstr = ret.stdout.decode()
+                            await asyncio.sleep(10)
+                            if tries > max_tries:
+                                return False
+                            tries += 1
+
+                        return ret.returncode == 0
+
+                    transfer_complete = await wait_on_transfer()
+                    all_transfers.append(transfer_complete)
+                    if not transfer_complete:
+                        logging.error(
+                            Errors.TRANSFER_EVAL_FAILED.format(proposal,
+                                                               runnr))
+                        msg = "Timeout waiting for migration. Contact det-support@xfel.eu"
+                        response = mdc.update_run_api(rid, {
+                            'flg_cal_data_status': 'NA',
+                            'cal_pipeline_reply': msg})
+                        if response.status_code != 200:
+                            logging.error(Errors.MDC_RESPONSE.format(response))
+
+                if not all(all_transfers):
+                    logging.error(Errors.TRANSFER_EVAL_FAILED.format(proposal,
+                                                                     ",".join(
+                                                                         wait_runs)))
+                    return
+
+            if action == 'dark':
+                status = []
+                detectors = {}
+                for runnr in wait_runs:
+                    rpath = "{}/r{:04d}/".format(in_folder, int(runnr))
+                    if len(detectors) == 0:
+                        logging.warn(Errors.NOTHING_TO_DO.format(rpath))
+                        msg = "Nothing to characterize for these runs"
+                        response = mdc.update_run_api(rid, {
+                            'flg_cal_data_status': 'NA',
+                            'cal_pipeline_reply': msg})
+                        if response.status_code != 200:
+                            logging.error(Errors.MDC_RESPONSE.format(response))
+
+                    for detector, dconfig in pconf[instrument].items():
+                        # check if we find files according to mapping in raw run folder
+                        fl = glob.glob(
+                            "{}/RAW-*{}*.h5".format(rpath, dconfig["inset"]))
+                        if len(fl):
+                            corr_file_list = corr_file_list.union(set(fl))
+                            thisconf = copy.copy(dconfig)
+                            thisconf["in-folder"] = in_folder
+                            thisconf["out-folder"] = out_folder
+
+                            del thisconf[
+                                "inset"]  # don't need this for xfel-calibrate
+                            detectors[detector] = thisconf
+                for detector, dconfig in detectors.items():
+                    cmd = ["python", "-m", "xfel_calibrate.calibrate",
+                           detector, "DARK"]
+                    run_config = []
+                    for typ, run in run_mapping.items():
+                        if "no_mapping" in typ:
+                            run_config.append(run)
+                        else:
+                            dconfig[typ] = run
+                    if len(run_config):
+                        dconfig["runs"] = ",".join(run_config)
+                    for key, value in dconfig.items():
+                        if not isinstance(value, bool):
+                            cmd += ["--{}".format(key), str(value)]
+                        else:
+                            cmd += ["--{}".format(key)]
+                    ret = await run_correction(job_db, cmd, mode, proposal,
+                                               runnr,
+                                               dconfig["in-folder"], rid)
+                    status.append(ret)
+
+            if action == 'correct':
+                rpath = "{}/r{:04d}/".format(in_folder, int(wait_runs[0]))
                 corr_file_list = set()
                 copy_file_list = set(glob.glob("{}/*.h5".format(rpath)))
                 for detector, dconfig in pconf[instrument].items():
-                    # check if we find files according to
-                    # mapping in raw run folder
+                    # check if we find files according to mapping in raw run folder
                     fl = glob.glob(
                         "{}/RAW-*{}*.h5".format(rpath, dconfig["inset"]))
                     if len(fl):
@@ -415,14 +499,14 @@ async def server_runner(config, mode):
                         corr_file_list = corr_file_list.union(set(fl))
                         thisconf = copy.copy(dconfig)
                         thisconf["in-folder"] = in_folder
                         thisconf["out-folder"] = out_folder
 
                         del thisconf[
                             "inset"]  # don't need this for xfel-calibrate
                         detectors[detector] = thisconf
-                copy_file_list = copy_file_list.difference(corr_file_list)
+
                 asyncio.ensure_future(
                     copy_untouched_files(copy_file_list, out_folder, runnr))
                 if len(detectors) == 0:
                     logging.warn(Errors.NOTHING_TO_DO.format(rpath))
-                    msg = MDC.NOTHING_TO_DO
+                    msg = "Nothing to calibrate for this run, copied raw data only"
                     response = mdc.update_run_api(rid,
                                                   {'flg_cal_data_status': 'NA',
                                                    'cal_pipeline_reply': msg})
@@ -430,6 +514,7 @@ async def server_runner(config, mode):
                         logging.error(Errors.MDC_RESPONSE.format(response))
                     return
                 status = []
+                detectors = {}
                 for detector, dconfig in detectors.items():
                     cmd = ["python", "-m", "xfel_calibrate.calibrate",
                            detector, "CORRECT"]
@@ -439,7 +524,8 @@ async def server_runner(config, mode):
                         else:
                             cmd += ["--{}".format(key)]
                     ret = await run_correction(job_db, cmd, mode, proposal,
-                                               runnr, rid)
+                                               runnr,
+                                               dconfig["in-folder"], rid)
                     status.append(ret)
 
             if action == 'upload-yaml':