diff --git a/webservice.py b/webservice.py deleted file mode 100644 index b708b585a91290ac0aab9a29be918f9110a63974..0000000000000000000000000000000000000000 --- a/webservice.py +++ /dev/null @@ -1,84 +0,0 @@ -import asyncio -import ast -import copy -import glob -import socket -import subprocess -import sys -import yaml -import zmq -import zmq.auth.thread - -async def run_cmd(cmd, socket, mode): - if mode == "prod": - ret = subprocess.run(cmd) - socket.send("Started calibration: {}".format(ret.returncode).encode()) - else: - print(" ".join(cmd)) - socket.send("Simulated calibratioN: {}".format(" ".join(cmd)).encode()) - -async def server_runner(port, conf_file, mode): - context = zmq.Context() - auth = zmq.auth.thread.ThreadAuthenticator(context) - #auth.start() - #auth.allow(*CFG.SERVICE['ZMQ']['IPS']) - - socket = context.socket(zmq.REP) - socket.zap_domain = b'global' - socket.bind("tcp://*:{}".format(port)) - - with open(conf_file, "r") as f: - det_mapping = yaml.load(f.read()) - - while True: - response = socket.recv_multipart() - if isinstance(response, list) and len(response) == 1: - try: - response = eval(response[0]) #ast.literal_eval(response[0]) - except: - socket.send("Unknwon request".encode()) - continue - - action, sase, instrument, cycle, proposal, runnr = response - print(action, sase, instrument, cycle, proposal, runnr) - if action != "correct": - socket.send("Unknown action".encode()) - continue - - in_folder = "/gpfs/exfel/exp/{}/{}/p{}/raw".format(instrument, cycle, proposal) - out_folder = "/gpfs/exfel/exp/{}/{}/p{}/proc".format(instrument, cycle, proposal) - detectors = {} - for detector, config in det_mapping[instrument].items(): - # check if we find files according to mapping in raw run folder - fl = glob.glob("{}/r{:04d}/RAW-*{}*.h5".format(in_folder, int(runnr), config["inset"])) - print("{}/r{:04d}/RAW-*{}*.h5".format(in_folder, int(runnr), config["inset"])) - print("Found {} files".format(len(fl))) - if len(fl): - thisconf = copy.copy(config) - thisconf["in-folder"] = in_folder - thisconf["out-folder"] = out_folder - thisconf["run"] = runnr - del thisconf["inset"] # don't need this for xfel-calibrate - detectors[detector] = thisconf - - if len(detectors) == 0: - socket.send("No matching tasks found".encode()) - continue - for detector, config in detectors.items(): - cmd = ["xfel-calibrate", "CORRECT"] - cmd += [detector] - for key, value in config.items(): - if not isinstance(value, bool): - cmd += ["--{}".format(key), str(value)] - else: - cmd += ["--{}".format(key)] - await run_cmd(cmd, socket, mode) - - -if __name__ == "__main__": - port = sys.argv[1] - conf_file = sys.argv[2] - mode = sys.argv[3] - loop = asyncio.get_event_loop() - loop.run_until_complete(server_runner(port, conf_file, mode)) - loop.close() diff --git a/webservice/errors.py b/webservice/errors.py new file mode 100644 index 0000000000000000000000000000000000000000..d16e9c817e23332e74e5ea93e2c51a89210d31fd --- /dev/null +++ b/webservice/errors.py @@ -0,0 +1,17 @@ +class Errors: + + REQUEST_FAILED = "FAILED: request could not be parsed" + UNKNOWN_ACTION = "FAILED: action {} is not known!" + PATH_NOT_FOUND = "FAILED: run at {} not found!" + CONFIG_ERROR = "FAILED: configuration at {} couldn't be parsed, using default!" + CONFIG_NOT_FOUND = "WARN: configuration at {} not found, using default!" + NO_DEFAULT_CONFIG = "FAILED: No default config for instrument {}, detector {} exists!" + JOB_LAUNCH_FAILED = "FAILED: Failed executing command: {}" + UPLOAD_CONFIG_FAILED = "FAILED: Config for cycle {}, proposal {} could not be uploaded!" + + +class Success: + + UPLOADED_CONFIG = "SUCCESS: Uploaded config for cycle {}, proposal {}" + START_CORRECTION = "SUCCESS: Started correction: {}" + START_CORRECTION_SIM = "SUCCESS: Started simulated correction: {}" diff --git a/webservice/test_web.py b/webservice/test_web.py new file mode 100644 index 0000000000000000000000000000000000000000..3f19c1e351572673e39045a1484869da529a866c --- /dev/null +++ b/webservice/test_web.py @@ -0,0 +1,31 @@ +import sys +import urllib.parse +import zmq +con = zmq.Context() +socket = con.socket(zmq.REQ) +con = socket.connect("tcp://localhost:5003") +if sys.argv[1] == "correct": + msg = "','".join(["correct", "SASE1", "SPB", "201831", "900039", "431"]) +if sys.argv[1] == "upload-yaml": + yaml = """ +SPB: + AGIPD: + inset: AGIPD + calfile: foo + mem-cells: 176 + blc-noise: yes + blc-noise-threshold: 100 + instance: AGIPD1M1 + JUNGFRAU: + inset: DA05 + calfile: xxx +FXE: + LPD: + inset: LPD + calfile: xxx + non-linear-gain: yes + +""" + msg = "','".join(["upload-yaml", "SASE1", "SPB", "201831", "900039", urllib.parse.quote_plus(yaml)]) +socket.send("['{}']".format(msg).encode()) + diff --git a/webservice/web.log b/webservice/web.log new file mode 100644 index 0000000000000000000000000000000000000000..c1cb13b5cf5e0e5197753e599a23788c346202f1 --- /dev/null +++ b/webservice/web.log @@ -0,0 +1,23 @@ +INFO:root:Config repo is initialized + 693 MainThread Config repo is initialized +2019-02-13 16:54:21,433 - asyncio - DEBUG - Using selector: EpollSelector +2019-02-13 16:54:21,437 - git.cmd - DEBUG - Popen(['git', 'pull', '-v', 'origin'], cwd=/home/haufs/calibration_config, universal_newlines=True, shell=None) +2019-02-13 16:54:21,929 - root - INFO - Config repo is initialized +2019-02-13 16:54:24,637 - root - DEBUG - SUCCESS: Started simulated correction: ['xfel-calibrate', 'CORRECT', 'AGIPD', '--calfile', 'foo', '--mem-cells', '176', '--blc-noise', '--blc-noise-threshold', '100', '--instance', 'AGIPD1M1', '--in-folder', '/gpfs/exfel/exp/SPB/201831/p900039/raw', '--out-folder', '/gpfs/exfel/exp/SPB/201831/p900039/proc', '--run', '431'] +2019-02-13 16:55:41,116 - asyncio - DEBUG - Using selector: EpollSelector +2019-02-13 16:55:41,119 - git.cmd - DEBUG - Popen(['git', 'pull', '-v', 'origin'], cwd=/home/haufs/calibration_config, universal_newlines=True, shell=None) +2019-02-13 16:55:41,603 - root - INFO - Config repo is initialized +2019-02-13 16:55:56,725 - asyncio - DEBUG - Using selector: EpollSelector +2019-02-13 16:55:56,727 - git.cmd - DEBUG - Popen(['git', 'pull', '-v', 'origin'], cwd=/home/haufs/calibration_config, universal_newlines=True, shell=None) +2019-02-13 16:56:05,292 - root - INFO - Config repo is initialized +2019-02-13 16:56:05,397 - root - DEBUG - SUCCESS: Started simulated correction: ['xfel-calibrate', 'CORRECT', 'AGIPD', '--calfile', 'foo', '--mem-cells', '176', '--blc-noise', '--blc-noise-threshold', '100', '--instance', 'AGIPD1M1', '--in-folder', '/gpfs/exfel/exp/SPB/201831/p900039/raw', '--out-folder', '/gpfs/exfel/exp/SPB/201831/p900039/proc', '--run', '431'] +2019-02-13 16:56:45,454 - asyncio - DEBUG - Using selector: EpollSelector +2019-02-13 16:56:45,457 - git.cmd - DEBUG - Popen(['git', 'pull', '-v', 'origin'], cwd=/home/haufs/calibration_config, universal_newlines=True, shell=None) +2019-02-13 16:56:45,918 - root - INFO - Config repo is initialized +2019-02-13 16:56:49,164 - root - DEBUG - SUCCESS: Started simulated correction: ['xfel-calibrate', 'CORRECT', 'AGIPD', '--calfile', 'foo', '--mem-cells', '176', '--blc-noise', '--blc-noise-threshold', '100', '--instance', 'AGIPD1M1', '--in-folder', '/gpfs/exfel/exp/SPB/201831/p900039/raw', '--out-folder', '/gpfs/exfel/exp/SPB/201831/p900039/proc', '--run', '431'] +2019-02-13 16:56:52,274 - root - DEBUG - SUCCESS: Started simulated correction: ['xfel-calibrate', 'CORRECT', 'AGIPD', '--calfile', 'foo', '--mem-cells', '176', '--blc-noise', '--blc-noise-threshold', '100', '--instance', 'AGIPD1M1', '--in-folder', '/gpfs/exfel/exp/SPB/201831/p900039/raw', '--out-folder', '/gpfs/exfel/exp/SPB/201831/p900039/proc', '--run', '431'] +2019-02-13 16:57:38,272 - git.cmd - DEBUG - Popen(['git', 'pull', '-v', 'origin'], cwd=/home/haufs/calibration_config, universal_newlines=True, shell=None) +2019-02-13 16:57:38,794 - git.cmd - DEBUG - Popen(['git', 'cat-file', '--batch-check'], cwd=/home/haufs/calibration_config, universal_newlines=False, shell=None) +2019-02-13 16:57:38,814 - git.cmd - DEBUG - Popen(['git', 'cat-file', '--batch'], cwd=/home/haufs/calibration_config, universal_newlines=False, shell=None) +2019-02-13 16:57:38,825 - git.cmd - DEBUG - Popen(['git', 'push', '--porcelain', 'origin'], cwd=/home/haufs/calibration_config, universal_newlines=True, shell=None) +2019-02-13 16:57:40,961 - root - INFO - SUCCESS: Uploaded config for cycle 201831, proposal 900039 diff --git a/webservice/webservice.py b/webservice/webservice.py new file mode 100644 index 0000000000000000000000000000000000000000..5d18108435966797330dd9f0d97299d65972f40a --- /dev/null +++ b/webservice/webservice.py @@ -0,0 +1,153 @@ +import asyncio +import ast +import copy +from datetime import datetime +from git import Repo, InvalidGitRepositoryError +import glob +import logging +import os +import socket +import subprocess +import sys +import urllib.parse +import yaml +import zmq +import zmq.auth.thread + +from errors import Errors, Success + + +def init_config_repo(config): + os.makedirs(config['local-path'], exist_ok=True) + # check if it is a repo + try: + repo = Repo(config['local-path']) + except InvalidGitRepositoryError: + repo = Repo.clone_from(config['url'], config['local-path']) + repo.remote().pull() + logging.info("Config repo is initialized") + + +async def upload_config(socket, config, yaml, instrument, cycle, proposal): + repo = Repo(config['local-path']) + # assure we are on most current version + repo.remote().pull() + prop_dir = os.path.join(repo.working_tree_dir, cycle) + os.makedirs(prop_dir, exist_ok=True) + with open("{}/{}.yaml".format(prop_dir, proposal), "w") as f: + f.write(yaml) + fpath = "{}/{}.yaml".format(prop_dir, proposal) + repo.index.add([fpath]) + repo.index.commit("Update to proposal YAML: {}".format(datetime.now().isoformat())) + repo.remote().push() + logging.info(Success.UPLOADED_CONFIG.format(cycle, proposal)) + socket.send(Success.UPLOADED_CONFIG.format(cycle, proposal).encode()) + + +async def run_cmd(cmd, socket, mode): + if mode == "prod": + ret = subprocess.run(cmd) + if ret.returncode == 0: + logging.info(Success.START_CORRECTION.format(cmd)) + socket.send(Success.START_CORRECTION.format(cmd).encode()) + else: + logging.error(Errors.JOB_LAUNCH_FAILED.format(cmd, ret.returncode)) + socket.send(Errors.JOB_LAUNCH_FAILED.format(cmd, ret.returncode).encode()) + + else: + logging.debug(Success.START_CORRECTION_SIM.format(cmd)) + socket.send(Success.START_CORRECTION_SIM.format(cmd).encode()) + + +async def server_runner(conf_file, mode): + + with open(conf_file, "r") as f: + config = yaml.load(f.read()) + + init_config_repo(config['config-repo']) + + context = zmq.Context() + auth = zmq.auth.thread.ThreadAuthenticator(context) + #auth.start() + #auth.allow(*CFG.SERVICE['ZMQ']['IPS']) + + socket = context.socket(zmq.REP) + socket.zap_domain = b'global' + socket.bind("{}:{}".format(config['web-service']['bind-to'], config['web-service']['port'])) + + while True: + response = socket.recv_multipart() + if isinstance(response, list) and len(response) == 1: + try: + response = eval(response[0]) #ast.literal_eval(response[0]) + except Exception as e: + logging.error(str(e)) + socket.send(Errors.REQUEST_FAILED.encode()) + continue + action, payload = response[0], response[1:] + if action not in ["correct", 'upload-yaml']: + logging.warn(Errors.UNKNOWN_ACTION.format(action)) + socket.send(Errors.UNKNOWN_ACTION.format(action).encode()) + continue + + if action == 'correct': + sase, instrument, cycle, proposal, runnr = payload + specific_conf_file = "{}/{}/{}.yaml".format(config['config-repo']['local-path'], cycle, proposal) + if os.path.exists(specific_conf_file): + with open(specific_conf_file, "r") as f: + pconf = yaml.load(f.read()) + else: + print("Using default file, as {} does not exist".format(specific_conf_file)) + default_file = "{}/default.yaml".format(config['config-repo']['local-path']) + with open(default_file, "r") as f: + pconf = yaml.load(f.read()) + + in_folder = "/gpfs/exfel/exp/{}/{}/p{}/raw".format(instrument, cycle, proposal) + out_folder = "/gpfs/exfel/exp/{}/{}/p{}/proc".format(instrument, cycle, proposal) + detectors = {} + rpath = "{}/r{:04d}/".format(in_folder, int(runnr)) + if not os.path.exists(rpath): + logging.error(Errors.PATH_NOT_FOUND.format(rpath)) + socket.send(Errors.PATH_NOT_FOUND.format(rpath).encode()) + for detector, dconfig in pconf[instrument].items(): + # check if we find files according to mapping in raw run folder + fl = glob.glob("{}/RAW-*{}*.h5".format(rpath, dconfig["inset"])) + if len(fl): + thisconf = copy.copy(dconfig) + thisconf["in-folder"] = in_folder + thisconf["out-folder"] = out_folder + thisconf["run"] = runnr + del thisconf["inset"] # don't need this for xfel-calibrate + detectors[detector] = thisconf + + if len(detectors) == 0: + logging.warn(Errors.NOTHING_TO_DO.format(rpath)) + socket.send(Errors.NOTHING_TO_DO.format(rpath).encode()) + continue + for detector, dconfig in detectors.items(): + cmd = ["xfel-calibrate", "CORRECT"] + cmd += [detector] + for key, value in dconfig.items(): + if not isinstance(value, bool): + cmd += ["--{}".format(key), str(value)] + else: + cmd += ["--{}".format(key)] + await run_cmd(cmd, socket, mode) + + if action == 'upload-yaml': + sase, instrument, cycle, proposal, this_yaml = payload + this_yaml = urllib.parse.unquote_plus(this_yaml) + await upload_config(socket, config['config-repo'], this_yaml, instrument, cycle, proposal) + + +if __name__ == "__main__": + conf_file = sys.argv[1] + mode = sys.argv[2] + if len(sys.argv) > 3: + logfile = sys.arv[3] + else: + logfile = "./web.log" + logging.basicConfig(filename=logfile, level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') + loop = asyncio.get_event_loop() + loop.run_until_complete(server_runner(conf_file, mode)) + loop.close() diff --git a/webservice/webservice.yaml b/webservice/webservice.yaml new file mode 100644 index 0000000000000000000000000000000000000000..706128f56e2ae4926c498741acd44d712f6378a1 --- /dev/null +++ b/webservice/webservice.yaml @@ -0,0 +1,7 @@ +config-repo: + url: https://haufs:E8wEEeAnzGmdsxEZWk8N@git.xfel.eu/gitlab/detectors/calibration_configurations.git + local-path: /home/haufs/calibration_config/ + +web-service: + port: 5003 + bind-to: tcp://*