Skip to content
Snippets Groups Projects
Commit b2c5dc6d authored by Steffen Hauf's avatar Steffen Hauf
Browse files

First working version with MDC interface

parent 974c8288
No related branches found
No related tags found
1 merge request: !44 "add a web service to launch calibration from MDC"
......@@ -3,10 +3,12 @@ import asyncio
import ast
import copy
from datetime import datetime
import getpass
from git import Repo, InvalidGitRepositoryError
import glob
import logging
import os
import sqlite3
import socket
import subprocess
import sys
......@@ -17,6 +19,15 @@ import zmq.auth.thread
from errors import Errors, Success
def init_job_db(config):
    """ Open the job database and make sure the ``jobs`` table exists

    :param config: the service configuration; the database location is read
        from ``config['web-service']['job-db']``
    :return: an open ``sqlite3`` connection to the job database
    """
    conn = sqlite3.connect(config['web-service']['job-db'])
    c = conn.cursor()
    # Create the table only when it is missing. This replaces a probing
    # SELECT guarded by a bare ``except``, which treated *any* failure
    # (including KeyboardInterrupt) as "table does not exist".
    c.execute("CREATE TABLE IF NOT EXISTS jobs(id, cycle, proposal, run)")
    conn.commit()
    return conn
def init_config_repo(config):
""" Make sure the configuration repo is present and up-to-data
......@@ -64,9 +75,17 @@ async def upload_config(socket, config, yaml, instrument, cycle, proposal):
repo.remote().push()
logging.info(Success.UPLOADED_CONFIG.format(cycle, proposal))
socket.send(Success.UPLOADED_CONFIG.format(cycle, proposal).encode())
def slurm_status(config, filter_user=True):
    """ Run ``squeue`` and print the completed-process object

    :param config: the service configuration (not read by this function)
    :param filter_user: if true, restrict the query to the jobs of the
        current user by passing ``-u <user>`` to ``squeue``
    """
    user_filter = ["-u", getpass.getuser()] if filter_user else []
    result = subprocess.run(["squeue"] + user_filter)
    print(result)
async def run_cmd(cmd, mode, proposal, run):
async def run_cmd(job_db, cmd, mode, proposal, run, rpath):
""" Run a correction command
:param cmd: command to run; should be a list in the form expected by subprocess.run
......@@ -79,11 +98,14 @@ async def run_cmd(cmd, mode, proposal, run):
execution.
"""
if mode == "prod":
ret = subprocess.run(cmd)
ret = subprocess.run(cmd, stdout=subprocess.PIPE)
if ret.returncode == 0:
logging.info(Success.START_CORRECTION.format(proposal, run))
# enter jobs in job db
#c = conn.cursor()
#c.execute("INSERT INTO jobs VALUES ({jobid}, {proposal}, {run}, "QUEUED")")
logging.debug(cmd)
return Success.START_CORRECTION.format(cmd)
return Success.START_CORRECTION.format(proposal, run)
else:
logging.error(Errors.JOB_LAUNCH_FAILED.format(cmd, ret.returncode))
return Errors.JOB_LAUNCH_FAILED.format(cmd, ret.returncode)
......@@ -138,12 +160,13 @@ async def server_runner(conf_file, mode):
config = yaml.load(f.read())
init_config_repo(config['config-repo'])
job_db = None #init_job_db(config)
context = zmq.Context()
auth = zmq.auth.thread.ThreadAuthenticator(context)
if mode == "prod":
if mode == "prod-auth":
auth.start()
auth.allow(config['allowed-ips'])
auth.allow(config['web-service']['allowed-ips'])
socket = context.socket(zmq.REP)
socket.zap_domain = b'global'
......@@ -180,10 +203,33 @@ async def server_runner(conf_file, mode):
out_folder = config['correct']['out-folder'].format(instrument=instrument, cycle=cycle, proposal=proposal)
detectors = {}
rpath = "{}/r{:04d}/".format(in_folder, int(runnr))
if not os.path.exists(rpath):
logging.error(Errors.PATH_NOT_FOUND.format(rpath))
socket.send(Errors.PATH_NOT_FOUND.format(rpath).encode())
continue
async def wait_on_transfer():
    """ Wait until the run folder exists and no longer reports as online

    Uses ``rpath`` from the enclosing scope. First polls every 10s until
    the path exists, then repeatedly queries its ``user.status`` extended
    attribute with ``getfattr`` until the call succeeds and the output no
    longer contains ``status="online"``.

    :return: True if the final ``getfattr`` call succeeded, False if the
        retry limit was exceeded first
    """
    rstr = None
    ret = None
    # Was defined as ``max_wait`` while the loop below read ``max_tries``,
    # which raised NameError on the very first poll.
    max_tries = 60  # polled every 10s, i.e. roughly 600s
    tries = 0
    while not os.path.exists(rpath):
        await asyncio.sleep(10)
    # short grace period once the folder has appeared
    await asyncio.sleep(1)
    while rstr is None or 'status="online"' in rstr or ret.returncode != 0:
        print(" ".join(["getfattr", "-n", "user.status", rpath]))
        ret = subprocess.run(["getfattr", "-n", "user.status", rpath], stdout=subprocess.PIPE)
        rstr = ret.stdout.decode()
        await asyncio.sleep(10)
        if tries > max_tries:
            return False
        tries += 1
    return ret.returncode == 0
msg = "Queued proposal {}, run {} for offline calibration".format(proposal, runnr)
socket.send(msg.encode())
transfer_complete = await wait_on_transfer()
if not transfer_complete:
logging.error(Errors.TRANSFER_EVAL_FAILED.format(proposal, runnr))
return Errors.TRANSFER_EVAL_FAILED.format(proposal, runnr)
for detector, dconfig in pconf[instrument].items():
# check if we find files according to mapping in raw run folder
fl = glob.glob("{}/RAW-*{}*.h5".format(rpath, dconfig["inset"]))
......@@ -201,16 +247,15 @@ async def server_runner(conf_file, mode):
continue
status = []
for detector, dconfig in detectors.items():
cmd = ["xfel-calibrate", "CORRECT"]
cmd += [detector]
cmd = ["python", "-m", "xfel_calibrate.calibrate", detector, "CORRECT"]
for key, value in dconfig.items():
if not isinstance(value, bool):
cmd += ["--{}".format(key), str(value)]
else:
cmd += ["--{}".format(key)]
ret = await run_cmd(cmd, mode, proposal, runnr)
ret = await run_cmd(job_db, cmd, mode, proposal, runnr, dconfig["in-folder"])
status.append(ret)
socket.send(("\n".join(status)).encode())
#socket.send(("\n".join(status)).encode())
if action == 'upload-yaml':
sase, instrument, cycle, proposal, this_yaml = payload
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment