Skip to content
Snippets Groups Projects
Commit 6958d2b1 authored by Steffen Hauf's avatar Steffen Hauf
Browse files

Handle multiple detectors of the same type for one instrument, add convenience scripts

See merge request detectors/pycalibration!56
parents 3d92fc82 95027ba1
No related branches found
No related tags found
1 merge request!56Handle multiple detectors of the same type for one instrument, add convenience scripts
import argparse
from datetime import datetime
import sys
from time import sleep
import urllib.parse
import zmq
parser = argparse.ArgumentParser(
description='Manually request correction for a given run. Will wait on data transfers to complete first!')
parser.add_argument('--proposal', type=str, help='The proposal number, without leading p, but with leading zeros')
parser.add_argument('--instrument', type=str, choices=["SPB", "MID", "FXE", "SCS", "SQS", "HED"], help='The instrument')
parser.add_argument('--cycle', type=str, help='The facility cycle')
parser.add_argument('--run', type=int, help='Run number as an integer')
parser.add_argument('--mdc-id', type=int, help='Run id from MDC')
parser.add_argument('--priority', type=int, help='Priority to launch', choices=[1,2], default=2)
args = vars(parser.parse_args())
con = zmq.Context()
socket = con.socket(zmq.REQ)
con = socket.connect("tcp://max-exfl016:5555")
msg = "','".join(["correct", str(args["mdc_id"]), "SASEX", args["instrument"], args["cycle"], args["proposal"], str(args["run"]), str(args["priority"])])
socket.send("['{}']".format(msg).encode())
resp = socket.recv_multipart()[0]
print(resp.decode())
...@@ -11,7 +11,7 @@ class Errors: ...@@ -11,7 +11,7 @@ class Errors:
UPLOAD_CONFIG_FAILED = "FAILED: Config for cycle {}, proposal {} could not be uploaded!, please contact det-support@xfel.eu" UPLOAD_CONFIG_FAILED = "FAILED: Config for cycle {}, proposal {} could not be uploaded!, please contact det-support@xfel.eu"
TRANSFER_EVAL_FAILED = "FAILED: Evaluating transfer to offline failed for proposal {}, run {}, please contact det-support@xfel.eu" TRANSFER_EVAL_FAILED = "FAILED: Evaluating transfer to offline failed for proposal {}, run {}, please contact det-support@xfel.eu"
MDC_RESPONSE = "FAILED: Response error from MDC: {}" MDC_RESPONSE = "FAILED: Response error from MDC: {}"
NOT_CONFIGURED = "FAILED: instrument not configured!" NOT_CONFIGURED = "FAILED: instrument not configured, please contact det-support@xfel.eu"
class MDC: class MDC:
......
...@@ -13,14 +13,29 @@ parser.add_argument('--cycle', type=str, help='The facility cycle') ...@@ -13,14 +13,29 @@ parser.add_argument('--cycle', type=str, help='The facility cycle')
parser.add_argument('--run-high', type=str, help='Run number of high gain data as an integer') parser.add_argument('--run-high', type=str, help='Run number of high gain data as an integer')
parser.add_argument('--run-med', type=str, help='Run number of medium gain data as an integer') parser.add_argument('--run-med', type=str, help='Run number of medium gain data as an integer')
parser.add_argument('--run-low', type=str, help='Run number of low gain data as an integer') parser.add_argument('--run-low', type=str, help='Run number of low gain data as an integer')
parser.add_argument('--run', type=str, help='Run number as an integer')
args = vars(parser.parse_args()) args = vars(parser.parse_args())
con = zmq.Context() con = zmq.Context()
socket = con.socket(zmq.REQ) socket = con.socket(zmq.REQ)
con = socket.connect("tcp://max-exfl016:5555") con = socket.connect("tcp://max-exfl016:5555")
uuid = str(datetime.now().timestamp().as_integer_ratio()[0])+args["run_high"]
msg = "','".join(["dark", uuid, "SASEX", args["instrument"], args["cycle"], args["proposal"], "(\"run-high\", \"{}\")".format(args["run_high"]), "(\"run-med\", \"{}\")".format(args["run_med"]), "(\"run-low\", \"{}\")".format(args["run_low"])]) uuid = str(datetime.now().timestamp().as_integer_ratio()[0])
parm_list = ["dark", uuid, "SASEX", args["instrument"], args["cycle"], args["proposal"]]
if "run_high" in args and args["run_high"]:
parm_list += ["(\"run-high\", \"{}\")".format(args["run_high"])]
if "run_med" in args and args["run_med"]:
parm_list += ["(\"run-med\", \"{}\")".format(args["run_med"])]
if "run_low" in args and args["run_low"]:
parm_list += ["(\"run-low\", \"{}\")".format(args["run_low"])]
if "run" in args and args["run"]:
parm_list += ["(\"run\", \"{}\")".format(args["run"])]
msg = "','".join(parm_list)
socket.send("['{}']".format(msg).encode()) socket.send("['{}']".format(msg).encode())
resp = socket.recv_multipart()[0] resp = socket.recv_multipart()[0]
print(resp.decode()) print(resp.decode())
...@@ -33,5 +48,4 @@ while True: ...@@ -33,5 +48,4 @@ while True:
print(r+"\r", end=' ', flush=True) print(r+"\r", end=' ', flush=True)
sleep(10) sleep(10)
import sys
import urllib.parse
import zmq
con = zmq.Context()
socket = con.socket(zmq.REQ)
con = socket.connect("tcp://localhost:5555")
if sys.argv[1] == "correct":
# msg = "','".join(["correct", "23440", "SASE1", "SPB", "201921", "2413", "4"])
msg = "','".join(["correct", "20199", "SASE1", "FXE", "201831", "900037", "229"])
if sys.argv[1] == "dark":
msg = "','".join(["dark", "20829", "SASE1", "SPB", "201930", "900062", "(\"run-high\", \"264\")", "(\"run-med\", \"265\")", "(\"run-low\", \"266\")"])
if sys.argv[1] == "upload-yaml":
yaml = """
SPB:
AGIPD:
inset: AGIPD
calfile: foo
mem-cells: 176
blc-noise: yes
blc-noise-threshold: 100
instance: AGIPD1M1
JUNGFRAU:
inset: DA02
calfile: xxx
FXE:
LPD:
inset: LPD
calfile: xxx
non-linear-gain: yes
"""
msg = "','".join(["upload-yaml", "SASE1", "SPB", "201831", "900039", urllib.parse.quote_plus(yaml)])
socket.send("['{}']".format(msg).encode())
...@@ -259,7 +259,7 @@ async def run_correction(conn, cmd, mode, proposal, run, rid): ...@@ -259,7 +259,7 @@ async def run_correction(conn, cmd, mode, proposal, run, rid):
execution. execution.
""" """
if mode == "prod": if mode == "prod":
print(cmd) logging.info(" ".join(cmd))
ret = subprocess.run(cmd, stdout=subprocess.PIPE) ret = subprocess.run(cmd, stdout=subprocess.PIPE)
if ret.returncode == 0: if ret.returncode == 0:
logging.info(Success.START_CORRECTION.format(proposal, run)) logging.info(Success.START_CORRECTION.format(proposal, run))
...@@ -414,7 +414,7 @@ async def server_runner(config, mode): ...@@ -414,7 +414,7 @@ async def server_runner(config, mode):
with open(default_file, "r") as f: with open(default_file, "r") as f:
pconf = yaml.load(f.read())[action] pconf = yaml.load(f.read())[action]
if instrument not in pconf: if instrument not in pconf:
socket.send(Error.NOT_CONFIGURED.encode()) socket.send(Errors.NOT_CONFIGURED.encode())
return return
in_folder = config[action]['in-folder'].format( in_folder = config[action]['in-folder'].format(
...@@ -430,6 +430,8 @@ async def server_runner(config, mode): ...@@ -430,6 +430,8 @@ async def server_runner(config, mode):
rpath = "{}/r{:04d}/".format(in_folder, int(runnr)) rpath = "{}/r{:04d}/".format(in_folder, int(runnr))
async def wait_on_transfer(): async def wait_on_transfer():
if 'pnfs' in os.path.realpath(rpath): # dcache files are assumed migrated
return True
rstr = None rstr = None
ret = None ret = None
max_tries = 300 # 3000s max_tries = 300 # 3000s
...@@ -437,7 +439,7 @@ async def server_runner(config, mode): ...@@ -437,7 +439,7 @@ async def server_runner(config, mode):
while not os.path.exists(rpath): while not os.path.exists(rpath):
await asyncio.sleep(10) await asyncio.sleep(10)
# await asyncio.sleep(1) # await asyncio.sleep(1)
while rstr is None or 'status="online"' in rstr or ret.returncode != 0: while rstr is None or 'status="online"' in rstr or 'status="Online"' in rstr or ret.returncode != 0:
await asyncio.sleep(10) await asyncio.sleep(10)
ret = subprocess.run( ret = subprocess.run(
["getfattr", "-n", "user.status", rpath], ["getfattr", "-n", "user.status", rpath],
...@@ -503,8 +505,10 @@ async def server_runner(config, mode): ...@@ -503,8 +505,10 @@ async def server_runner(config, mode):
detectors[detector] = thisconf detectors[detector] = thisconf
print("Detectors:", detectors) print("Detectors:", detectors)
for detector, dconfig in detectors.items(): for detector, dconfig in detectors.items():
if "-" in detector:
detector, _ = detector.split("-")
cmd = ["python", "-m", "xfel_calibrate.calibrate", cmd = ["python", "-m", "xfel_calibrate.calibrate",
detector, "DARK"] detector, "DARK", '--priority', '1']
run_config = [] run_config = []
for typ, run in run_mapping.items(): for typ, run in run_mapping.items():
if "no_mapping" in typ: if "no_mapping" in typ:
...@@ -560,6 +564,8 @@ async def server_runner(config, mode): ...@@ -560,6 +564,8 @@ async def server_runner(config, mode):
return return
status = [] status = []
for detector, dconfig in detectors.items(): for detector, dconfig in detectors.items():
if "-" in detector:
detector, _ = detector.split("-")
cmd = ["python", "-m", "xfel_calibrate.calibrate", cmd = ["python", "-m", "xfel_calibrate.calibrate",
detector, "CORRECT"] detector, "CORRECT"]
for key, value in dconfig.items(): for key, value in dconfig.items():
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment