Skip to content
Snippets Groups Projects
Commit 7ec17a5e authored by Steffen Hauf's avatar Steffen Hauf
Browse files

Add copying, make handling async

parent e4541c92
No related branches found
No related tags found
1 merge request!44add a web service to launch calibration from MDC
......@@ -123,6 +123,13 @@ async def update_job_db(config):
await asyncio.sleep(int(config['web-service']['job-update-interval']))
async def copy_untouched_files(file_list):
for f in file_list:
of = f.replace("raw", "proc").replace("RAW", "CORR")
cmd = ["rsync", "-av", f, of]
await asyncio.subprocess.create_subprocess_shell(" ".join(cmd))
logging.info("Copying {} to {}".format(f, of))
async def run_cmd(conn, cmd, mode, proposal, run, rpath):
""" Run a correction command
......@@ -229,81 +236,88 @@ async def server_runner(config, mode):
socket.send(Errors.UNKNOWN_ACTION.format(action).encode())
continue
if action == 'correct':
sase, instrument, cycle, proposal, runnr = payload
specific_conf_file = "{}/{}/{}.yaml".format(config['config-repo']['local-path'], cycle, proposal)
if os.path.exists(specific_conf_file):
with open(specific_conf_file, "r") as f:
pconf = yaml.load(f.read())
else:
print("Using default file, as {} does not exist".format(specific_conf_file))
default_file = "{}/default.yaml".format(config['config-repo']['local-path'])
with open(default_file, "r") as f:
pconf = yaml.load(f.read())
in_folder = config['correct']['in-folder'].format(instrument=instrument, cycle=cycle, proposal=proposal)
out_folder = config['correct']['out-folder'].format(instrument=instrument, cycle=cycle, proposal=proposal)
detectors = {}
rpath = "{}/r{:04d}/".format(in_folder, int(runnr))
async def wait_on_transfer():
rstr = None
ret = None
max_tries = 60 # 600s
tries = 0
while not os.path.exists(rpath):
await asyncio.sleep(10)
await asyncio.sleep(1)
while rstr is None or 'status="online"' in rstr or ret.returncode != 0:
print(" ".join(["getfattr", "-n", "user.status", rpath]))
ret = subprocess.run(["getfattr", "-n", "user.status", rpath], stdout=subprocess.PIPE)
rstr = ret.stdout.decode()
await asyncio.sleep(10)
if tries > max_tries:
return False
tries += 1
return ret.returncode == 0
msg = "Queued proposal {}, run {} for offline calibration".format(proposal, runnr)
socket.send(msg.encode())
transfer_complete = await wait_on_transfer()
if not transfer_complete:
logging.error(Errors.TRANSFER_EVAL_FAILED.format(proposal, runnr))
return Errors.TRANSFER_EVAL_FAILED.format(proposal, runnr)
for detector, dconfig in pconf[instrument].items():
# check if we find files according to mapping in raw run folder
fl = glob.glob("{}/RAW-*{}*.h5".format(rpath, dconfig["inset"]))
if len(fl):
thisconf = copy.copy(dconfig)
thisconf["in-folder"] = in_folder
thisconf["out-folder"] = out_folder
thisconf["run"] = runnr
del thisconf["inset"] # don't need this for xfel-calibrate
detectors[detector] = thisconf
if len(detectors) == 0:
logging.warn(Errors.NOTHING_TO_DO.format(rpath))
socket.send(Errors.NOTHING_TO_DO.format(rpath).encode())
continue
status = []
for detector, dconfig in detectors.items():
cmd = ["python", "-m", "xfel_calibrate.calibrate", detector, "CORRECT"]
for key, value in dconfig.items():
if not isinstance(value, bool):
cmd += ["--{}".format(key), str(value)]
else:
cmd += ["--{}".format(key)]
ret = await run_cmd(job_db, cmd, mode, proposal, runnr, dconfig["in-folder"])
status.append(ret)
#socket.send(("\n".join(status)).encode())
if action == 'upload-yaml':
sase, instrument, cycle, proposal, this_yaml = payload
this_yaml = urllib.parse.unquote_plus(this_yaml)
await upload_config(socket, config['config-repo'], this_yaml, instrument, cycle, proposal)
async def do_action():
if action == 'correct':
sase, instrument, cycle, proposal, runnr = payload
specific_conf_file = "{}/{}/{}.yaml".format(config['config-repo']['local-path'], cycle, proposal)
if os.path.exists(specific_conf_file):
with open(specific_conf_file, "r") as f:
pconf = yaml.load(f.read())
else:
print("Using default file, as {} does not exist".format(specific_conf_file))
default_file = "{}/default.yaml".format(config['config-repo']['local-path'])
with open(default_file, "r") as f:
pconf = yaml.load(f.read())
in_folder = config['correct']['in-folder'].format(instrument=instrument, cycle=cycle, proposal=proposal)
out_folder = config['correct']['out-folder'].format(instrument=instrument, cycle=cycle, proposal=proposal)
detectors = {}
rpath = "{}/r{:04d}/".format(in_folder, int(runnr))
async def wait_on_transfer():
rstr = None
ret = None
max_tries = 60 # 600s
tries = 0
while not os.path.exists(rpath):
await asyncio.sleep(10)
# await asyncio.sleep(1)
while rstr is None or 'status="online"' in rstr or ret.returncode != 0:
print(" ".join(["getfattr", "-n", "user.status", rpath]))
ret = subprocess.run(["getfattr", "-n", "user.status", rpath], stdout=subprocess.PIPE)
rstr = ret.stdout.decode()
await asyncio.sleep(10)
if tries > max_tries:
return False
tries += 1
return ret.returncode == 0
msg = "Queued proposal {}, run {} for offline calibration".format(proposal, runnr)
socket.send(msg.encode())
transfer_complete = await wait_on_transfer()
if not transfer_complete:
logging.error(Errors.TRANSFER_EVAL_FAILED.format(proposal, runnr))
return Errors.TRANSFER_EVAL_FAILED.format(proposal, runnr)
corr_file_list = set()
copy_file_list = set(glob.glob("{}/*.h5".format(rpath)))
for detector, dconfig in pconf[instrument].items():
# check if we find files according to mapping in raw run folder
fl = glob.glob("{}/RAW-*{}*.h5".format(rpath, dconfig["inset"]))
if len(fl):
corr_file_list = corr_file_list.union(set(fl))
thisconf = copy.copy(dconfig)
thisconf["in-folder"] = in_folder
thisconf["out-folder"] = out_folder
thisconf["run"] = runnr
del thisconf["inset"] # don't need this for xfel-calibrate
detectors[detector] = thisconf
copy_file_list = copy_file_list.difference(corr_file_list)
asyncio.ensure_future(copy_untouched_files(copy_file_list))
if len(detectors) == 0:
logging.warn(Errors.NOTHING_TO_DO.format(rpath))
socket.send(Errors.NOTHING_TO_DO.format(rpath).encode())
return
status = []
for detector, dconfig in detectors.items():
cmd = ["python", "-m", "xfel_calibrate.calibrate", detector, "CORRECT"]
for key, value in dconfig.items():
if not isinstance(value, bool):
cmd += ["--{}".format(key), str(value)]
else:
cmd += ["--{}".format(key)]
ret = await run_cmd(job_db, cmd, mode, proposal, runnr, dconfig["in-folder"])
status.append(ret)
#socket.send(("\n".join(status)).encode())
if action == 'upload-yaml':
sase, instrument, cycle, proposal, this_yaml = payload
this_yaml = urllib.parse.unquote_plus(this_yaml)
await upload_config(socket, config['config-repo'], this_yaml, instrument, cycle, proposal)
asyncio.ensure_future(do_action())
parser = argparse.ArgumentParser(description='Start the calibration webservice')
parser.add_argument('--config-file', type=str, default='./webservice.yaml')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment