Skip to content
Snippets Groups Projects
Commit 5c9a5587 authored by Steffen Hauf's avatar Steffen Hauf
Browse files

Move webservice to own directory

parent 28af2814
No related branches found
No related tags found
1 merge request!44add a web service to launch calibration from MDC
import asyncio
import ast
import copy
import glob
import socket
import subprocess
import sys
import yaml
import zmq
import zmq.auth.thread
async def run_cmd(cmd, socket, mode):
if mode == "prod":
ret = subprocess.run(cmd)
socket.send("Started calibration: {}".format(ret.returncode).encode())
else:
print(" ".join(cmd))
socket.send("Simulated calibratioN: {}".format(" ".join(cmd)).encode())
async def server_runner(port, conf_file, mode):
context = zmq.Context()
auth = zmq.auth.thread.ThreadAuthenticator(context)
#auth.start()
#auth.allow(*CFG.SERVICE['ZMQ']['IPS'])
socket = context.socket(zmq.REP)
socket.zap_domain = b'global'
socket.bind("tcp://*:{}".format(port))
with open(conf_file, "r") as f:
det_mapping = yaml.load(f.read())
while True:
response = socket.recv_multipart()
if isinstance(response, list) and len(response) == 1:
try:
response = eval(response[0]) #ast.literal_eval(response[0])
except:
socket.send("Unknwon request".encode())
continue
action, sase, instrument, cycle, proposal, runnr = response
print(action, sase, instrument, cycle, proposal, runnr)
if action != "correct":
socket.send("Unknown action".encode())
continue
in_folder = "/gpfs/exfel/exp/{}/{}/p{}/raw".format(instrument, cycle, proposal)
out_folder = "/gpfs/exfel/exp/{}/{}/p{}/proc".format(instrument, cycle, proposal)
detectors = {}
for detector, config in det_mapping[instrument].items():
# check if we find files according to mapping in raw run folder
fl = glob.glob("{}/r{:04d}/RAW-*{}*.h5".format(in_folder, int(runnr), config["inset"]))
print("{}/r{:04d}/RAW-*{}*.h5".format(in_folder, int(runnr), config["inset"]))
print("Found {} files".format(len(fl)))
if len(fl):
thisconf = copy.copy(config)
thisconf["in-folder"] = in_folder
thisconf["out-folder"] = out_folder
thisconf["run"] = runnr
del thisconf["inset"] # don't need this for xfel-calibrate
detectors[detector] = thisconf
if len(detectors) == 0:
socket.send("No matching tasks found".encode())
continue
for detector, config in detectors.items():
cmd = ["xfel-calibrate", "CORRECT"]
cmd += [detector]
for key, value in config.items():
if not isinstance(value, bool):
cmd += ["--{}".format(key), str(value)]
else:
cmd += ["--{}".format(key)]
await run_cmd(cmd, socket, mode)
if __name__ == "__main__":
port = sys.argv[1]
conf_file = sys.argv[2]
mode = sys.argv[3]
loop = asyncio.get_event_loop()
loop.run_until_complete(server_runner(port, conf_file, mode))
loop.close()
class Errors:
REQUEST_FAILED = "FAILED: request could not be parsed"
UNKNOWN_ACTION = "FAILED: action {} is not known!"
PATH_NOT_FOUND = "FAILED: run at {} not found!"
CONFIG_ERROR = "FAILED: configuration at {} couldn't be parsed, using default!"
CONFIG_NOT_FOUND = "WARN: configuration at {} not found, using default!"
NO_DEFAULT_CONFIG = "FAILED: No default config for instrument {}, detector {} exists!"
JOB_LAUNCH_FAILED = "FAILED: Failed executing command: {}"
UPLOAD_CONFIG_FAILED = "FAILED: Config for cycle {}, proposal {} could not be uploaded!"
class Success:
UPLOADED_CONFIG = "SUCCESS: Uploaded config for cycle {}, proposal {}"
START_CORRECTION = "SUCCESS: Started correction: {}"
START_CORRECTION_SIM = "SUCCESS: Started simulated correction: {}"
import sys
import urllib.parse
import zmq
con = zmq.Context()
socket = con.socket(zmq.REQ)
con = socket.connect("tcp://localhost:5003")
if sys.argv[1] == "correct":
msg = "','".join(["correct", "SASE1", "SPB", "201831", "900039", "431"])
if sys.argv[1] == "upload-yaml":
yaml = """
SPB:
AGIPD:
inset: AGIPD
calfile: foo
mem-cells: 176
blc-noise: yes
blc-noise-threshold: 100
instance: AGIPD1M1
JUNGFRAU:
inset: DA05
calfile: xxx
FXE:
LPD:
inset: LPD
calfile: xxx
non-linear-gain: yes
"""
msg = "','".join(["upload-yaml", "SASE1", "SPB", "201831", "900039", urllib.parse.quote_plus(yaml)])
socket.send("['{}']".format(msg).encode())
INFO:root:Config repo is initialized
693 MainThread Config repo is initialized
2019-02-13 16:54:21,433 - asyncio - DEBUG - Using selector: EpollSelector
2019-02-13 16:54:21,437 - git.cmd - DEBUG - Popen(['git', 'pull', '-v', 'origin'], cwd=/home/haufs/calibration_config, universal_newlines=True, shell=None)
2019-02-13 16:54:21,929 - root - INFO - Config repo is initialized
2019-02-13 16:54:24,637 - root - DEBUG - SUCCESS: Started simulated correction: ['xfel-calibrate', 'CORRECT', 'AGIPD', '--calfile', 'foo', '--mem-cells', '176', '--blc-noise', '--blc-noise-threshold', '100', '--instance', 'AGIPD1M1', '--in-folder', '/gpfs/exfel/exp/SPB/201831/p900039/raw', '--out-folder', '/gpfs/exfel/exp/SPB/201831/p900039/proc', '--run', '431']
2019-02-13 16:55:41,116 - asyncio - DEBUG - Using selector: EpollSelector
2019-02-13 16:55:41,119 - git.cmd - DEBUG - Popen(['git', 'pull', '-v', 'origin'], cwd=/home/haufs/calibration_config, universal_newlines=True, shell=None)
2019-02-13 16:55:41,603 - root - INFO - Config repo is initialized
2019-02-13 16:55:56,725 - asyncio - DEBUG - Using selector: EpollSelector
2019-02-13 16:55:56,727 - git.cmd - DEBUG - Popen(['git', 'pull', '-v', 'origin'], cwd=/home/haufs/calibration_config, universal_newlines=True, shell=None)
2019-02-13 16:56:05,292 - root - INFO - Config repo is initialized
2019-02-13 16:56:05,397 - root - DEBUG - SUCCESS: Started simulated correction: ['xfel-calibrate', 'CORRECT', 'AGIPD', '--calfile', 'foo', '--mem-cells', '176', '--blc-noise', '--blc-noise-threshold', '100', '--instance', 'AGIPD1M1', '--in-folder', '/gpfs/exfel/exp/SPB/201831/p900039/raw', '--out-folder', '/gpfs/exfel/exp/SPB/201831/p900039/proc', '--run', '431']
2019-02-13 16:56:45,454 - asyncio - DEBUG - Using selector: EpollSelector
2019-02-13 16:56:45,457 - git.cmd - DEBUG - Popen(['git', 'pull', '-v', 'origin'], cwd=/home/haufs/calibration_config, universal_newlines=True, shell=None)
2019-02-13 16:56:45,918 - root - INFO - Config repo is initialized
2019-02-13 16:56:49,164 - root - DEBUG - SUCCESS: Started simulated correction: ['xfel-calibrate', 'CORRECT', 'AGIPD', '--calfile', 'foo', '--mem-cells', '176', '--blc-noise', '--blc-noise-threshold', '100', '--instance', 'AGIPD1M1', '--in-folder', '/gpfs/exfel/exp/SPB/201831/p900039/raw', '--out-folder', '/gpfs/exfel/exp/SPB/201831/p900039/proc', '--run', '431']
2019-02-13 16:56:52,274 - root - DEBUG - SUCCESS: Started simulated correction: ['xfel-calibrate', 'CORRECT', 'AGIPD', '--calfile', 'foo', '--mem-cells', '176', '--blc-noise', '--blc-noise-threshold', '100', '--instance', 'AGIPD1M1', '--in-folder', '/gpfs/exfel/exp/SPB/201831/p900039/raw', '--out-folder', '/gpfs/exfel/exp/SPB/201831/p900039/proc', '--run', '431']
2019-02-13 16:57:38,272 - git.cmd - DEBUG - Popen(['git', 'pull', '-v', 'origin'], cwd=/home/haufs/calibration_config, universal_newlines=True, shell=None)
2019-02-13 16:57:38,794 - git.cmd - DEBUG - Popen(['git', 'cat-file', '--batch-check'], cwd=/home/haufs/calibration_config, universal_newlines=False, shell=None)
2019-02-13 16:57:38,814 - git.cmd - DEBUG - Popen(['git', 'cat-file', '--batch'], cwd=/home/haufs/calibration_config, universal_newlines=False, shell=None)
2019-02-13 16:57:38,825 - git.cmd - DEBUG - Popen(['git', 'push', '--porcelain', 'origin'], cwd=/home/haufs/calibration_config, universal_newlines=True, shell=None)
2019-02-13 16:57:40,961 - root - INFO - SUCCESS: Uploaded config for cycle 201831, proposal 900039
import asyncio
import ast
import copy
from datetime import datetime
from git import Repo, InvalidGitRepositoryError
import glob
import logging
import os
import socket
import subprocess
import sys
import urllib.parse
import yaml
import zmq
import zmq.auth.thread
from errors import Errors, Success
def init_config_repo(config):
os.makedirs(config['local-path'], exist_ok=True)
# check if it is a repo
try:
repo = Repo(config['local-path'])
except InvalidGitRepositoryError:
repo = Repo.clone_from(config['url'], config['local-path'])
repo.remote().pull()
logging.info("Config repo is initialized")
async def upload_config(socket, config, yaml, instrument, cycle, proposal):
repo = Repo(config['local-path'])
# assure we are on most current version
repo.remote().pull()
prop_dir = os.path.join(repo.working_tree_dir, cycle)
os.makedirs(prop_dir, exist_ok=True)
with open("{}/{}.yaml".format(prop_dir, proposal), "w") as f:
f.write(yaml)
fpath = "{}/{}.yaml".format(prop_dir, proposal)
repo.index.add([fpath])
repo.index.commit("Update to proposal YAML: {}".format(datetime.now().isoformat()))
repo.remote().push()
logging.info(Success.UPLOADED_CONFIG.format(cycle, proposal))
socket.send(Success.UPLOADED_CONFIG.format(cycle, proposal).encode())
async def run_cmd(cmd, socket, mode):
if mode == "prod":
ret = subprocess.run(cmd)
if ret.returncode == 0:
logging.info(Success.START_CORRECTION.format(cmd))
socket.send(Success.START_CORRECTION.format(cmd).encode())
else:
logging.error(Errors.JOB_LAUNCH_FAILED.format(cmd, ret.returncode))
socket.send(Errors.JOB_LAUNCH_FAILED.format(cmd, ret.returncode).encode())
else:
logging.debug(Success.START_CORRECTION_SIM.format(cmd))
socket.send(Success.START_CORRECTION_SIM.format(cmd).encode())
async def server_runner(conf_file, mode):
with open(conf_file, "r") as f:
config = yaml.load(f.read())
init_config_repo(config['config-repo'])
context = zmq.Context()
auth = zmq.auth.thread.ThreadAuthenticator(context)
#auth.start()
#auth.allow(*CFG.SERVICE['ZMQ']['IPS'])
socket = context.socket(zmq.REP)
socket.zap_domain = b'global'
socket.bind("{}:{}".format(config['web-service']['bind-to'], config['web-service']['port']))
while True:
response = socket.recv_multipart()
if isinstance(response, list) and len(response) == 1:
try:
response = eval(response[0]) #ast.literal_eval(response[0])
except Exception as e:
logging.error(str(e))
socket.send(Errors.REQUEST_FAILED.encode())
continue
action, payload = response[0], response[1:]
if action not in ["correct", 'upload-yaml']:
logging.warn(Errors.UNKNOWN_ACTION.format(action))
socket.send(Errors.UNKNOWN_ACTION.format(action).encode())
continue
if action == 'correct':
sase, instrument, cycle, proposal, runnr = payload
specific_conf_file = "{}/{}/{}.yaml".format(config['config-repo']['local-path'], cycle, proposal)
if os.path.exists(specific_conf_file):
with open(specific_conf_file, "r") as f:
pconf = yaml.load(f.read())
else:
print("Using default file, as {} does not exist".format(specific_conf_file))
default_file = "{}/default.yaml".format(config['config-repo']['local-path'])
with open(default_file, "r") as f:
pconf = yaml.load(f.read())
in_folder = "/gpfs/exfel/exp/{}/{}/p{}/raw".format(instrument, cycle, proposal)
out_folder = "/gpfs/exfel/exp/{}/{}/p{}/proc".format(instrument, cycle, proposal)
detectors = {}
rpath = "{}/r{:04d}/".format(in_folder, int(runnr))
if not os.path.exists(rpath):
logging.error(Errors.PATH_NOT_FOUND.format(rpath))
socket.send(Errors.PATH_NOT_FOUND.format(rpath).encode())
for detector, dconfig in pconf[instrument].items():
# check if we find files according to mapping in raw run folder
fl = glob.glob("{}/RAW-*{}*.h5".format(rpath, dconfig["inset"]))
if len(fl):
thisconf = copy.copy(dconfig)
thisconf["in-folder"] = in_folder
thisconf["out-folder"] = out_folder
thisconf["run"] = runnr
del thisconf["inset"] # don't need this for xfel-calibrate
detectors[detector] = thisconf
if len(detectors) == 0:
logging.warn(Errors.NOTHING_TO_DO.format(rpath))
socket.send(Errors.NOTHING_TO_DO.format(rpath).encode())
continue
for detector, dconfig in detectors.items():
cmd = ["xfel-calibrate", "CORRECT"]
cmd += [detector]
for key, value in dconfig.items():
if not isinstance(value, bool):
cmd += ["--{}".format(key), str(value)]
else:
cmd += ["--{}".format(key)]
await run_cmd(cmd, socket, mode)
if action == 'upload-yaml':
sase, instrument, cycle, proposal, this_yaml = payload
this_yaml = urllib.parse.unquote_plus(this_yaml)
await upload_config(socket, config['config-repo'], this_yaml, instrument, cycle, proposal)
if __name__ == "__main__":
conf_file = sys.argv[1]
mode = sys.argv[2]
if len(sys.argv) > 3:
logfile = sys.arv[3]
else:
logfile = "./web.log"
logging.basicConfig(filename=logfile, level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
loop = asyncio.get_event_loop()
loop.run_until_complete(server_runner(conf_file, mode))
loop.close()
config-repo:
url: https://haufs:E8wEEeAnzGmdsxEZWk8N@git.xfel.eu/gitlab/detectors/calibration_configurations.git
local-path: /home/haufs/calibration_config/
web-service:
port: 5003
bind-to: tcp://*
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment