Skip to content
Snippets Groups Projects
Commit 4c34e834 authored by Karim Ahmed's avatar Karim Ahmed
Browse files

concurrent jobs

parent c63a6be4
No related branches found
No related tags found
1 merge request!93feat/reportservice
......@@ -37,7 +37,7 @@ arg_parser.add_argument('--instrument', default=['all'], nargs='+',
help='select the requested instruments. '
'Default=all')
arg_parser.add_argument('--testing', dest='testing', action='store_true',
help='required for testing with different'
help='required for testing with different '
'config files')
arg_parser.set_defaults(testing=False)
arg_parser.add_argument('--config-file', type=str,
......
import argparse
import asyncio
from asyncio.subprocess import PIPE, STDOUT
import copy
import getpass
import glob
......@@ -99,15 +100,21 @@ async def del_folder(fpath):
logging.info('tmp file has been deleted')
async def cp_with_conc_lim(f, path, sem):
async def copy_files(f, path, sem):
""" Copying with concurrency limitation
:param f: the main file with it's current path.
:param path: the path, where f is copied to.
:param sem: Semaphore is a variable that controls
access to common resources.
sem can be give a None value, to indicate
a copy of low numbers of file e.g. 1 file.
"""
async with sem:
if sem:
async with sem:
cmd = ["rsync", "-a", f, path]
await asyncio.subprocess.create_subprocess_shell(" ".join(cmd))
else:
cmd = ["rsync", "-a", f, path]
await asyncio.subprocess.create_subprocess_shell(" ".join(cmd))
......@@ -128,7 +135,7 @@ async def push_figures(repo_master, addf):
logging.info('Pushed to git')
async def server_runner(config):
async def server_runner(conf_file, jobs_timeout):
"""
The main server loop. After pulling the latest changes
of the DC project, it awaits receiving configurations
......@@ -141,6 +148,8 @@ async def server_runner(config):
update the remote project and build reports that can
be accessed through ReadTheDocs.
"""
with open(conf_file, "r") as f:
config = yaml.load(f.read())
# perform git-dir checks and pull the project for updates.
init_config_repo(config['GLOBAL']['git'])
......@@ -155,6 +164,10 @@ async def server_runner(config):
socket.bind("{}:{}".format(config['GLOBAL']['report-service']['bind-to'],
config['GLOBAL']['report-service']['port']))
asyncio.ensure_future(copy_files(conf_file,
config['GLOBAL']['git']['figures-local'],
sem=None))
while True:
response = await socket.recv_pyobj()
await socket.send_pyobj('Build DC reports through -->')
......@@ -188,72 +201,82 @@ async def server_runner(config):
logging.info('Requested Configuration: {}'.format(req_cfg))
async def do_action(cfg):
async def do_action(cfg, jobs_timeout = 3000):
logging.info('Run plot production')
all_new_files = []
all_new_files.append('{}/{}'.format(config['GLOBAL']
['git']
['figures-local'],
conf_file.split('/')[-1]))
for instr_name, instrument in cfg.items():
if instr_name == "GLOBAL":
continue
for det_name, detector in instrument.items():
launched_jobs = []
for det_name, det_conf in instrument.items():
logging.info('Process detector: {}'.format(det_name))
logging.debug('Config information: {}'.format(detector))
logging.debug('Config information: {}'.format(det_conf))
run_base = await get_run_base(detector)
run_base = await get_run_base(det_conf)
try:
output = subprocess.check_output(run_base) \
.decode('utf8')
logging.info(
'Submission information: {}'.format(output))
logging.info('Run plots {}'.format(run_base))
output = await asyncio.create_subprocess_shell(
" ".join(run_base), stdout=PIPE, stderr=PIPE)
launched_jobs.append(output.communicate())
logging.info('Submission information: {}'
.format(run_base))
except Exception as e:
logging.error('Submission failed: {}'.format(e))
exit(1)
joblist = await parse_output(output)
try:
# make sure to timeout if waiting for jobs
# took more than 10 mins.
await asyncio.wait_for(wait_jobs(joblist), timeout=600)
logging.info('Jobs finished')
except asyncio.TimeoutError:
asyncio.ensure_future(del_folder('./tmp'))
logging.error('Jobs timeout!')
# Copy all plots
out_folder = detector['out-folder']
path = cfg['GLOBAL']['git']['figures-local']
figures = glob.glob("{}/*png".format(out_folder))
logging.info('Copy figures to: {}'.format(path))
det_new_files = {}
for f in figures:
const = f.split('/')[-1].split('_')[0]
fpath = '{}/{}/{}'.format(path, det_name, const)
os.makedirs(fpath, exist_ok=True)
det_new_files[f] = fpath
# set concurrency limitation.
# 50 have been chosen by trial
# (this is not the max limitation).
sem = asyncio.Semaphore(50)
all_new_files.append(
'{}/{}'.format(fpath, f.split('/')[-1]))
await asyncio.gather(*[cp_with_conc_lim(k, v, sem)
for k, v in det_new_files.items()])
logging.info('Figures Copied')
outputs = await asyncio.gather(*launched_jobs)
job_list = []
for output in outputs:
job_list += await parse_output(output[0].decode('utf8'))
try:
# make sure to timeout if waiting for jobs
# took more than 10 mins.
await asyncio.wait_for(wait_jobs(job_list), timeout=jobs_timeout)
logging.info('All jobs are finished')
except asyncio.TimeoutError:
asyncio.ensure_future(del_folder('./tmp'))
logging.error('Jobs timeout!')
# Copy all plots
out_folder = det_conf['out-folder']
path = cfg['GLOBAL']['git']['figures-local']
figures = glob.glob("{}/*png".format(out_folder))
logging.info('Copy figures to: {}'.format(path))
det_new_files = {}
for f in figures:
const = f.split('/')[-1].split('_')[0]
fpath = '{}/{}/{}'.format(path, det_name, const)
os.makedirs(fpath, exist_ok=True)
det_new_files[f] = fpath
# Set concurrency limitation.
# 50 have been chosen by trial
# Note: This is not the max limitation.
sem = asyncio.Semaphore(50)
all_new_files.append(
'{}/{}'.format(fpath, f.split('/')[-1]))
await asyncio.gather(*[copy_files(k, v, sem)
for k, v in det_new_files.items()])
logging.info('Figures Copied')
asyncio.ensure_future(
push_figures(cfg['GLOBAL']['git']['repo-local'],
......@@ -262,19 +285,24 @@ async def server_runner(config):
logging.info('All done')
return
# TODO delete out-folder
try:
# Delete out-folder
del_folder(out_folder)
except:
logging.error(str(e))
try:
asyncio.ensure_future(do_action(copy.copy(req_cfg)))
asyncio.ensure_future(do_action(copy.copy(req_cfg), jobs_timeout))
except Exception as e: # actions that fail are only error logged
logging.error(str(e))
break
arg_parser = argparse.ArgumentParser(description='Start the report service')
arg_parser.add_argument('--config-file', type=str,
default='./report_conf.yaml',
arg_parser.add_argument('--config-file', type=str,
default='./report_conf.yaml',
help='config file path with reportservice port. '
'Default=./report_conf.yaml')
arg_parser.add_argument('--jobs-timeout', type=int, default=3000,
help='Maximum time to wait for jobs. Default=3000')
arg_parser.add_argument('--log-file', type=str, default='./report.log',
help='The report log file path. Default=./report.log')
arg_parser.add_argument('--logging', type=str, default="INFO",
......@@ -285,8 +313,6 @@ arg_parser.add_argument('--logging', type=str, default="INFO",
if __name__ == "__main__":
args = vars(arg_parser.parse_args())
conf_file = args["config_file"]
with open(conf_file, "r") as f:
config = yaml.load(f.read())
logfile = args["log_file"]
fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
......@@ -297,5 +323,5 @@ if __name__ == "__main__":
datefmt='%Y-%m-%d %H:%M:%S')
loop = asyncio.get_event_loop()
loop.run_until_complete(server_runner(config))
loop.run_until_complete(server_runner(conf_file, args["jobs_timeout"]))
loop.close()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment