diff --git a/webservice/webservice.py b/webservice/webservice.py index 8043d047f7239149294dfe99b369344dd799b75d..0a13276d91bdbccdaae6037e53442923d936c775 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -123,6 +123,13 @@ async def update_job_db(config): await asyncio.sleep(int(config['web-service']['job-update-interval'])) +async def copy_untouched_files(file_list): + for f in file_list: + of = f.replace("raw", "proc").replace("RAW", "CORR") + cmd = ["rsync", "-av", f, of] + await asyncio.subprocess.create_subprocess_shell(" ".join(cmd)) + logging.info("Copying {} to {}".format(f, of)) + async def run_cmd(conn, cmd, mode, proposal, run, rpath): """ Run a correction command @@ -229,81 +236,88 @@ async def server_runner(config, mode): socket.send(Errors.UNKNOWN_ACTION.format(action).encode()) continue - if action == 'correct': - sase, instrument, cycle, proposal, runnr = payload - specific_conf_file = "{}/{}/{}.yaml".format(config['config-repo']['local-path'], cycle, proposal) - if os.path.exists(specific_conf_file): - with open(specific_conf_file, "r") as f: - pconf = yaml.load(f.read()) - else: - print("Using default file, as {} does not exist".format(specific_conf_file)) - default_file = "{}/default.yaml".format(config['config-repo']['local-path']) - with open(default_file, "r") as f: - pconf = yaml.load(f.read()) - - in_folder = config['correct']['in-folder'].format(instrument=instrument, cycle=cycle, proposal=proposal) - out_folder = config['correct']['out-folder'].format(instrument=instrument, cycle=cycle, proposal=proposal) - detectors = {} - rpath = "{}/r{:04d}/".format(in_folder, int(runnr)) - async def wait_on_transfer(): - rstr = None - ret = None - max_tries = 60 # 600s - tries = 0 - while not os.path.exists(rpath): - await asyncio.sleep(10) - await asyncio.sleep(1) - while rstr is None or 'status="online"' in rstr or ret.returncode != 0: - print(" ".join(["getfattr", "-n", "user.status", rpath])) - ret = subprocess.run(["getfattr", "-n", "user.status", rpath], stdout=subprocess.PIPE) - rstr = ret.stdout.decode() - await asyncio.sleep(10) - if tries > max_tries: - return False - tries += 1 - - return ret.returncode == 0 - - msg = "Queued proposal {}, run {} for offline calibration".format(proposal, runnr) - socket.send(msg.encode()) - - transfer_complete = await wait_on_transfer() - if not transfer_complete: - logging.error(Errors.TRANSFER_EVAL_FAILED.format(proposal, runnr)) - return Errors.TRANSFER_EVAL_FAILED.format(proposal, runnr) - - for detector, dconfig in pconf[instrument].items(): - # check if we find files according to mapping in raw run folder - fl = glob.glob("{}/RAW-*{}*.h5".format(rpath, dconfig["inset"])) - if len(fl): - thisconf = copy.copy(dconfig) - thisconf["in-folder"] = in_folder - thisconf["out-folder"] = out_folder - thisconf["run"] = runnr - del thisconf["inset"] # don't need this for xfel-calibrate - detectors[detector] = thisconf - - if len(detectors) == 0: - logging.warn(Errors.NOTHING_TO_DO.format(rpath)) - socket.send(Errors.NOTHING_TO_DO.format(rpath).encode()) - continue - status = [] - for detector, dconfig in detectors.items(): - cmd = ["python", "-m", "xfel_calibrate.calibrate", detector, "CORRECT"] - for key, value in dconfig.items(): - if not isinstance(value, bool): - cmd += ["--{}".format(key), str(value)] - else: - cmd += ["--{}".format(key)] - ret = await run_cmd(job_db, cmd, mode, proposal, runnr, dconfig["in-folder"]) - status.append(ret) - #socket.send(("\n".join(status)).encode()) - - if action == 'upload-yaml': - sase, instrument, cycle, proposal, this_yaml = payload - this_yaml = urllib.parse.unquote_plus(this_yaml) - await upload_config(socket, config['config-repo'], this_yaml, instrument, cycle, proposal) - + async def do_action(): + if action == 'correct': + sase, instrument, cycle, proposal, runnr = payload + specific_conf_file = "{}/{}/{}.yaml".format(config['config-repo']['local-path'], cycle, proposal) + if os.path.exists(specific_conf_file): + with open(specific_conf_file, "r") as f: + pconf = yaml.load(f.read()) + else: + print("Using default file, as {} does not exist".format(specific_conf_file)) + default_file = "{}/default.yaml".format(config['config-repo']['local-path']) + with open(default_file, "r") as f: + pconf = yaml.load(f.read()) + + in_folder = config['correct']['in-folder'].format(instrument=instrument, cycle=cycle, proposal=proposal) + out_folder = config['correct']['out-folder'].format(instrument=instrument, cycle=cycle, proposal=proposal) + detectors = {} + rpath = "{}/r{:04d}/".format(in_folder, int(runnr)) + async def wait_on_transfer(): + rstr = None + ret = None + max_tries = 60 # 600s + tries = 0 + while not os.path.exists(rpath): + await asyncio.sleep(10) + # await asyncio.sleep(1) + while rstr is None or 'status="online"' in rstr or ret.returncode != 0: + print(" ".join(["getfattr", "-n", "user.status", rpath])) + ret = subprocess.run(["getfattr", "-n", "user.status", rpath], stdout=subprocess.PIPE) + rstr = ret.stdout.decode() + await asyncio.sleep(10) + if tries > max_tries: + return False + tries += 1 + + return ret.returncode == 0 + + msg = "Queued proposal {}, run {} for offline calibration".format(proposal, runnr) + socket.send(msg.encode()) + + transfer_complete = await wait_on_transfer() + if not transfer_complete: + logging.error(Errors.TRANSFER_EVAL_FAILED.format(proposal, runnr)) + return Errors.TRANSFER_EVAL_FAILED.format(proposal, runnr) + + corr_file_list = set() + copy_file_list = set(glob.glob("{}/*.h5".format(rpath))) + for detector, dconfig in pconf[instrument].items(): + # check if we find files according to mapping in raw run folder + fl = glob.glob("{}/RAW-*{}*.h5".format(rpath, dconfig["inset"])) + if len(fl): + corr_file_list = corr_file_list.union(set(fl)) + thisconf = copy.copy(dconfig) + thisconf["in-folder"] = in_folder + thisconf["out-folder"] = out_folder + thisconf["run"] = runnr + del thisconf["inset"] # don't need this for xfel-calibrate + detectors[detector] = thisconf + + copy_file_list = copy_file_list.difference(corr_file_list) + asyncio.ensure_future(copy_untouched_files(copy_file_list)) + + if len(detectors) == 0: + logging.warn(Errors.NOTHING_TO_DO.format(rpath)) + socket.send(Errors.NOTHING_TO_DO.format(rpath).encode()) + return + status = [] + for detector, dconfig in detectors.items(): + cmd = ["python", "-m", "xfel_calibrate.calibrate", detector, "CORRECT"] + for key, value in dconfig.items(): + if not isinstance(value, bool): + cmd += ["--{}".format(key), str(value)] + else: + cmd += ["--{}".format(key)] + ret = await run_cmd(job_db, cmd, mode, proposal, runnr, dconfig["in-folder"]) + status.append(ret) + #socket.send(("\n".join(status)).encode()) + + if action == 'upload-yaml': + sase, instrument, cycle, proposal, this_yaml = payload + this_yaml = urllib.parse.unquote_plus(this_yaml) + await upload_config(socket, config['config-repo'], this_yaml, instrument, cycle, proposal) + asyncio.ensure_future(do_action()) parser = argparse.ArgumentParser(description='Start the calibration webservice') parser.add_argument('--config-file', type=str, default='./webservice.yaml')