diff --git a/test_web.py b/test_web.py new file mode 100644 index 0000000000000000000000000000000000000000..ec27764df77e87e28780822dcf3de001671d9cfe --- /dev/null +++ b/test_web.py @@ -0,0 +1,6 @@ +import zmq +con = zmq.Context() +socket = con.socket(zmq.REQ) +con = socket.connect("tcp://localhost:5004") +msg = "','".join(["correct", "SASE1", "SPB", "201831", "900039", "431"]) +socket.send("['{}']".format(msg).encode()) diff --git a/webconfig.yaml b/webconfig.yaml new file mode 100644 index 0000000000000000000000000000000000000000..af30cdad6039a2115fd7637f4a44cdb892104990 --- /dev/null +++ b/webconfig.yaml @@ -0,0 +1,16 @@ +SPB: + AGIPD: + inset: AGIPD + calfile: xxx + mem-cells: 128 + blc-noise: yes + blc-noise-threshold: 100 + instance: AGIPD1M1 + JUNGFRAU: + inset: DA05 + calfile: xxx +FXE: + LPD: + inset: LPD + calfile: xxx + non-linear-gain: yes diff --git a/webservice.py b/webservice.py new file mode 100644 index 0000000000000000000000000000000000000000..b708b585a91290ac0aab9a29be918f9110a63974 --- /dev/null +++ b/webservice.py @@ -0,0 +1,84 @@ +import asyncio +import ast +import copy +import glob +import socket +import subprocess +import sys +import yaml +import zmq +import zmq.auth.thread + +async def run_cmd(cmd, socket, mode): + if mode == "prod": + ret = subprocess.run(cmd) + socket.send("Started calibration: {}".format(ret.returncode).encode()) + else: + print(" ".join(cmd)) + socket.send("Simulated calibratioN: {}".format(" ".join(cmd)).encode()) + +async def server_runner(port, conf_file, mode): + context = zmq.Context() + auth = zmq.auth.thread.ThreadAuthenticator(context) + #auth.start() + #auth.allow(*CFG.SERVICE['ZMQ']['IPS']) + + socket = context.socket(zmq.REP) + socket.zap_domain = b'global' + socket.bind("tcp://*:{}".format(port)) + + with open(conf_file, "r") as f: + det_mapping = yaml.load(f.read()) + + while True: + response = socket.recv_multipart() + if isinstance(response, list) and len(response) == 1: + try: + response = eval(response[0]) #ast.literal_eval(response[0]) + except: + socket.send("Unknwon request".encode()) + continue + + action, sase, instrument, cycle, proposal, runnr = response + print(action, sase, instrument, cycle, proposal, runnr) + if action != "correct": + socket.send("Unknown action".encode()) + continue + + in_folder = "/gpfs/exfel/exp/{}/{}/p{}/raw".format(instrument, cycle, proposal) + out_folder = "/gpfs/exfel/exp/{}/{}/p{}/proc".format(instrument, cycle, proposal) + detectors = {} + for detector, config in det_mapping[instrument].items(): + # check if we find files according to mapping in raw run folder + fl = glob.glob("{}/r{:04d}/RAW-*{}*.h5".format(in_folder, int(runnr), config["inset"])) + print("{}/r{:04d}/RAW-*{}*.h5".format(in_folder, int(runnr), config["inset"])) + print("Found {} files".format(len(fl))) + if len(fl): + thisconf = copy.copy(config) + thisconf["in-folder"] = in_folder + thisconf["out-folder"] = out_folder + thisconf["run"] = runnr + del thisconf["inset"] # don't need this for xfel-calibrate + detectors[detector] = thisconf + + if len(detectors) == 0: + socket.send("No matching tasks found".encode()) + continue + for detector, config in detectors.items(): + cmd = ["xfel-calibrate", "CORRECT"] + cmd += [detector] + for key, value in config.items(): + if not isinstance(value, bool): + cmd += ["--{}".format(key), str(value)] + else: + cmd += ["--{}".format(key)] + await run_cmd(cmd, socket, mode) + + +if __name__ == "__main__": + port = sys.argv[1] + conf_file = sys.argv[2] + mode = sys.argv[3] + loop = asyncio.get_event_loop() + loop.run_until_complete(server_runner(port, conf_file, mode)) + loop.close()