Skip to content
Snippets Groups Projects
Commit d6fd4fe6 authored by Steffen Hauf's avatar Steffen Hauf
Browse files

Propagate backlog of webservice related changes from production system as of 06/2019

See merge request detectors/pycalibration!78
parents 710ba225 048a8a4e
No related branches found
No related tags found
1 merge request!78Propagate backlog of webservice related changes from production system as of 06/2019
......@@ -6,7 +6,7 @@ import urllib.parse
import zmq
parser = argparse.ArgumentParser(
description='Manually request correction for a given run. Will wait on data transfers to complete first!')
description='Request dark characterization. Will wait on data transfers to complete first!')
parser.add_argument('--proposal', type=str, help='The proposal number, without leading p, but with leading zeros')
parser.add_argument('--instrument', type=str, choices=["SPB", "MID", "FXE", "SCS", "SQS", "HED"], help='The instrument')
parser.add_argument('--cycle', type=str, help='The facility cycle')
......
import argparse
from datetime import datetime
import sys
from time import sleep
import urllib.parse
import zmq
parser = argparse.ArgumentParser(
description='Request dark characterization. Will wait on data transfers to complete first!')
parser.add_argument('--proposal', type=str, help='The proposal number, without leading p, but with leading zeros')
parser.add_argument('--instrument', type=str, choices=["SPB", "MID", "FXE", "SCS", "SQS", "HED"], help='The instrument')
description='Request dark characterization. Will wait on data transfers to complete first!') # noqa
parser.add_argument('--proposal', type=str,
help='The proposal number, without leading p, but with leading zeros') # noqa
parser.add_argument('--instrument', type=str,
choices=["SPB", "MID", "FXE", "SCS", "SQS", "HED"],
help='The instrument')
parser.add_argument('--cycle', type=str, help='The facility cycle')
parser.add_argument('--run-high', type=str, help='Run number of high gain data as an integer')
parser.add_argument('--run-med', type=str, help='Run number of medium gain data as an integer')
parser.add_argument('--run-low', type=str, help='Run number of low gain data as an integer')
parser.add_argument('--run-high', type=str,
help='Run number of high gain data as an integer')
parser.add_argument('--run-med', type=str,
help='Run number of medium gain data as an integer')
parser.add_argument('--run-low', type=str,
help='Run number of low gain data as an integer')
parser.add_argument('--run', type=str, help='Run number as an integer')
args = vars(parser.parse_args())
......@@ -23,7 +28,8 @@ con = socket.connect("tcp://max-exfl016:5555")
uuid = str(datetime.now().timestamp().as_integer_ratio()[0])
parm_list = ["dark", uuid, "SASEX", args["instrument"], args["cycle"], args["proposal"]]
parm_list = ["dark", uuid, "SASEX", args["instrument"], args["cycle"],
args["proposal"]]
if "run_high" in args and args["run_high"]:
parm_list += ["(\"run-high\", \"{}\")".format(args["run_high"])]
......@@ -34,7 +40,6 @@ if "run_low" in args and args["run_low"]:
if "run" in args and args["run"]:
parm_list += ["(\"run\", \"{}\")".format(args["run"])]
msg = "','".join(parm_list)
socket.send("['{}']".format(msg).encode())
resp = socket.recv_multipart()[0]
......@@ -45,7 +50,6 @@ while True:
rl = resp.decode().split("\n")
r = ["QUEUE" if "QUEUE" in r else r for r in rl]
r = " | ".join(r)
print(r+"\r", end=' ', flush=True)
print(r + "\r", end=' ', flush=True)
sleep(10)
break
......@@ -71,7 +71,10 @@ def init_config_repo(config):
repo = Repo(config['local-path'])
except InvalidGitRepositoryError:
repo = Repo.clone_from(config['url'], config['local-path'])
repo.remote().pull()
try:
repo.remote().pull()
except:
pass
logging.info("Config repo is initialized")
......@@ -157,7 +160,7 @@ async def query_rid(conn, socket, rid):
flg = max(flgs, key=lambda i: flg_order[i])
msg += "\n".join(statii)
if msg == "":
msg = "DONE"
msg = "DONE"
socket.send(msg.encode())
......@@ -206,11 +209,6 @@ async def update_job_db(config):
c.execute("SELECT * FROM jobs")
for r in c.fetchall():
rid, jobid, proposal, run, flg, status = r
# logging.debug(
# "Job {}, proposal {}, run {} has status {}".format(jobid,
# proposal,
# run,
# status))
cflg, cstatus = combined.get(rid, ([], []))
cflg.append(flg)
cstatus.append(status)
......@@ -373,7 +371,7 @@ async def server_runner(config, mode):
rid = payload[0]
await query_rid(job_db, socket, rid)
continue
mdc = await init_md_client(config)
async def do_action(action, payload):
......@@ -387,6 +385,7 @@ async def server_runner(config, mode):
if action == 'correct':
rid, sase, instrument, cycle, proposal, runnr, priority = payload
runnr = runnr.replace("r", "")
wait_runs = [runnr]
if action == 'dark':
rid, sase, instrument, cycle, proposal = payload[:5]
......@@ -395,11 +394,13 @@ async def server_runner(config, mode):
erun = eval(run)
if isinstance(erun, (list, tuple)):
typ, runnr = erun
runnr = runnr.replace("r", "")
run_mapping[typ] = runnr
wait_runs.append(runnr)
else:
run_mapping['no_mapping_{}'.format(i)] = erun
wait_runs.append(erun)
proposal = proposal.replace("p", "")
proposal = "{:06d}".format(int(proposal))
specific_conf_file = "{}/{}/{}.yaml".format(
config['config-repo']['local-path'], cycle, proposal)
......@@ -430,7 +431,8 @@ async def server_runner(config, mode):
rpath = "{}/r{:04d}/".format(in_folder, int(runnr))
async def wait_on_transfer():
if 'pnfs' in os.path.realpath(rpath): # dcache files are assumed migrated
if 'pnfs' in os.path.realpath(
rpath): # dcache files are assumed migrated
return True
rstr = None
ret = None
......@@ -472,14 +474,15 @@ async def server_runner(config, mode):
",".join(
wait_runs)))
return
print("Now doing: {}".format(action))
if action == 'dark':
print("Running dark cal")
status = []
detectors = {}
out_folder = config[action]['out-folder'].format(
instrument=instrument, cycle=cycle, proposal=proposal, runs="_".join(wait_runs))
instrument=instrument, cycle=cycle, proposal=proposal,
runs="_".join(wait_runs))
for runnr in wait_runs:
rpath = "{}/r{:04d}/".format(in_folder, int(runnr))
if len(detectors) == 0:
......@@ -507,8 +510,12 @@ async def server_runner(config, mode):
for detector, dconfig in detectors.items():
if "-" in detector:
detector, _ = detector.split("-")
priority = '1'
if detector.upper() in ["JUNGFRAU", "FASTCCD", "PNCCD",
"EPIX"]:
priority = '0'
cmd = ["python", "-m", "xfel_calibrate.calibrate",
detector, "DARK", '--priority', '1']
detector, "DARK", '--priority', priority]
run_config = []
for typ, run in run_mapping.items():
if "no_mapping" in typ:
......@@ -564,8 +571,8 @@ async def server_runner(config, mode):
return
status = []
for detector, dconfig in detectors.items():
if "-" in detector:
detector, _ = detector.split("-")
if "-" in detector:
detector, _ = detector.split("-")
cmd = ["python", "-m", "xfel_calibrate.calibrate",
detector, "CORRECT"]
for key, value in dconfig.items():
......@@ -574,7 +581,7 @@ async def server_runner(config, mode):
else:
cmd += ["--{}".format(key)]
if priority:
cmd += ["--priority", str(priority)]
cmd += ["--priority", str(priority)]
ret = await run_correction(job_db, cmd, mode, proposal,
runnr, rid)
status.append(ret)
......@@ -586,7 +593,8 @@ async def server_runner(config, mode):
instrument, cycle, proposal)
try:
asyncio.ensure_future(do_action(copy.copy(action), copy.copy(payload)))
asyncio.ensure_future(
do_action(copy.copy(action), copy.copy(payload)))
except Exception as e: # actions that fail are only error logged
logging.error(str(e))
......@@ -614,4 +622,3 @@ if __name__ == "__main__":
loop.create_task(update_job_db(config))
loop.run_until_complete(server_runner(config, mode))
loop.close()
config-repo:
url: https://git.xfel.eu/gitlab/detectors/calibration_configurations.git
local-path: /home/haufs/calibration_config/
url:
local-path: /home/xcal/calibration_config/
web-service:
port: 5555
bind-to: tcp://*
allowed-ips:
job-db: ./webservice_jobs.sqlite
job-update-interval: 30
job-update-interval: 60
job-timeout: 3600
metadata-client:
user-id:
user-secret:
user-email:
user-id:
user-secret:
user-email:
metadata-web-app-url: 'https://in.xfel.eu/metadata'
metadata-web-app-url: 'https://in.xfel.eu/metadata'
token-url: 'https://in.xfel.eu/metadata/oauth/token'
......@@ -24,4 +24,9 @@ metadata-client:
correct:
in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw
out-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/proc
out-folder: /gpfs/exfel/d/proc/{instrument}/{cycle}/p{proposal}
dark:
in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw
out-folder: /gpfs/exfel/u/usr/{instrument}/{cycle}/p{proposal}/dark/runs_{runs}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment