Commit 7f9f1619 authored by Karim Ahmed

feat/reportservice

parent 66cd6839
1 merge request: !93 feat/reportservice
.gitignore
@@ -33,3 +33,4 @@ docs/source/_static/reports
 webservice/webservice.yaml
 webservice/*.log
 webservice/*sqlite
+reportservice/*.log
import asyncio
from datetime import datetime, timedelta
import logging
from dateutil import parser, tz
import yaml
import zmq
async def auto_run():
"""
    Run the report service automatically at the times scheduled in the
    'run-on' list of the configuration file (cal_conf.yaml).
"""
with open("cal_conf.yaml", 'r') as ymlfile:
        cfg = yaml.safe_load(ymlfile)
# time index that points at a timestamp, when the next
# report service run takes place.
tidx = 0
# list of timestamps for the report service runs
run_time = cfg['GLOBAL']['run-on']
# Begin-for-loop to parse the run time-stamps
for i, ti in enumerate(run_time):
run_time[i] = parser.parse(ti)
# End-for-loop
# Begin-while-loop
while True:
time_now = datetime.utcnow().replace(tzinfo=tz.tzutc())
sched_time = run_time[tidx]
# Begin-IF no-timezone, use UTC
if sched_time.tzinfo is None:
sched_time = sched_time.replace(tzinfo=tz.tzutc())
# End-IF
# Begin-IF a run-timestamp passed,
# do automatic-run.
if time_now > sched_time:
con = zmq.Context()
sock = con.socket(zmq.REQ)
port = cfg['GLOBAL']['server-port']
sock.connect(port)
sock.send_pyobj(['all'])
msg = sock.recv_pyobj()
logging.info('{} Automatic Run'
.format(msg))
            # Change run_time to the same time next week
run_time[tidx] = sched_time + timedelta(weeks=1)
tidx = tidx + 1 if tidx != len(run_time)-1 else 0
# End-IF
        # Check every 10 minutes whether an automatic run is due.
await asyncio.sleep(600)
# End-while-loop
logfile = './cal.log'
logging.basicConfig(filename=logfile, filemode='a+',
level=logging.INFO,
format='%(levelname)-6s: %(asctime)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
loop = asyncio.get_event_loop()
loop.run_until_complete(auto_run())
loop.close()
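The loop above compares each scheduled slot against the current UTC time and, once a slot has passed, triggers a run and pushes that slot one week ahead. A minimal sketch of that rescheduling logic, using one of the 'run-on' entries from the example configuration below (illustrative only, not part of the committed code):

from datetime import datetime, timedelta
from dateutil import parser, tz

# One entry from the 'run-on' list; dateutil resolves the weekday name
# relative to the current date.
sched_time = parser.parse("Monday 08:30:00 UTC")
if sched_time.tzinfo is None:
    sched_time = sched_time.replace(tzinfo=tz.tzutc())

now = datetime.utcnow().replace(tzinfo=tz.tzutc())
if now > sched_time:
    # A slot that has already passed is moved to the same time next week,
    # exactly as auto_run() does after triggering a run.
    sched_time = sched_time + timedelta(weeks=1)
print("next run:", sched_time)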
GLOBAL:
git:
figures-local: "/gpfs/exfel/data/scratch/ahmedk/calibration/DetectorCharacterization/figures"
repo-local: "/gpfs/exfel/data/scratch/ahmedk/calibration/DetectorCharacterization/"
figures-remote: "http://git@git.xfel.eu/gitlab/detectors/DetectorCharacterization.git"
server-port: "tcp://max-exfl015:5555"
run-on:
- Monday 08:30:00 UTC
- Thursday 10:33:00 UTC
report-service:
port: 5555
bind-to: tcp://*
allowed-ips:
job-db: ./webservice_jobs.sqlite
job-update-interval: 30
job-timeout: 3600
SPB:
AGIPD1M1:
det-type:
- "AGIPD"
- "STATS_FROM_DB"
start-date: "2019-01-01"
end-date: "2019-12-12"
constants:
- "Noise"
- "SlopesFF"
- "SlopesPC"
- "Offset"
modules: "0-15"
bias-voltages:
- 300
- 500
mem-cells:
- 128
- 176
photon-energy: 9.2
use-existing: "/gpfs/exfel/data/scratch/karnem/testAGIPD16_21/"
out-folder: "/gpfs/exfel/data/scratch/karnem/testAGIPD16_21/"
cal-db-timeout: 180000
cal-db-interface: "tcp://max-exfl016:8015#8025"
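For orientation, the different entry points read this file as follows: the REQ clients (the scheduler above and the manual launcher below) connect to GLOBAL/server-port, the server binds to GLOBAL/report-service, and the scheduler triggers runs at the GLOBAL/run-on timestamps. A minimal sketch of reading those keys (illustrative only):

import yaml

with open("cal_conf.yaml", "r") as f:
    cfg = yaml.safe_load(f)

print(cfg['GLOBAL']['server-port'])              # address the REQ clients connect to
print(cfg['GLOBAL']['report-service']['port'])   # port the REP server binds to
print(cfg['GLOBAL']['run-on'])                   # timestamps used by the scheduler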
import logging
import os
import sys
import yaml
import zmq
logfile = './cal.log'
logging.basicConfig(filename=logfile, filemode='a+',
level=logging.INFO,
format='%(levelname)-6s: %(asctime)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
async def manual_launch(req_conf):
"""
    Run the report service manually from any machine, passing the
    requested configuration for report generation.

    :param req_conf: a list of instrument names to generate reports for,
                     e.g. ['SPB'], or ['all'] to generate reports for all
                     instruments in "cal_conf.yaml". Alternatively, a
                     customized configuration dict can be passed, e.g.
                     for test purposes.
"""
with open("cal_conf.yaml", 'r') as ymlfile:
        cfg = yaml.safe_load(ymlfile)
port = cfg['GLOBAL']['server-port']
con = zmq.Context()
socket = con.socket(zmq.REQ)
    socket.connect(port)
socket.send_pyobj(req_conf)
msg = socket.recv_pyobj()
logging.info('{} Manual Run'.format(msg))
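As committed, this module only defines the coroutine; to actually send a request it has to be driven by an event loop with the report service already running. A possible invocation (illustrative only):

import asyncio

loop = asyncio.get_event_loop()
# Request reports for a single instrument; ['all'] would request reports
# for every instrument configured in cal_conf.yaml.
loop.run_until_complete(manual_launch(['SPB']))
loop.close()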
class Errors:
REQUEST_MALFORMED = "FAILED: request {} is malformed, please contact det-support@xfel.eu"
    INSTRUMENT_NOT_FOUND = "FAILED: Instrument {} is not known, please contact det-support@xfel.eu"
import argparse
import asyncio
import copy
import getpass
import glob
import logging
import os
import subprocess
import shutil
import sqlite3
import sys
from time import sleep
import urllib.parse
from git import Repo, InvalidGitRepositoryError
import yaml
import zmq
import zmq.asyncio
from messages import Errors
loop = asyncio.get_event_loop()
def init_config_repo(config):
""" Make sure the configuration repo. is present and up-to-data
    :param config: the configuration defined in the 'git' section of the
                   GLOBAL configuration
"""
os.makedirs(config['repo-local'], exist_ok=True)
try:
# Check if it is a git-repo.
repo = Repo(config['repo-local'])
except InvalidGitRepositoryError:
# clone the repo.
repo = Repo.clone_from(config['figures-remote'],
config['repo-local'])
# make sure it is updated
repo.remote().pull()
logging.info("Config repo is initialized")
async def parse_output(output):
    """ Extract the submitted job IDs from the xfel-calibrate output """
joblist = []
for line in output.split('\n'):
if 'Submitted' in line:
joblist.append(line.split()[2])
logging.info('joblist: {}'.format(joblist))
return joblist
async def wait_jobs(joblist):
    """ Poll squeue every 10 seconds until none of the given jobs is listed """
counter = 0
while True:
found_jobs = set()
output = subprocess.check_output(['squeue']).decode('utf8')
for line in output.split("\n"):
for job in joblist:
if str(job) in line:
found_jobs.add(job)
if len(found_jobs) == 0:
            logging.info('Plotting of modules is done')
break
sys.stdout.write('%d\r' % counter)
await asyncio.sleep(10)
counter += 10
async def get_run_base(detector):
    """ Build the xfel-calibrate command list from a detector configuration """
run_base = ['xfel-calibrate'] + detector['det-type']
for key, item in detector.items():
if key in ['det-type']:
continue
run_base += ['--{}'.format(str(key))]
if not isinstance(item, list):
run_base += [str(item)]
else:
for val in item:
run_base += [str(val)]
return run_base
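# Example (illustrative, based on the SPB/AGIPD1M1 section of cal_conf.yaml):
# get_run_base() flattens the detector mapping into a command list roughly like
#   ['xfel-calibrate', 'AGIPD', 'STATS_FROM_DB',
#    '--start-date', '2019-01-01', '--end-date', '2019-12-12',
#    '--constants', 'Noise', 'SlopesFF', 'SlopesPC', 'Offset',
#    '--modules', '0-15', '--bias-voltages', '300', '500', ...]
# which server_runner() then hands to subprocess.check_output().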
async def delete_tmp_folder(fpath):
""" Delete temporary folders e.g. the out-folder of new generated
plots.
:param fpath: the folder path that needs to be deleted.
"""
cmd = ["rm", "-rf", fpath]
await asyncio.subprocess.create_subprocess_shell(" ".join(cmd))
async def cp_with_conc_lim(f, path, sem):
""" Copying with concurrency limitation
:param f: the main file with it's current path.
:param path: the path, where f is copied to.
:param sem: Semaphore is a variable that control
access to common resources.
"""
async with sem:
cmd = ["rsync", "-a", f, path]
await asyncio.subprocess.create_subprocess_shell(" ".join(cmd))
async def push_figures(repo_master, addf):
""" Upload new figures
:param repo_master: the local git-repository.
:param addf: the generated figures to be added.
"""
repo = Repo(repo_master)
repo.index.add(addf)
repo.index.commit("Add new {} figures".format(len(addf)))
repo.remote().push()
logging.info('Pushed to git')
async def server_runner(config):
"""
The main server loop. After pulling the latest changes
of the DC project, it awaits receiving configurations
on how to generate the requested reports.
Depending on receiving a conf yaml file or a list of
instruments, the server proceeds by generating figures.
Figures are copied to the corresponding folder in the
DC project and an add-commit-push is performed to
update the remote project and build reports that can
be accessed through ReadTheDocs.
"""
# perform git-dir checks and pull the project for updates.
init_config_repo(config['GLOBAL']['git'])
logging.info("report service port: {}:{}"
.format(config['GLOBAL']['report-service']['bind-to'],
config['GLOBAL']['report-service']['port']))
context = zmq.asyncio.Context()
socket = context.socket(zmq.REP)
socket.bind("{}:{}".format(config['GLOBAL']['report-service']['bind-to'],
config['GLOBAL']['report-service']['port']))
while True:
response = await socket.recv_pyobj()
        await socket.send_pyobj('Build DC reports through -->')
logging.info("response: {}".format(response))
        # Check if the response is a list or a dict.
        # If it is a list, it should contain instrument names or ['all'].
        # If it is a dict, it should contain the details of the requested
        # reports and will be used instead of the cal_conf.yaml reports
        # configuration file.
req_cfg = {}
#Begin-IF response dict or list
if isinstance(response, dict):
req_cfg = response
elif isinstance(response, list):
if len(response) == 1 and response[0] == 'all':
req_cfg = config
else:
                req_cfg['GLOBAL'] = config['GLOBAL']
for instr in response:
try:
req_cfg[instr] = config[instr]
                    except KeyError:
logging.error(
Errors.INSTRUMENT_NOT_FOUND.format(instr))
continue
else:
logging.error(Errors.REQUEST_MALFORMED.format(response))
continue
#End-IF response dict or list
logging.info('Requested Configuration: {}'.format(req_cfg))
# Begin-asyncio-function(do_action)
async def do_action(cfg):
logging.info('Run plot production')
all_new_files = []
# Begin-for-loop over instruments
for instr_name, instrument in cfg.items():
if instr_name == "GLOBAL":
continue
# Begin-for-loop over detectors
for det_name, detector in instrument.items():
logging.info('Process detector: {}'.format(det_name))
logging.debug('Config information: {}'.format(detector))
run_base = await get_run_base(detector)
try:
output = subprocess.check_output(run_base) \
.decode('utf8')
logging.info(
'Submission information: \n{}'.format(output))
logging.info('Run plots {}'.format(run_base))
except Exception as e:
logging.error('Submission failed: {}'.format(e))
exit(1)
joblist = await parse_output(output)
try:
# make sure to timeout if waiting for jobs
# took more than 10 mins.
await asyncio.wait_for(wait_jobs(joblist), timeout=600)
logging.info('Jobs finished')
except asyncio.TimeoutError:
logging.error('Jobs timeout!')
# Copy all plots
out_folder = detector['out-folder']
path = cfg['GLOBAL']['git']['figures-local']
figures = glob.glob("{}/*png".format(out_folder))
logging.info('Copy figures to: {}'.format(path))
det_new_files = {}
# Begin-for-loop over new-figures
for f in figures:
const = f.split('/')[-1].split('_')[0]
fpath = '{}/{}/{}'.format(path, det_name, const)
os.makedirs(fpath, exist_ok=True)
det_new_files[f] = fpath
sem = asyncio.Semaphore(50)
all_new_files.append(
'{}/{}'.format(fpath, f.split('/')[-1]))
# End-for-loop over new-figures
await asyncio.gather(*[cp_with_conc_lim(k, v, sem)
for k, v in det_new_files.items()])
logging.info('Figures Copied')
# End-for-loop over detectors
# End-for-loop over instruments
asyncio.ensure_future(
push_figures(cfg['GLOBAL']['git']['repo-local'],
all_new_files))
logging.info('All done')
return
# End-asyncio-function(do_action)
# TODO delete tmp file
try:
asyncio.ensure_future(do_action(copy.copy(req_cfg)))
except Exception as e: # actions that fail are only error logged
logging.error(str(e))
break
parser = argparse.ArgumentParser(description='Start the report service')
parser.add_argument('--config-file', type=str, default='./cal_conf.yaml')
parser.add_argument('--log-file', type=str, default='./cal.log')
parser.add_argument('--logging', type=str, default="INFO",
choices=['INFO', 'DEBUG', 'ERROR'])
if __name__ == "__main__":
args = vars(parser.parse_args())
conf_file = args["config_file"]
with open(conf_file, "r") as f:
        config = yaml.safe_load(f)
logfile = args["log_file"]
fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(filename=logfile, filemode='a+',
level=getattr(logging, args['logging']),
format='%(levelname)-6s: %(asctime)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
loop = asyncio.get_event_loop()
loop.run_until_complete(server_runner(config))
loop.close()
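server_runner() accepts either a list of instrument names (or ['all']) or a configuration dict with the same layout as cal_conf.yaml. A sketch of sending such a dict directly, e.g. for testing (illustrative only; the address is taken from the server-port entry of the example configuration):

import yaml
import zmq

# Reuse the committed configuration as the basis of a customised request
# and restrict it, e.g. to a single constant, before sending it.
with open("cal_conf.yaml", "r") as f:
    request = yaml.safe_load(f)
request['SPB']['AGIPD1M1']['constants'] = ['Noise']

con = zmq.Context()
sock = con.socket(zmq.REQ)
sock.connect(request['GLOBAL']['server-port'])
sock.send_pyobj(request)
print(sock.recv_pyobj())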