Commit 19d161a1 authored by Steffen Hauf's avatar Steffen Hauf
add a web service to launch calibration from MDC

See merge request detectors/pycalibration!44
parents 60e113ad 7f8b8ee0
@@ -30,3 +30,6 @@ docs/source/_notebooks/*
docs/source/test_rsts/*
docs/source/test_results.rst
docs/source/_static/reports
webservice/webservice.yaml
webservice/*.log
webservice/*sqlite
Offline Calibration Webservice
==============================
The offline calibration webservice interacts with the Metadata Catalogue (MDC),
such that migration of data to the offline cluster automatically triggers
calibration jobs on relevant files.
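Requests from the MDC arrive as string-encoded Python lists over ZMQ. A minimal sketch of how such a payload is built (the identifiers are example values, matching the test client shipped with the service):

``` python
# Build the string-encoded list payload the webservice expects:
# [action, rid, sase, instrument, cycle, proposal, runnr]
fields = ["correct", "20829", "SASE1", "SPB", "201831", "900039", "575"]
payload = "['{}']".format("','".join(fields))
# the service recovers the list from the request string on its side
```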
Installation
------------
Installation should only be performed by trained personnel.
**Prerequisites**
The service needs to be installed under a functional user account which
* has read permission to the *raw* data folder on the Maxwell cluster
* has write permission to the *proc* folders for outputting corrected data
* is allowed to launch SLURM jobs on the cluster
The hosting system needs to be accessible via ZMQ calls from the MDC;
this requires appropriate DMZ settings. Additionally, it needs to be able
to interact with the MDC via the MDC client interface.
**Installation of Dependencies**
The installation requirements can be found in the *requirements.txt* file.
Additionally, the *xfel-calibrate* environment needs to be installed:
1. clone the *pycalibration* repo into a directory of your choice:
``` bash
git clone https://git.xfel.eu/gitlab/detectors/pycalibration.git .
```
2. pick the python environment to install into. On Maxwell the anaconda/3
environment will work:
``` bash
module load anaconda/3
```
3. install the *xfel-calibrate* environment
``` bash
pip install --user -r requirements.txt
```
4. some correction notebooks require pyDetLib, which needs to be installed
manually in a non-Karabo Python environment:
``` bash
mkdir pydetlib
cd pydetlib
git clone https://git.xfel.eu/gitlab/karaboDevices/pyDetLib.git .
pip install --user -r ./lib/requirements.txt
pip install --user pycuda
pip install --user ./lib/
cd ..
```
5. install the separate requirements for the webservice:
``` bash
cd webservice
pip install --user -r requirements.txt
```
6. install the metadata_client library, according to instructions at
https://git.xfel.eu/gitlab/ITDM/metadata_client
You are now good to go.
Configuration
-------------
Configuration is done through the *webservice.yaml* file in the webservice directory.
On a new installation you will likely need to change the following parameters.
In the **config-repo** section, the configuration repository needs to be configured:
``` YAML
config-repo:
    url: https://git.xfel.eu/gitlab/detectors/calibration_configurations.git
    local-path: /home/haufs/calibration_config/
```
Here you should prepend the *url* entry with a GitLab access token that provides access
to the *calibration_configurations* repository.
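Depending on your GitLab setup, the token can be embedded directly in the clone URL; the token name and value below are placeholders:

``` YAML
config-repo:
    url: https://<token-name>:<access-token>@git.xfel.eu/gitlab/detectors/calibration_configurations.git
    local-path: /home/haufs/calibration_config/
```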
In the **web-service** section, the webservice itself is configured:
``` YAML
web-service:
    port: 5555
    bind-to: tcp://*
    allowed-ips: '111.222.222.111', '111.222.222.112'
    job-db: ./webservice_jobs.sqlite
    job-update-interval: 30
    job-timeout: 3600
```
In case you want to use authentication, add a list of *allowed-ips*.
In the **metadata-client** section, the client interface to the MDC is configured:
``` YAML
metadata-client:
    user-id:
    user-secret:
    user-email:
    metadata-web-app-url: 'https://in.xfel.eu/metadata'
    token-url: 'https://in.xfel.eu/metadata/oauth/token'
    refresh-url: 'https://in.xfel.eu/metadata/oauth/token'
    auth-url: 'https://in.xfel.eu/metadata/oauth/authorize'
    scope: ''
    base-api-url: 'https://in.xfel.eu/metadata/api/'
```
Here, the user id, secret and email as provided by ITDM need to be entered.
Finally, sections for the individual *actions* the service knows can be configured.
Currently, the only action is *correct*:
``` YAML
correct:
    in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw
    out-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/proc
```
It expects templates for the input and output file paths to be provided.
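The templates are filled with standard Python string formatting; a quick sketch with example values taken from elsewhere in this document:

``` python
# How the correct action fills its path templates.
template = "/gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw"
in_folder = template.format(instrument="SPB", cycle="201831", proposal="900039")
```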
Starting the Service
--------------------
The webservice can be started as a normal python process:
``` bash
python webservice.py --mode [prod | prod-auth | sim]
```
The available modes are:
* prod: production mode
* prod-auth: production mode with authentication on ZMQ
* sim: simulation mode, no actual *xfel-calibrate* jobs are launched.
Use
``` bash
python webservice.py --help
```
to display a list of available options.
class Errors:
    REQUEST_FAILED = "FAILED: request could not be parsed, please contact det-support@xfel.eu"
    REQUEST_MALFORMED = "FAILED: request {} is malformed, please contact det-support@xfel.eu"
    UNKNOWN_ACTION = "FAILED: action {} is not known, please contact det-support@xfel.eu"
    PATH_NOT_FOUND = "FAILED: run at {} not found, please contact det-support@xfel.eu"
    NOTHING_TO_DO = "WARN: nothing to calibrate in path {}, please contact det-support@xfel.eu"
    CONFIG_ERROR = "FAILED: configuration at {} couldn't be parsed, using default, please contact det-support@xfel.eu"
    CONFIG_NOT_FOUND = "WARN: configuration at {} not found, using default, please contact det-support@xfel.eu"
    NO_DEFAULT_CONFIG = "FAILED: No default config for instrument {}, detector {} exists, please contact det-support@xfel.eu"
    JOB_LAUNCH_FAILED = "FAILED: Failed executing command: {}, code: {}, please contact det-support@xfel.eu"
    UPLOAD_CONFIG_FAILED = "FAILED: Config for cycle {}, proposal {} could not be uploaded, please contact det-support@xfel.eu"
    TRANSFER_EVAL_FAILED = "FAILED: Evaluating transfer to offline failed for proposal {}, run {}, please contact det-support@xfel.eu"
    MDC_RESPONSE = "FAILED: Response error from MDC: {}"


class MDC:
    MIGRATION_TIMEOUT = "Timeout waiting for migration. Contact det-support@xfel.eu"
    NOTHING_TO_DO = "Nothing to calibrate for this run, copied raw data only"


class Success:
    UPLOADED_CONFIG = "SUCCESS: Uploaded config for cycle {}, proposal {}"
    START_CORRECTION = "SUCCESS: Started correction: proposal {}, run {}"
    START_CORRECTION_SIM = "SUCCESS: Started simulated correction: proposal {}, run {}"
    QUEUED = "Queued proposal {}, run {} for offline calibration"
gitpython
zmq
import sys
import urllib.parse

import zmq

con = zmq.Context()
socket = con.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

if sys.argv[1] == "correct":
    msg = "','".join(["correct", "20829", "SASE1", "SPB", "201831", "900039",
                      "575"])

if sys.argv[1] == "upload-yaml":
    yaml = """
SPB:
    AGIPD:
        inset: AGIPD
        calfile: foo
        mem-cells: 176
        blc-noise: yes
        blc-noise-threshold: 100
        instance: AGIPD1M1
    JUNGFRAU:
        inset: DA02
        calfile: xxx
FXE:
    LPD:
        inset: LPD
        calfile: xxx
        non-linear-gain: yes
"""
    msg = "','".join(["upload-yaml", "SASE1", "SPB", "201831", "900039",
                      urllib.parse.quote_plus(yaml)])

socket.send("['{}']".format(msg).encode())
import argparse
import asyncio
import copy
import getpass
import glob
import logging
import os
import sqlite3
import subprocess
import urllib.parse
from datetime import datetime
import yaml
import zmq
import zmq.asyncio
import zmq.auth.thread
from dateutil import parser as timeparser
from git import Repo, InvalidGitRepositoryError
from metadata_client.metadata_client import MetadataClient
from messages import Errors, MDC, Success
async def init_job_db(config):
""" Initialize the sqlite job database
A new database is created if no pre-existing one is present. A single
table is created: jobs, which has columns:
rid, id, proposal, run, flg, status
:param config: the configuration parsed from the webservice YAML config
:return: a sqlite3 connection instance to the database
"""
logging.info("Initializing database")
conn = sqlite3.connect(config['web-service']['job-db'])
c = conn.cursor()
try:
c.execute("SELECT * FROM jobs")
except sqlite3.OperationalError:
logging.info("Creating initial job database")
c.execute("CREATE TABLE jobs(rid, id, proposal, run, flg, status)")
return conn
async def init_md_client(config):
""" Initialize an MDC client connection
:param config: the configuration parsed from the webservice YAML config
:return: an MDC client connection
"""
mdconf = config['metadata-client']
client_conn = MetadataClient(client_id=mdconf['user-id'],
client_secret=mdconf['user-secret'],
user_email=mdconf['user-email'],
token_url=mdconf['token-url'],
refresh_url=mdconf['refresh-url'],
auth_url=mdconf['auth-url'],
scope=mdconf['scope'],
base_api_url=mdconf['base-api-url'])
return client_conn
def init_config_repo(config):
""" Make sure the configuration repo is present and up-to-date
:param config: the configuration defined in the `config-repo` section
"""
os.makedirs(config['local-path'], exist_ok=True)
# check if it is a repo
try:
repo = Repo(config['local-path'])
except InvalidGitRepositoryError:
repo = Repo.clone_from(config['url'], config['local-path'])
repo.remote().pull()
logging.info("Config repo is initialized")
async def upload_config(socket, config, yaml, instrument, cycle, proposal):
""" Upload a new configuration YAML
:param socket: ZMQ socket to send reply on
:param config: the configuration defined in the `config-repo` section
of the webservice.yaml configuration.
:param yaml: the YAML contents to update
:param instrument: instrument for which the update is for
:param cycle: the facility cycle the update is for
:param proposal: the proposal the update is for
The YAML contents will be placed into a file at
{config.local-path}/{cycle}/{proposal}.yaml
If it exists it is overwritten and then the new version is pushed to
the configuration git repo.
"""
repo = Repo(config['local-path'])
# assure we are on most current version
repo.remote().pull()
prop_dir = os.path.join(repo.working_tree_dir, cycle)
os.makedirs(prop_dir, exist_ok=True)
with open("{}/{}.yaml".format(prop_dir, proposal), "w") as f:
f.write(yaml)
fpath = "{}/{}.yaml".format(prop_dir, proposal)
repo.index.add([fpath])
repo.index.commit(
"Update to proposal YAML: {}".format(datetime.now().isoformat()))
repo.remote().push()
logging.info(Success.UPLOADED_CONFIG.format(cycle, proposal))
socket.send(Success.UPLOADED_CONFIG.format(cycle, proposal).encode())
async def slurm_status(filter_user=True):
""" Return the status of slurm jobs by calling squeue
:param filter_user: set to true to filter only jobs from the current user
:return: a dictionary indexed by slurm jobid and containing a tuple
of (status, run time) as values.
"""
cmd = ["squeue"]
if filter_user:
cmd += ["-u", getpass.getuser()]
ret = subprocess.run(cmd, stdout=subprocess.PIPE)
if ret.returncode == 0:
rlines = ret.stdout.decode().split("\n")
statii = {}
for r in rlines[1:]:
try:
jobid, _, _, _, status, runtime, _, _ = r.split()
jobid = jobid.strip()
statii[jobid] = status, runtime
except ValueError:
pass
return statii
return None
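The parsing above relies on the stock eight-column layout of `squeue`; a captured sample line (the values are made up for illustration) is unpacked like this:

``` python
# Parse one data line of default squeue output the way slurm_status does.
line = " 1234567 exfel xfel_cal user R 12:34 1 max-exfl123"
jobid, _, _, _, status, runtime, _, _ = line.split()
statii = {jobid.strip(): (status, runtime)}
```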
async def update_job_db(config):
""" Update the job database and send out updates to MDC
:param config: configuration parsed from webservice YAML
"""
logging.info("Starting config db handling")
conn = await init_job_db(config)
mdc = await init_md_client(config)
while True:
statii = await slurm_status()
c = conn.cursor()
c.execute("SELECT * FROM jobs")
combined = {}
for r in c.fetchall():
rid, jobid, proposal, run, flg, status = r
if jobid in statii:
slstatus, runtime = statii[jobid]
query = "UPDATE jobs SET status='{status} - {runtime}' WHERE id LIKE '{jobid}'" # noqa
c.execute(query.format(status=slstatus,
runtime=runtime,
jobid=jobid))
elif "QUEUED" not in status:
c.execute("DELETE FROM jobs WHERE id LIKE '{jobid}'".format(
jobid=jobid))
cflg, cstatus = combined.get(rid, ([], []))
cflg.append("A")
cstatus.append('DONE')
combined[rid] = cflg, cstatus
else:
# check for timed out jobs
_, start_time = status.split(": ")
dt = datetime.now() - timeparser.parse(start_time)
if dt.total_seconds() > config['web-service']['job-timeout']:
c.execute(
"DELETE FROM jobs WHERE id LIKE '{jobid}'".format(
jobid=jobid))
cflg, cstatus = combined.get(rid, ([], []))
cflg.append("R")
cstatus.append('PENDING SLURM SCHEDULING / TIMED OUT?')
combined[rid] = cflg, cstatus
conn.commit()
c.execute("SELECT * FROM jobs")
for r in c.fetchall():
rid, jobid, proposal, run, flg, status = r
logging.debug(
"Job {}, proposal {}, run {} has status {}".format(jobid,
proposal,
run,
status))
cflg, cstatus = combined.get(rid, ([], []))
cflg.append(flg)
cstatus.append(status)
combined[rid] = cflg, cstatus
flg_order = {"R": 2, "A": 1, "NA": 0}
for rid, value in combined.items():
flgs, statii = value
flg = max(flgs, key=lambda i: flg_order[i])
msg = "\n".join(statii)
response = mdc.update_run_api(rid, {'flg_cal_data_status': flg,
'cal_pipeline_reply': msg})
if response.status_code != 200:
logging.error(Errors.MDC_RESPONSE.format(response))
await asyncio.sleep(int(config['web-service']['job-update-interval']))
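The flag reported to the MDC for a run is the highest-priority flag over all of its jobs: running ("R") outranks done ("A"), which outranks "NA". A small sketch of that selection:

``` python
# Pick the run flag with the highest priority, as update_job_db does.
flg_order = {"R": 2, "A": 1, "NA": 0}
flgs = ["A", "R", "NA"]
flg = max(flgs, key=lambda i: flg_order[i])
```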
async def copy_untouched_files(file_list, out_folder, run):
""" Copy those files which are not touched by calibration to the output dir
:param file_list: The list of files to copy
:param out_folder: The output folder
:param run: The run which is being handled
Copying is done via an asyncio subprocess call
"""
os.makedirs("{}/r{:04d}".format(out_folder, int(run)), exist_ok=True)
for f in file_list:
of = f.replace("raw", "proc").replace("RAW", "CORR")
cmd = ["rsync", "-av", f, of]
await asyncio.subprocess.create_subprocess_shell(" ".join(cmd))
logging.info("Copying {} to {}".format(f, of))
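Output paths for files that are only copied are derived purely by string substitution on the source path (the file name below is a made-up example):

``` python
# Map a raw file path to its corrected counterpart, as
# copy_untouched_files does.
f = "/gpfs/exfel/exp/SPB/201831/p900039/raw/r0575/RAW-R0575-DA01-S00000.h5"
of = f.replace("raw", "proc").replace("RAW", "CORR")
```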
async def run_correction(conn, cmd, mode, proposal, run, rid):
""" Run a correction command
:param conn: sqlite3 connection to the job database
:param cmd: command to run, as a list, as expected by subprocess.run
:param mode: "prod" or "sim", in the latter case nothing will be executed
but the command will be logged
:param proposal: proposal the command was issued for
:param run: run the command was issued for
:param: rid: run id in the MDC
Returns a formatted Success or Error message indicating outcome of the
execution.
"""
if mode == "prod":
ret = subprocess.run(cmd, stdout=subprocess.PIPE)
if ret.returncode == 0:
logging.info(Success.START_CORRECTION.format(proposal, run))
# enter jobs in job db
c = conn.cursor()
rstr = ret.stdout.decode()
query = "INSERT INTO jobs VALUES ('{rid}', '{jobid}', '{proposal}', '{run}', 'R', 'QUEUED: {now}')" # noqa
for r in rstr.split("\n"):
if "Submitted job:" in r:
_, jobid = r.split(":")
c.execute(query.format(rid=rid, jobid=jobid.strip(),
proposal=proposal, run=run,
now=datetime.now().isoformat()))
conn.commit()
logging.debug(cmd)
return Success.START_CORRECTION.format(proposal, run)
else:
logging.error(Errors.JOB_LAUNCH_FAILED.format(cmd, ret.returncode))
return Errors.JOB_LAUNCH_FAILED.format(cmd, ret.returncode)
else:
logging.debug(Success.START_CORRECTION_SIM.format(proposal, run))
logging.debug(cmd)
return Success.START_CORRECTION_SIM.format(proposal, run)
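*xfel-calibrate* reports the launched SLURM job ids on stdout; the parsing above can be exercised on captured sample output (the ids are made up):

``` python
# Extract slurm job ids from xfel-calibrate output as run_correction does.
rstr = "Submitted job: 1234567\nSubmitted job: 1234568\n"
jobids = []
for r in rstr.split("\n"):
    if "Submitted job:" in r:
        _, jobid = r.split(":")
        jobids.append(jobid.strip())
```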
async def server_runner(config, mode):
""" The main server loop
The main server loop handles remote requests via a ZMQ interface.
Requests take the form of a ZMQ.REQuest and have the format
command, *parms
where *parms is a string-encoded python list as defined by the
commands. The following commands are currently understood:
- correct, with parameters rid, sase, instrument, cycle, proposal, runnr
where
:param rid: is the runid within the MDC database
:param sase: is the sase beamline
:param instrument: is the instrument
:param cycle: is the facility cycle
:param proposal: is the proposal id
:param runnr: is the run number in integer form, e.g. without leading
"r"
This will trigger a correction process to be launched for that run in
the given cycle and proposal.
- upload-yaml, with parameters sase, instrument, cycle, proposal, yaml
where
:param sase: is the sase beamline
:param instrument: is the instrument
:param cycle: is the facility cycle
:param proposal: is the proposal id
:param yaml: is url-encoded (quotes and spaces) representation of
new YAML file
This will create or replace the existing YAML configuration for the
proposal and cycle with the newly sent one, and then push it to the git
configuration repo.
"""
init_config_repo(config['config-repo'])
job_db = await init_job_db(config)
mdc = await init_md_client(config)
context = zmq.asyncio.Context()
auth = zmq.auth.thread.ThreadAuthenticator(context)
if mode == "prod-auth":
auth.start()
auth.allow(config['web-service']['allowed-ips'])
socket = context.socket(zmq.REP)
socket.zap_domain = b'global'
socket.bind("{}:{}".format(config['web-service']['bind-to'],
config['web-service']['port']))
while True:
response = await socket.recv_multipart()
if isinstance(response, list) and len(response) == 1:
try: # protect against unparseable requests
response = eval(response[0])
except Exception as e:
logging.error(str(e))
socket.send(Errors.REQUEST_FAILED.encode())
continue
if len(
response) < 2: # catch parseable but malformed requests
logging.error(Errors.REQUEST_MALFORMED.format(response))
socket.send(Errors.REQUEST_MALFORMED.format(response).encode())
continue
action, payload = response[0], response[1:]
if action not in ["correct",
'upload-yaml']: # only handle known actions
logging.warning(Errors.UNKNOWN_ACTION.format(action))
socket.send(Errors.UNKNOWN_ACTION.format(action).encode())
continue
async def do_action():
if action == 'correct':
rid, sase, instrument, cycle, proposal, runnr = payload
specific_conf_file = "{}/{}/{}.yaml".format(
config['config-repo']['local-path'], cycle, proposal)
if os.path.exists(specific_conf_file):
with open(specific_conf_file, "r") as f:
pconf = yaml.safe_load(f.read())
else:
default_file = "{}/default.yaml".format(
config['config-repo']['local-path'])
with open(default_file, "r") as f:
pconf = yaml.safe_load(f.read())
in_folder = config['correct']['in-folder'].format(
instrument=instrument, cycle=cycle, proposal=proposal)
out_folder = config['correct']['out-folder'].format(
instrument=instrument, cycle=cycle, proposal=proposal)
detectors = {}
rpath = "{}/r{:04d}/".format(in_folder, int(runnr))
async def wait_on_transfer():
rstr = None
ret = None
max_tries = 60 # 600s
tries = 0
while not os.path.exists(rpath):
await asyncio.sleep(10)
# await asyncio.sleep(1)
while (rstr is None or 'status="online"' in rstr
or ret.returncode != 0):
ret = subprocess.run(
["getfattr", "-n", "user.status", rpath],
stdout=subprocess.PIPE)
rstr = ret.stdout.decode()
await asyncio.sleep(10)
if tries > max_tries:
return False
tries += 1
return ret.returncode == 0
msg = Success.QUEUED.format(proposal, runnr)
socket.send(msg.encode())
transfer_complete = await wait_on_transfer()
if not transfer_complete:
logging.error(
Errors.TRANSFER_EVAL_FAILED.format(proposal, runnr))
msg = MDC.MIGRATION_TIMEOUT
response = mdc.update_run_api(rid,
{'flg_cal_data_status': 'NA',
'cal_pipeline_reply': msg})
if response.status_code != 200:
logging.error(Errors.MDC_RESPONSE.format(response))
return
corr_file_list = set()
copy_file_list = set(glob.glob("{}/*.h5".format(rpath)))
for detector, dconfig in pconf[instrument].items():
# check if we find files according to
# mapping in raw run folder
fl = glob.glob(
"{}/RAW-*{}*.h5".format(rpath, dconfig["inset"]))
if len(fl):
corr_file_list = corr_file_list.union(set(fl))
thisconf = copy.copy(dconfig)
thisconf["in-folder"] = in_folder
thisconf["out-folder"] = out_folder
thisconf["run"] = runnr
del thisconf[
"inset"] # don't need this for xfel-calibrate
detectors[detector] = thisconf
copy_file_list = copy_file_list.difference(corr_file_list)
asyncio.ensure_future(
copy_untouched_files(copy_file_list, out_folder, runnr))
if len(detectors) == 0:
logging.warning(Errors.NOTHING_TO_DO.format(rpath))
msg = MDC.NOTHING_TO_DO
response = mdc.update_run_api(rid,
{'flg_cal_data_status': 'NA',
'cal_pipeline_reply': msg})
if response.status_code != 200:
logging.error(Errors.MDC_RESPONSE.format(response))
return
status = []
for detector, dconfig in detectors.items():
cmd = ["python", "-m", "xfel_calibrate.calibrate",
detector, "CORRECT"]
for key, value in dconfig.items():
if not isinstance(value, bool):
cmd += ["--{}".format(key), str(value)]
else:
cmd += ["--{}".format(key)]
ret = await run_correction(job_db, cmd, mode, proposal,
runnr, rid)
status.append(ret)
if action == 'upload-yaml':
sase, instrument, cycle, proposal, this_yaml = payload
this_yaml = urllib.parse.unquote_plus(this_yaml)
await upload_config(socket, config['config-repo'], this_yaml,
instrument, cycle, proposal)
try:
asyncio.ensure_future(do_action())
except Exception as e: # actions that fail are only error logged
logging.error(str(e))
parser = argparse.ArgumentParser(
description='Start the calibration webservice')
parser.add_argument('--config-file', type=str, default='./webservice.yaml')
parser.add_argument('--log-file', type=str, default='./web.log')
parser.add_argument('--mode', type=str, default="sim",
choices=['sim', 'prod', 'prod-auth'])
parser.add_argument('--logging', type=str, default="INFO",
choices=['INFO', 'DEBUG', 'ERROR'])
if __name__ == "__main__":
args = vars(parser.parse_args())
conf_file = args["config_file"]
with open(conf_file, "r") as f:
config = yaml.safe_load(f.read())
mode = args["mode"]
logfile = args["log_file"]
fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(filename=logfile,
level=getattr(logging, args['logging']),
format=fmt)
loop = asyncio.get_event_loop()
loop.create_task(update_job_db(config))
loop.run_until_complete(server_runner(config, mode))
loop.close()
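For each detector, the server turns its YAML configuration entries into *xfel-calibrate* arguments: booleans become bare flags, everything else becomes a flag with a value. A sketch of that loop, using entries from the AGIPD example configuration in the test client:

``` python
# Build an xfel-calibrate command line from a per-detector config,
# as server_runner's do_action does.
dconfig = {"mem-cells": 176, "blc-noise": True, "calfile": "foo"}
cmd = ["python", "-m", "xfel_calibrate.calibrate", "AGIPD", "CORRECT"]
for key, value in dconfig.items():
    if not isinstance(value, bool):
        cmd += ["--{}".format(key), str(value)]
    else:
        cmd += ["--{}".format(key)]
```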
config-repo:
    url: https://git.xfel.eu/gitlab/detectors/calibration_configurations.git
    local-path: /home/haufs/calibration_config/
web-service:
    port: 5555
    bind-to: tcp://*
    allowed-ips:
    job-db: ./webservice_jobs.sqlite
    job-update-interval: 30
    job-timeout: 3600
metadata-client:
    user-id:
    user-secret:
    user-email:
    metadata-web-app-url: 'https://in.xfel.eu/metadata'
    token-url: 'https://in.xfel.eu/metadata/oauth/token'
    refresh-url: 'https://in.xfel.eu/metadata/oauth/token'
    auth-url: 'https://in.xfel.eu/metadata/oauth/authorize'
    scope: ''
    base-api-url: 'https://in.xfel.eu/metadata/api/'
correct:
    in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw
    out-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/proc