Offline Calibration Reportservice
=================================
The Reportservice is a service responsible for handling requests (manual or automatic triggers)
for generating the DetectorCharacterization reports based on the requested configurations.
The Reportservice mainly consists of a service, clients and a YAML configuration.
The service listens for ZMQ requests carrying a given configuration.
Based on this configuration, it produces Slurm jobs (through the xfel-calibrate command line) to generate *.png plots of calibration constants over time.
Finally, the generated plots are pushed to the DetectorCharacterization (DC) remote repository and displayed on ReadTheDocs (RTD).
Configuration
-------------
To connect successfully, it is important to know the machine name and the port on which the reportservice is running.
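For illustration, a minimal client sketch is shown below; the endpoint is taken from the production report_conf.yaml later in this document, and the payload mirrors what manual_run.py sends:

```python
import zmq

con = zmq.Context()
socket = con.socket(zmq.REQ)
# Endpoint from the GLOBAL/server-port entry of report_conf.yaml.
socket.connect("tcp://max-exfl-cal001:5566")

# Ask for reports for all configured instruments, without uploading.
socket.send_pyobj({'req': ['all'], 'upload': False, 'report-fmt': 'html'})
print(socket.recv_pyobj())
```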
Starting the Service
--------------------
The reportservice is a Python script that can be run with:
```bash
python reportservice.py
```
The available command line arguments are (an example invocation follows the list):
* --report-conf: The path to the main report configuration YAML file.
* --log-file: The path to the log file.
* --mode: The mode for running the service. Choices are sim (simulation), local and prod (production).
* --logging: The required logging level. Choices are INFO, DEBUG and ERROR.
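For example, starting the service in simulation mode with debug logging (paths are placeholders):

```bash
python reportservice.py --report-conf ./report_conf.yaml \
                        --log-file ./report.log \
                        --mode sim --logging DEBUG
```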
Modes:
*prod* is the production mode, running on max-exfl-cal001 as the xcal user to generate the DC report through RTD.
It should generate a very generalized DC report for the available detectors, with information useful for most detector experts and users.
*local* is the mode used for generating figures locally, without uploading the DC report to RTD or pushing figures
to the git repository. Instead, the generated figures are copied to the local repository and, depending on the
given report-fmt (report format) argument, an html or a pdf report is generated in doc/_build/
of the report service out folder (repo-local).
*sim* is a simulation mode, mostly used for debugging purposes and tool development, without generating any reports locally or on RTD.
This mode makes sure not to perform any git interactions; it only runs the notebooks to generate figures in the out-folder, without further processing.
Report Configuration (report-conf):
*report_conf.yaml* is the configuration file, which contains all the required information
for operating and connecting to the reportservice.
The global information holds the path to the DetectorCharacterization main tool for displaying the plots,
as well as the server information (reportservice.py) and the times of the automatic triggers for updating the DC plots.
```YAML
GLOBAL:
    git:
        repo-local: "/gpfs/exfel/data/scratch/<username>/DetectorCharacterization/"
        figures-remote: "http://<username>:<git-access-token>@git.xfel.eu/gitlab/detectors/DetectorCharacterization.git"
    server-port: "tcp://<host-name>:<port-address>"
    run-on:
        - Monday 08:30:00 UTC
    report-service:
        port: <port-address>
        bind-to: tcp://*
        job-db: ./reportservice_jobs.sqlite
        job-update-interval: 30
        job-timeout: 12000
```
The YAML configuration file can be modified with all the available parameters responsible for generating the required plots and the DC report on RTD. A sketch of how such a detector block expands into an xfel-calibrate call follows the template below.
```YAML
<instrument>:
    <detector>:
        det-type:
        start-date:
        end-date:
        nconstants:
        constants:
            - "Noise"
            - "Offset"
        modules:
        bias-voltage:
        memory-cells:
        out-folder:
        cal-db-timeout: 180000
        cal-db-interface: "<cal-db-host-port>"
```
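Each detector block is turned into an xfel-calibrate command line by the service (see get_run_base in reportservice.py): det-type supplies the positional arguments, every other key becomes a --flag, and list values are passed as separate arguments. As an illustrative sketch only, the SPB/AGIPD1M1 block from the production configuration shown later would expand to a command beginning with:

```bash
xfel-calibrate GENERIC STATS_FROM_DB2 \
    --modules AGIPD1M1 \
    --start-date 2019-01-01 --end-date NOW \
    --nconstants 20 \
    --constants Noise SlopesFF SlopesPC Offset
# ...the remaining keys (dclass, bias-voltage, memory-cells, out-folder, ...)
# are appended following the same pattern.
```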
Triggering the service
----------------------
To use the service and generate a DC report corresponding to report_conf.yaml,
the service can be triggered through two processes:
Automatic Launch:
This is a script similar to reportservice.py, which needs to be run using:
```bash
python automatic_run.py
```
The available command line arguments are (an example invocation follows the list):
* --config-file: The path to the configuration file.
* --log-file: The path to the log file.
* --logging: The required logging level. Choices are INFO, DEBUG and ERROR.
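automatic_run.py keeps running and triggers the service at the times listed under run-on in the configuration. A typical invocation (paths are placeholders):

```bash
python automatic_run.py --config-file ./report_conf.yaml \
                        --log-file ./report.log --logging INFO
```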
Manual Launch:
This manual launch script is currently used for debugging purposes only.
The available command line arguments are (an example invocation follows the list):
* --config-file: The path to the configuration file.
* --instrument: A selected list of instruments to generate a report for. These instruments must be in report_conf.yaml. The default for this argument is ['all'].
* --overwrite-conf: A bool indicating that a new report configuration file (conf-file) should be sent instead of the default report_conf.yaml, which is used by report_service.py from the start.
* --log-file: The path to the log file.
* --report-fmt: The output DC report format. Choices are pdf or html.
* --upload: A bool for uploading the figures to the out-folder of the report service (repo-local) and generating a report. Default is False.
* --logging: The required logging level. Choices are INFO, DEBUG and ERROR.
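For instance, assuming the launch script is named manual_run.py (the name is inferred from its argument parser below), a pdf report for two instruments can be requested with (report-fmt only takes effect when the service runs in local mode):

```bash
python manual_run.py --instrument SPB FXE --upload --report-fmt pdf
```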
automatic_run.py:

```python
import argparse
import asyncio
import logging
from datetime import datetime, timedelta
import yaml
import zmq
import zmq.asyncio
from dateutil import parser, tz
async def auto_run(cfg, timeout=3000):
"""
Run the report service automatically depending on the scheduled times
in the run_time list, read from the config yaml file (report_conf.yaml)
"""
# time index that points at a timestamp, when the next
# report service run takes place.
tidx = 0
# list of timestamps for the report service runs
run_time = cfg['GLOBAL']['run-on']
request = {}
request['req'] = ['all']
request['upload'] = True
request['report-fmt'] = 'html'
for i, ti in enumerate(run_time):
run_time[i] = parser.parse(ti)
while True:
time_now = datetime.utcnow().replace(tzinfo=tz.tzutc())
sched_time = run_time[tidx]
if sched_time.tzinfo is None:
sched_time = sched_time.replace(tzinfo=tz.tzutc())
# do automatic-run.
if time_now > sched_time:
con = zmq.asyncio.Context()
sock = con.socket(zmq.REQ)
port = cfg['GLOBAL']['server-port']
sock.SNDTIMEO = timeout
sock.RCVTIMEO = timeout
sock.connect(port)
await sock.send_pyobj(request)
msg = await sock.recv_pyobj()
logging.info('{} Automatic Run'.format(msg))
# Changing run_time to the sametime next week
run_time[tidx] = sched_time + timedelta(weeks=1)
tidx = tidx + 1 if tidx != len(run_time)-1 else 0
        # sleep for 3000 seconds (50 minutes) before checking
        # again whether an automatic run is due.
        await asyncio.sleep(3000)
arg_parser = argparse.ArgumentParser(description='Automatic Launch')
arg_parser.add_argument('--config-file', type=str,
default='./report_conf.yaml',
help='config file path with reportservice port. '
'Default=./report_conf.yaml')
arg_parser.add_argument('--log-file', type=str, default='./report.log',
help='The report log file path. Default=./report.log')
arg_parser.add_argument('--logging', type=str, default="INFO",
help='logging modes: INFO, DEBUG or ERROR. '
'Default=INFO',
choices=['INFO', 'DEBUG', 'ERROR'])
if __name__ == "__main__":
args = vars(arg_parser.parse_args())
conf_file = args["config_file"]
with open(conf_file, "r") as f:
cfg = yaml.load(f.read(), Loader=yaml.FullLoader)
logfile = args["log_file"]
logging.basicConfig(filename=logfile, filemode='a+',
level=getattr(logging, args['logging']),
format='%(levelname)-6s: %(asctime)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
loop = asyncio.get_event_loop()
loop.run_until_complete(auto_run(cfg))
loop.close()
```
build_dc_report.sh:

```bash
#!/bin/bash
# the path to doc folder with a Makefile
dc_folder=$1
report_fmt=$2
echo "Running with the following parameters:"
echo "DC folder path: $dc_folder"
echo "DC Report format should be: report_fmt"
if [ "${report_fmt}" == "pdf" ]
then
make latexpdf -C "${dc_folder}"
elif [ "${report_fmt}" == "html" ]
then
make html -C "${dc_folder}"
fi
```
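This script is normally submitted as a Slurm job by the report service (see build_dc_report in reportservice.py), but it can also be run directly; the folder below is a placeholder:

```bash
./build_dc_report.sh /gpfs/exfel/data/scratch/<username>/DetectorCharacterization/doc html
```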
manual_run.py:

```python
import argparse
import logging
import yaml
import zmq
def manual_run(request, cfg):
"""
    Run the report service manually from any machine
    and provide the requested configuration for
    reports generation.
    :param request: a dictionary for generating reports for the
                    requested instruments. This dict consists
                    of two keys: an 'upload' boolean key defining
                    the fate of the generated plots
                    (pushed to the DC repo or kept locally)
                    and a 'req' key, which can either be a list
                    of requested instrument names, e.g. ['SPB'] or ['all']
                    for generating reports, or a dict holding a new
                    requested configuration.
    """
port = cfg['GLOBAL']['server-port']
con = zmq.Context()
socket = con.socket(zmq.REQ)
socket.connect(port)
socket.send_pyobj(request)
msg = socket.recv_pyobj()
logging.info('{} Manual Run'.format(msg))
arg_parser = argparse.ArgumentParser(description='Manual Launch')
arg_parser.add_argument('--instrument', default=['all'], nargs='+',
help='Select the requested instruments. '
'Default=\"all\", which can be used for selecting'
' all instruments.')
arg_parser.add_argument('--config-file', type=str,
default='./report_conf.yaml',
help='path to report configuration file '
'Default=./report_conf.yaml')
arg_parser.add_argument('--upload', action='store_true',
                        help='Required for uploading the generated figures. '
                             'Default=False. '
                             'Note: THIS HAS NO EFFECT IN SIM MODE!')
arg_parser.set_defaults(upload=False)
arg_parser.add_argument('--overwrite-conf', action='store_true',
                        help='A flag for using a different config file than '
                             'what is used by the running report_service. '
                             'Default=False.')
arg_parser.set_defaults(overwrite_conf=False)
arg_parser.add_argument('--report-fmt', default='html',
type=str, choices=['pdf', 'html'],
help='If available in the report service running mode,'
' this can configure the report format '
'to be html or pdf. Default=html '
'Note: THIS HAS NO EFFECT IN PROD AND SIM MODES!')
arg_parser.add_argument('--log-file', type=str, default='./report.log',
help='The report log file path. Default=./report.log')
arg_parser.add_argument('--logging', type=str, default="INFO",
help='logging modes: INFO, DEBUG or ERROR. '
'Default=INFO',
choices=['INFO', 'DEBUG', 'ERROR'])
if __name__ == "__main__":
args = vars(arg_parser.parse_args())
conf_file = args["config_file"]
with open(conf_file, "r") as f:
cfg = yaml.load(f.read(), Loader=yaml.FullLoader)
logfile = args["log_file"]
logging.basicConfig(filename=logfile, filemode='a+',
level=getattr(logging, args['logging']),
format='%(levelname)-6s: %(asctime)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
request = {'upload': args["upload"],
'report-fmt': args["report_fmt"]}
if args["overwrite_conf"]:
request['req'] = cfg
else:
request['req'] = args["instrument"]
manual_run(request, cfg)
```
messages.py:

```python
class Errors:
    REQUEST_MALFORMED = "FAILED: request {} is malformed, please contact det-support@xfel.eu"
    INSTRUMENT_NOT_FOUND = "FAILED: Instrument {} is not known, please contact det-support@xfel.eu"
```
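These templates are filled in and logged by the server when a request fails validation, e.g. (minimal sketch; 'XYZ' is a made-up instrument name):

```python
import logging

from messages import Errors

# Complain about an unknown instrument, as reportservice.py does.
logging.error(Errors.INSTRUMENT_NOT_FOUND.format('XYZ'))
```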
report_conf.yaml:

```YAML
GLOBAL:
git:
repo-local: "/gpfs/exfel/data/scratch/xcal/calibration/DetectorCharacterization/"
figures-remote: "http://git@git.xfel.eu/gitlab/detectors/DetectorCharacterization.git"
server-port: "tcp://max-exfl-cal001:5566"
run-on:
- Monday 08:30:00 UTC
report-service:
port: 5566
bind-to: tcp://*
allowed-ips:
job-db: ./reportservice_jobs.sqlite
job-update-interval: 30
job-timeout: 12000
SPB:
AGIPD1M1:
det-type:
- "GENERIC"
- "STATS_FROM_DB2"
modules:
- "AGIPD1M1"
start-date: "2019-01-01"
end-date: "NOW"
nconstants: 20
constants:
- "Noise"
- "SlopesFF"
- "SlopesPC"
- "Offset"
dclass: "AGIPD"
submodules: "0-16"
bias-voltage:
- 300
- 500
memory-cells:
- 128
- 176
- 250
acquisition-rate:
- 1.1
- 2.2
- 4.5
gain-setting:
- 0
- 1
- 2
photon-energy: 9.2
separate-plot:
- "gain_setting"
parameter-names:
- "bias_voltage"
- "acquisition_rate"
- "memory_cells"
spShape:
- 64
- 64
gain-titles:
- "High gain"
- "Medium gain"
- "Low gain"
x-labels:
- "Acquisition rate"
- "Memory cells"
sp-name: "ASICs id"
nMemToShow: 32
use-existing: "''"
out-folder: "/gpfs/exfel/data/scratch/xcal/report_service/tmp/{instrument}/{detector}/"
cal-db-timeout: 180000
cal-db-interface: "tcp://max-exfl-cal001:8015#8025"
JUNGFRAU:
det-type:
- "GENERIC"
- "STATS_FROM_DB"
start-date: "2019-01-01"
end-date: "NOW"
nconstants: 20
constants:
- "Noise"
- "Offset"
- "RelativeGain"
dclass: "jungfrau"
nMemToShow: 16
modules:
- "Jungfrau_M035"
- "Jungfrau_M203"
- "Jungfrau_M221"
- "Jungfrau_M275"
- "Jungfrau_M273"
- "Jungfrau_M267"
bias-voltage:
- 90
- 180
memory-cells:
- 1
- 16
pixels-x:
- 1024
pixels-y:
- 512
- 1024
temperature:
- 291
integration-time:
- 4.96
- 50
- 250
gain-setting:
- 0
- 1
- 2
separate-plot:
- "integration_time"
- "gain_setting"
- "memory_cells"
parameter-names:
- "bias_voltage"
- "integration_time"
- "pixels_x"
- "pixels_y"
- "temperature"
- "memory_cells"
spShape:
- 256
- 64
gain-titles:
- "High gain"
- "Medium gain"
- "Low gain"
x-labels:
- "Sensor Temperature"
- "Integration Time"
sp-name: "Supercolumn 256*64"
photon-energy: 9.2
use-existing: "''"
out-folder: "/gpfs/exfel/data/scratch/xcal/report_service/tmp/{instrument}/{detector}/"
cal-db-timeout: 180000
cal-db-interface: "tcp://max-exfl-cal001:8015#8025"
MID:
AGIPD1M2:
det-type:
- "GENERIC"
- "STATS_FROM_DB2"
modules:
- "AGIPD1M2"
start-date: "2019-01-01"
end-date: "NOW"
nconstants: 20
constants:
- "Noise"
- "SlopesFF"
- "SlopesPC"
- "Offset"
dclass: "AGIPD"
submodules: "0-16"
bias-voltage:
- 300
- 500
memory-cells:
- 128
- 176
- 250
acquisition-rate:
- 1.1
- 2.2
- 4.5
gain-setting:
- 0
- 1
- 2
photon-energy: 9.2
separate-plot:
- "gain_setting"
parameter-names:
- "bias_voltage"
- "acquisition_rate"
- "memory_cells"
spShape:
- 64
- 64
gain-titles:
- "High gain"
- "Medium gain"
- "Low gain"
x-labels:
- "Acquisition rate"
- "Memory cells"
sp-name: "ASICs id"
nMemToShow: 32
use-existing: "''"
out-folder: "/gpfs/exfel/data/scratch/xcal/report_service/tmp/{instrument}/{detector}/"
cal-db-timeout: 180000
cal-db-interface: "tcp://max-exfl-cal001:8015#8025"
EPIX:
det-type:
- "GENERIC"
- "STATS_FROM_DB"
start-date: "2019-01-01"
end-date: "NOW"
nconstants: 20
dclass: "ePix100"
nMemToShow: 1
modules:
- "ePix100_M15"
- "ePix100_M18"
constants:
- "Noise"
- "Offset"
bias-voltage:
- 200
temperature:
- 288
integration-time:
- 1
- 50
photon-energy: 9.2
separate-plot:
- "integration_time"
parameter-names:
- "bias_voltage"
- "integration_time"
- "temperature"
- "in_vacuum"
spShape:
- 354
- 96
x-labels:
- "Sensor Temperature"
- "Integration Time"
sp-name: "ASICs id"
use-existing: "''"
out-folder: "/gpfs/exfel/data/scratch/xcal/report_service/tmp/{instrument}/{detector}/"
cal-db-timeout: 180000
cal-db-interface: "tcp://max-exfl-cal001:8015#8025"
FXE:
LPD1M1:
det-type:
- "GENERIC"
- "STATS_FROM_DB2"
modules:
- "LPD1M1"
start-date: "2019-01-01"
end-date: "NOW"
nconstants: 20
constants:
- "Noise"
- "SlopesFF"
- "SlopesCI"
- "Offset"
dclass: "LPD"
submodules: "0-16"
bias-voltage:
- 250
- 500
memory-cells:
- 1
- 128
- 512
gain-setting:
- 0
- 1
- 2
photon-energy: 9.2
separate-plot:
- "gain_setting"
parameter-names:
- "bias_voltage"
- "memory_cells"
spShape:
- 64
- 64
gain-titles:
- "High gain"
- "Medium gain"
- "Low gain"
x-labels:
- "Memory cells"
sp-name: "ASICs id"
nMemToShow: 32
use-existing: "''"
out-folder: "/gpfs/exfel/data/scratch/xcal/report_service/tmp/{instrument}/{detector}/"
cal-db-timeout: 180000
cal-db-interface: "tcp://max-exfl-cal001:8015#8025"
JUNGFRAU:
det-type:
- "GENERIC"
- "STATS_FROM_DB"
start-date: "2019-01-01"
end-date: "NOW"
nconstants: 20
constants:
- "Noise"
- "Offset"
- "RelativeGain"
dclass: "jungfrau"
modules:
- "Jungfrau_M233"
- "Jungfrau_M125"
- "Jungfrau_M260"
bias-voltage:
- 90
- 180
memory-cells:
- 1
- 16
pixels-x:
- 1024
pixels-y:
- 512
- 1024
temperature:
- 291
integration-time:
- 4.96
- 50
- 250
gain-setting:
- 0
- 1
- 2
separate-plot:
- "integration_time"
- "gain_setting"
- "memory_cells"
parameter-names:
- "bias_voltage"
- "integration_time"
- "pixels_x"
- "pixels_y"
- "temperature"
- "memory_cells"
spShape:
- 256
- 64
gain-titles:
- "High gain"
- "Medium gain"
- "Low gain"
nMemToShow: 1
x-labels:
- "Sensor Temperature"
- "Integration Time"
sp-name: "Supercolumn 256*64"
photon-energy: 9.2
use-existing: "''"
out-folder: "/gpfs/exfel/data/scratch/xcal/report_service/tmp/{instrument}/{detector}/"
cal-db-timeout: 180000
cal-db-interface: "tcp://max-exfl-cal001:8015#8025"
DETLAB:
FASTCCD:
det-type:
- "GENERIC"
- "STATS_FROM_DB"
start-date: "2019-01-01"
end-date: "NOW"
nconstants: 20
constants:
- "Noise"
- "Offset"
dclass: "CCD"
nMemToShow: 1
modules:
- "fastCCD1"
bias-voltage:
- 79
temperature:
- 235
- 216
- 245
integration-time:
- 1
- 50
gain-setting:
- 0
- 1
- 2
- 8
pixels-x:
- 1934
pixels-y:
- 960
separate-plot:
- "integration_time"
- "gain_setting"
- "temperature"
parameter-names:
- "bias_voltage"
- "integration_time"
- "pixels_x"
- "pixels_y"
- "gain_setting"
- "temperature"
spShape:
- 967
- 10
gain-titles:
- "gain 0x"
- "gain 1x"
- "gain 2x"
- "gain 8x"
x-labels:
- "Sensor Temperature"
- "Integration Time"
sp-name: "Supercolumn 967*10"
photon-energy: 9.2
use-existing: "''"
out-folder: "/gpfs/exfel/data/scratch/xcal/report_service/tmp/{instrument}/{detector}/"
cal-db-timeout: 180000
cal-db-interface: "tcp://max-exfl-cal001:8015#8025"
SCS:
FASTCCD:
det-type:
- "GENERIC"
- "STATS_FROM_DB"
start-date: "2019-01-01"
end-date: "NOW"
nconstants: 20
constants:
- "Noise"
- "Offset"
dclass: "CCD"
nMemToShow: 1
modules:
- "fastCCD1"
bias-voltage:
- 79
temperature:
- 235
- 216
- 245
integration-time:
- 1
- 50
gain-setting:
- 0
- 1
- 2
- 8
pixels-x:
- 1934
pixels-y:
- 960
separate-plot:
- "integration_time"
- "gain_setting"
- "temperature"
parameter-names:
- "bias_voltage"
- "integration_time"
- "pixels_x"
- "pixels_y"
- "gain_setting"
- "temperature"
spShape:
- 967
- 10
gain-titles:
- "gain 0x"
- "gain 1x"
- "gain 2x"
- "gain 8x"
x-labels:
- "Sensor Temperature"
- "Integration Time"
sp-name: "Supercolumn 967*10"
photon-energy: 9.2
use-existing: "''"
out-folder: "/gpfs/exfel/data/scratch/xcal/report_service/tmp/{instrument}/{detector}/"
cal-db-timeout: 180000
cal-db-interface: "tcp://max-exfl-cal001:8015#8025"
SQS:
PNCCD:
det-type:
- "GENERIC"
- "STATS_FROM_DB"
start-date: "2019-01-01"
end-date: "NOW"
nconstants: 20
constants:
- "Noise"
- "Offset"
dclass: "CCD"
nMemToShow: 1
modules:
- "PnCCD1"
bias-voltage:
- 300
temperature:
- 235
integration-time:
- 1
- 50
gain-setting:
- 0
pixels-x:
- 1024
pixels-y:
- 1024
separate-plot:
- "integration_time"
- "temperature"
parameter-names:
- "bias_voltage"
- "integration_time"
- "pixels_x"
- "pixels_y"
- "gain_setting"
- "temperature"
spShape:
- 256
- 256
x-labels:
- "Sensor Temperature"
- "Integration Time"
sp-name: "Supercolumn 256*256"
photon-energy: 9.2
use-existing: "''"
out-folder: "/gpfs/exfel/data/scratch/xcal/report_service/tmp/{instrument}/{detector}/"
cal-db-timeout: 180000
cal-db-interface: "tcp://max-exfl-cal001:8015#8025"
HED:
EPIX:
det-type:
- "GENERIC"
- "STATS_FROM_DB"
start-date: "2019-01-01"
end-date: "NOW"
nconstants: 20
dclass: "ePix100"
nMemToShow: 1
modules:
- "ePix100_M16"
- "ePix100_M17"
constants:
- "Noise"
- "Offset"
bias-voltage:
- 200
temperature:
- 288
integration-time:
- 1
- 50
photon-energy: 9.2
separate-plot:
- "integration_time"
parameter-names:
- "bias_voltage"
- "integration_time"
- "temperature"
- "in_vacuum"
spShape:
- 354
- 96
x-labels:
- "Sensor Temperature"
- "Integration Time"
sp-name: "superpixel id"
use-existing: "''"
out-folder: "/gpfs/exfel/data/scratch/xcal/report_service/tmp/{instrument}/{detector}/"
cal-db-timeout: 180000
cal-db-interface: "tcp://max-exfl-cal001:8015#8025"
JUNGFRAU:
det-type:
- "GENERIC"
- "STATS_FROM_DB"
start-date: "2019-01-01"
end-date: "NOW"
nconstants: 20
constants:
- "Noise"
- "Offset"
- "RelativeGain"
dclass: "jungfrau"
nMemToShow: 1
modules:
- "Jungfrau_M039"
- "Jungfrau_M266"
bias-voltage:
- 90
- 180
memory-cells:
- 1
- 16
pixels-x:
- 1024
pixels-y:
- 512
- 1024
temperature:
- 291
integration-time:
- 4.96
- 50
- 250
gain-setting:
- 0
- 1
- 2
separate-plot:
- "integration_time"
- "gain_setting"
- "memory_cells"
parameter-names:
- "bias_voltage"
- "integration_time"
- "pixels_x"
- "pixels_y"
- "temperature"
- "memory_cells"
spShape:
- 256
- 64
gain-titles:
- "High gain"
- "Medium gain"
- "Low gain"
x-labels:
- "Sensor Temperature"
- "Integration Time"
sp-name: "Supercolumn 256*64"
photon-energy: 9.2
use-existing: "''"
out-folder: "/gpfs/exfel/data/scratch/xcal/report_service/tmp/{instrument}/{detector}/"
cal-db-timeout: 180000
cal-db-interface: "tcp://max-exfl-cal001:8015#8025"
```
reportservice.py:

```python
import argparse
import asyncio
import copy
import glob
import logging
import os
import subprocess
from asyncio.subprocess import PIPE
import yaml
import zmq
import zmq.asyncio
from git import InvalidGitRepositoryError, Repo
from messages import Errors
loop = asyncio.get_event_loop()
def init_config_repo(config):
    """ Make sure the configuration repo. is present and up-to-date
    :param config: the configuration defined in the `git` section
    """
os.makedirs(config['repo-local'], exist_ok=True)
try:
# Check if it is a git-repo.
repo = Repo(config['repo-local'])
except InvalidGitRepositoryError:
# clone the repo.
repo = Repo.clone_from(config['figures-remote'],
config['repo-local'])
logging.info("Cloning the repository")
try:
# make sure it is updated
repo.remote().pull()
except Exception as e:
logging.error(e)
# update the head of local repository as the remote's
repo.remote().fetch()
repo.git.reset('--hard', 'origin/master')
        # then make sure the local copy is up to date
repo.remote().pull()
logging.info("Config repo is initialized")
async def parse_output(output):
joblist = []
for line in output.split('\n'):
if 'Submitted' in line:
joblist.append(line.split()[2])
logging.info('joblist: {}'.format(joblist))
return joblist
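# Poll squeue every 10 seconds until none of the given job ids is
# listed in the queue any more.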
async def wait_jobs(joblist):
counter = 0
while True:
found_jobs = set()
output = subprocess.check_output(['squeue']).decode('utf8')
for line in output.split("\n"):
for job in joblist:
if str(job) in line:
found_jobs.add(job)
if not found_jobs:
logging.info('Jobs are finished')
break
await asyncio.sleep(10)
counter += 10
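# Build the xfel-calibrate command line for one detector block of the
# configuration: "det-type" supplies the positional arguments, every other
# key becomes a --flag, and list values are appended as separate arguments.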
async def get_run_base(instr_name, det_name, detector):
run_base = ['xfel-calibrate'] + detector['det-type']
for key, item in detector.items():
if key in ['det-type']:
continue
run_base += ['--{}'.format(str(key))]
if not isinstance(item, list):
if key == 'out-folder':
item = detector['out-folder'].format(instrument=instr_name,
detector=det_name)
            run_base += [str(item).replace(' ', '\\ ')]
else:
for val in item:
                run_base += [str(val).replace(' ', '\\ ')]
return run_base
async def del_folder(fpath):
""" Delete temporary folders e.g. the out-folder of new generated
plots.
:param fpath: the folder path that needs to be deleted.
"""
cmd = ["rm", "-rf", fpath]
await asyncio.subprocess.create_subprocess_shell(" ".join(cmd))
logging.info('temp file {} has been deleted'.format(fpath))
async def copy_files(f, path, sem):
""" Copying with concurrency limitation
:param f: the main file with its current path.
:param path: the path, where f is copied to.
    :param sem: Semaphore is a variable that controls
                access to common resources.
                sem can be given a None value to indicate
                copying a low number of files, e.g. 1 file.
"""
if sem:
async with sem:
cmd = ["rsync", "-a", f, path]
await asyncio.subprocess.create_subprocess_shell(" ".join(cmd))
else:
cmd = ["rsync", "-a", f, path]
await asyncio.subprocess.create_subprocess_shell(" ".join(cmd))
async def build_dc_report(dc_folder, report_fmt):
    """
    Generate a DC report (latex or html) using Maxwell nodes.
    With the supported inputs, a Slurm job is submitted to sphinx-build
    pdf or html, depending on the mode of the report_service:
    html for prod mode, and pdf or html for local mode depending
    on the chosen report_fmt.
    :param dc_folder: the local DC folder path with figures and rst files
    :param report_fmt: the expected report format (html or pdf)
    """
temp_path = "{}/temp/build_dc_report/".format(os.getcwd())
os.makedirs(temp_path, exist_ok=True)
# launching a slurm job and assigning the bash script to it.
sprof = os.environ.get("XFELCALSLURM", "exfel")
launcher_command = "sbatch -t 24:00:00 --mem 500G --requeue " \
"--output {temp_path}/slurm-%j.out"
srun_base = launcher_command.format(
temp_path=temp_path) + " -p {}".format(sprof)
srun_base = srun_base.split()
srun_base += [os.path.abspath("./build_dc_report.sh"),
os.path.abspath("{}/doc".format(dc_folder)),
report_fmt]
logging.info("Building DC report submission: {}".format(srun_base))
output = subprocess.check_output(srun_base).decode('utf8')
jobid = None
for line in output.split("\n"):
if "Submitted batch job " in line:
jobid = line.split(" ")[3]
logging.info("Submitted job for building a report: {}".format(jobid))
await asyncio.wait_for(wait_jobs([jobid]), timeout=7200) # timeout=2hours
# delete folder only after the pending slurm jobs finishes
await del_folder("{}/slurm-{}.out".format(temp_path, jobid))
async def push_figures(repo_master, addf):
""" Upload new figures
:param repo_master: the local git-repository.
:param addf: the generated figures to be added.
"""
repo = Repo(repo_master)
add_tries = 0
while add_tries < 10:
try:
repo.index.add(addf)
add_tries = 10
except Exception as e:
logging.error(str(e))
await asyncio.sleep(2)
add_tries += 1
repo.index.commit("Add {} new figures".format(len(addf)))
repo.remote().push()
logging.info('Pushed to git')
async def server_runner(conf_file, mode):
"""
The main server loop. After pulling the latest changes
of the DC project, it awaits receiving configurations
on how to generate the requested reports.
Depending on receiving a conf yaml file or a list of
instruments, the server proceeds by generating figures.
Figures are copied to the corresponding folder in the
DC project and an add-commit-push is performed to
update the remote project and build reports that can
be accessed through ReadTheDocs.
"""
with open(conf_file, "r") as f:
config = yaml.load(f.read(), Loader=yaml.FullLoader)
# perform git-dir checks and pull the project
# for updates only in production mode.
if mode != 'sim':
init_config_repo(config['GLOBAL']['git'])
logging.info("Report service started in mode: {}".format(mode))
logging.info("report service port: {}:{}"
.format(config['GLOBAL']['report-service']['bind-to'],
config['GLOBAL']['report-service']['port']))
context = zmq.asyncio.Context()
socket = context.socket(zmq.REP)
socket.bind("{}:{}".format(config['GLOBAL']['report-service']['bind-to'],
config['GLOBAL']['report-service']['port']))
while True:
response = await socket.recv_pyobj()
await socket.send_pyobj('Build DC reports through -->')
logging.info("response: {} with uploading: {} and report format: {}"
.format(response['req'],
response['upload'],
response['report-fmt']))
# Check if response is a list or a dict.
# if list, it should either have instrument names or ['all'].
# if dict, it should acquire the details of the requested reports
# for generation. As it will be used instead of report_conf.yaml
# reports config file
req_cfg = {}
# Validate the type of 'requested' response.
if isinstance(response['req'], dict):
req_cfg = response['req']
elif isinstance(response['req'], list):
if len(response['req']) == 1 and response['req'][0] == 'all':
req_cfg = config
else:
req_cfg['GLOBAL'] = config['GLOBAL']
for instr in response['req']:
try:
req_cfg[instr] = config[instr]
                    except KeyError:
logging.error(
Errors.INSTRUMENT_NOT_FOUND.format(instr))
continue
else:
logging.error(Errors.REQUEST_MALFORMED.format(response['req']))
continue
# No interaction with DC repository (local or remote)
# is allowed if sim mode.
if mode == 'sim':
req_cfg['GLOBAL']['upload'] = False
req_cfg['GLOBAL']['report-fmt'] = False
else:
# boolean for pushing to DC git repo.
req_cfg['GLOBAL']['upload'] = response['upload']
if mode == 'prod':
req_cfg['GLOBAL']['report-fmt'] = 'html'
else:
req_cfg['GLOBAL']['report-fmt'] = response['report-fmt']
logging.info('Requested Configuration: {}'.format(req_cfg))
async def do_action(cfg, service_mode):
logging.info('Run plot production')
local_repo = cfg['GLOBAL']['git']['repo-local']
fig_local = '{}/figures'.format(local_repo)
jobs_timeout = cfg['GLOBAL']['report-service'].get('job-timeout',
3600)
all_new_files = []
for instr_name, instrument in cfg.items():
if instr_name == "GLOBAL":
continue
launched_jobs = []
for det_name, det_conf in instrument.items():
logging.info('Process detector: {}'.format(det_name))
logging.debug('Config information: {}'.format(det_conf))
run_base = await get_run_base(instr_name,
det_name,
det_conf)
try:
output = await asyncio.create_subprocess_shell(
" ".join(run_base), stdout=PIPE, stderr=PIPE)
launched_jobs.append(output.communicate())
logging.info('Submission information: {}:'
.format(run_base))
except Exception as e:
logging.error('Submission failed: {}'.format(e))
exit(1)
outputs = await asyncio.gather(*launched_jobs)
job_list = []
for output in outputs:
if output[0]:
logging.info('Submission Output: {}'
.format(output[0].decode('utf8')))
if output[1]:
logging.error('Submission Error: {}'
.format(output[1].decode('utf8')))
job_list += await parse_output(output[0].decode('utf8'))
try:
await asyncio.wait_for(wait_jobs(job_list),
timeout=jobs_timeout)
logging.info('All jobs are finished')
except asyncio.TimeoutError:
logging.error('Jobs have timed-out!')
logging.error('{}/temp has not been deleted.'.format(
os.path.dirname(os.path.abspath(__file__))))
# Avoid copying files if upload bool is False
# to avoid causing local git repository errors.
if cfg['GLOBAL']['upload']:
# Copy all plots
for det_name, det_conf in instrument.items():
out_folder = det_conf['out-folder'].format(
instrument=instr_name,
detector=det_name)
figures = glob.glob("{}/*png".format(out_folder))
det_new_files = {}
for f in figures:
const = f.split('/')[-1].split('_')[0]
fpath = '{}/{}/{}/{}'.format(fig_local, instr_name,
det_name, const)
os.makedirs(fpath, exist_ok=True)
det_new_files[f] = fpath
# Set concurrency limitation.
# 50 have been chosen by trial
# Note: This is not the max limitation.
sem = asyncio.Semaphore(50)
all_new_files.append(
'{}/{}'.format(fpath, f.split('/')[-1]))
await asyncio.gather(*[copy_files(k, v, sem)
for k, v in det_new_files.items()]) # noqa
logging.info('{} figures of {} are copied into {}'
.format(len(figures), det_name,
fig_local))
if cfg['GLOBAL']['upload']:
try:
report_fmt = cfg['GLOBAL']['report-fmt']
# Remove sensitive information from the config file.
del cfg['GLOBAL']
# Write the requested cfg.yaml before pushing all figures.
with open('{}/report_conf.yaml'.format(
fig_local), 'w') as outfile:
yaml.dump(cfg, outfile, default_flow_style=False)
if service_mode == 'prod':
                        # add report_conf.yaml to the list of files added to
                        # the new git commit before pushing to remote
all_new_files.append('{}/report_conf.yaml'
.format(fig_local))
asyncio.ensure_future(push_figures(local_repo,
all_new_files))
# build either html or pdf depending on the running mode
# of the report_service and requested report format.
asyncio.ensure_future(build_dc_report(local_repo,
report_fmt)) # noqa
except Exception as upload_e:
logging.error("upload failed: {}".format(upload_e))
# TODO:delete out-folder
#try:
# asyncio.ensure_future(del_folder(out_folder))
#except:
# logging.error(str(e))
logging.info('Generating requested plots is finished!')
logging.info('=======================================')
return
try:
asyncio.ensure_future(do_action(copy.copy(req_cfg),
mode))
except Exception as e: # actions that fail are only error logged
logging.error(str(e))
break
arg_parser = argparse.ArgumentParser(description='Start the report service')
arg_parser.add_argument('--config-file', type=str,
default='./report_conf.yaml',
help='config file path with '
'reportservice port. '
'Default=./report_conf.yaml')
arg_parser.add_argument('--mode', type=str, default="sim", choices=['sim', 'prod', 'local'])
arg_parser.add_argument('--log-file', type=str, default='./report.log',
help='The report log file path. Default=./report.log')
arg_parser.add_argument('--logging', type=str, default="INFO",
help='logging modes: INFO, DEBUG or ERROR. '
'Default=INFO',
choices=['INFO', 'DEBUG', 'ERROR'])
if __name__ == "__main__":
args = vars(arg_parser.parse_args())
conf_file = args["config_file"]
logfile = args["log_file"]
logging.basicConfig(filename=logfile, filemode='a+',
level=getattr(logging, args['logging']),
format='%(levelname)-6s: %(asctime)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
mode = args["mode"]
loop = asyncio.get_event_loop()
loop.run_until_complete(server_runner(conf_file, mode))
loop.close()
```