Skip to content
Snippets Groups Projects
Commit 0f3044a7 authored by Karim Ahmed's avatar Karim Ahmed
Browse files

asyncio in both runs and del-tmp

parent 509ee7b4
No related branches found
No related tags found
1 merge request!93feat/reportservice
......@@ -5,7 +5,7 @@ import logging
from dateutil import parser, tz
import yaml
import zmq
import zmq.asyncio
async def auto_run():
"""
......@@ -42,12 +42,12 @@ async def auto_run():
# Begin-IF a run-timestamp passed,
# do automatic-run.
if time_now > sched_time:
con = zmq.Context()
con = zmq.asyncio.Context()
sock = con.socket(zmq.REQ)
port = cfg['GLOBAL']['server-port']
sock.connect(port)
sock.send_pyobj(['all'])
msg = sock.recv_pyobj()
await sock.send_pyobj(['all'])
msg = await sock.recv_pyobj()
logging.info('{} Automatic Run'
.format(msg))
......
import asyncio
import logging
import os
import sys
import yaml
import zmq
import zmq.asyncio
logfile = './cal.log'
logging.basicConfig(filename=logfile, filemode='a+',
......@@ -12,7 +14,7 @@ logging.basicConfig(filename=logfile, filemode='a+',
datefmt='%Y-%m-%d %H:%M:%S')
async def manual_launch(req_conf):
async def manual_run(req_conf):
"""
Run the report service manually from any machine
and provide the requested configuration for
......@@ -32,11 +34,11 @@ async def manual_launch(req_conf):
cfg = yaml.load(ymlfile)
port = cfg['GLOBAL']['server-port']
con = zmq.Context()
con = zmq.asyncio.Context()
socket = con.socket(zmq.REQ)
con = socket.connect(port)
socket.send_pyobj(req_conf)
msg = socket.recv_pyobj()
await socket.send_pyobj(req_conf)
msg = await socket.recv_pyobj()
logging.info('{} Manual Run'.format(msg))
......@@ -42,6 +42,7 @@ def init_config_repo(config):
async def parse_output(output):
joblist = []
for line in output.split('\n'):
if 'Submitted' in line:
......@@ -51,6 +52,7 @@ async def parse_output(output):
async def wait_jobs(joblist):
counter = 0
while True:
found_jobs = set()
......@@ -68,6 +70,7 @@ async def wait_jobs(joblist):
async def get_run_base(detector):
run_base = ['xfel-calibrate'] + detector['det-type']
for key, item in detector.items():
......@@ -85,7 +88,7 @@ async def get_run_base(detector):
return run_base
async def delete_tmp_folder(fpath):
async def del_folder(fpath):
""" Delete temporary folders e.g. the out-folder of new generated
plots.
......@@ -113,8 +116,8 @@ async def push_figures(repo_master, addf):
:param repo_master: the local git-repository.
:param addf: the generated figures to be added.
"""
repo = Repo(repo_master)
repo.index.add(addf)
repo.index.commit("Add new {} figures".format(len(addf)))
......@@ -138,7 +141,7 @@ async def server_runner(config):
# perform git-dir checks and pull the project for updates.
init_config_repo(config['GLOBAL']['git'])
logging.info("report service port: {}:{}"
.format(config['GLOBAL']['report-service']['bind-to'],
config['GLOBAL']['report-service']['port']))
......@@ -161,7 +164,7 @@ async def server_runner(config):
# reports config file
req_cfg = {}
#Begin-IF response dict or list
if isinstance(response, dict):
req_cfg = response
......@@ -203,12 +206,12 @@ async def server_runner(config):
logging.debug('Config information: {}'.format(detector))
run_base = await get_run_base(detector)
try:
output = subprocess.check_output(run_base) \
.decode('utf8')
logging.info(
'Submission information: \n{}'.format(output))
'Submission information: {}'.format(output))
logging.info('Run plots {}'.format(run_base))
except Exception as e:
......@@ -224,6 +227,7 @@ async def server_runner(config):
logging.info('Jobs finished')
except asyncio.TimeoutError:
asyncio.ensure_future(del_folder('./tmp'))
logging.error('Jobs timeout!')
# Copy all plots
......@@ -241,7 +245,10 @@ async def server_runner(config):
os.makedirs(fpath, exist_ok=True)
det_new_files[f] = fpath
# set concurrency limitation.
# 50 have been chosen by trial
# (this is not the max limitation).
sem = asyncio.Semaphore(50)
all_new_files.append(
'{}/{}'.format(fpath, f.split('/')[-1]))
......@@ -255,14 +262,14 @@ async def server_runner(config):
# End-for-loop over instruments
asyncio.ensure_future(
push_figures(cfg['GLOBAL']['git']['repo-local'],
all_new_files))
push_figures(cfg['GLOBAL']['git']['repo-local'],
all_new_files))
logging.info('All done')
return
# End-asyncio-function(do_action)
# TODO delete tmp file
# TODO delete out-folder
try:
asyncio.ensure_future(do_action(copy.copy(req_cfg)))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment