Skip to content
Snippets Groups Projects
Commit 354c2a6a authored by Karim Ahmed's avatar Karim Ahmed
Browse files

1st changes of MR reportservice

parent d86472a4
No related branches found
No related tags found
1 merge request!93feat/reportservice
...@@ -23,23 +23,17 @@ async def auto_run(): ...@@ -23,23 +23,17 @@ async def auto_run():
# list of timestamps for the report service runs # list of timestamps for the report service runs
run_time = cfg['GLOBAL']['run-on'] run_time = cfg['GLOBAL']['run-on']
# Begin-for-loop to parse the run time-stamps
for i, ti in enumerate(run_time): for i, ti in enumerate(run_time):
run_time[i] = parser.parse(ti) run_time[i] = parser.parse(ti)
# End-for-loop
# Begin-while-loop
while True: while True:
time_now = datetime.utcnow().replace(tzinfo=tz.tzutc()) time_now = datetime.utcnow().replace(tzinfo=tz.tzutc())
sched_time = run_time[tidx] sched_time = run_time[tidx]
# Begin-IF no-timezone, use UTC
if sched_time.tzinfo is None: if sched_time.tzinfo is None:
sched_time = sched_time.replace(tzinfo=tz.tzutc()) sched_time = sched_time.replace(tzinfo=tz.tzutc())
# End-IF
# Begin-IF a run-timestamp passed,
# do automatic-run. # do automatic-run.
if time_now > sched_time: if time_now > sched_time:
con = zmq.asyncio.Context() con = zmq.asyncio.Context()
...@@ -55,19 +49,38 @@ async def auto_run(): ...@@ -55,19 +49,38 @@ async def auto_run():
run_time[tidx] = sched_time + timedelta(weeks=1) run_time[tidx] = sched_time + timedelta(weeks=1)
tidx = tidx + 1 if tidx != len(run_time)-1 else 0 tidx = tidx + 1 if tidx != len(run_time)-1 else 0
# End-IF
# check every 10mins, if there is # check every 10mins, if there is
# a need for an automatic-run. # a need for an automatic-run.
await asyncio.sleep(600) await asyncio.sleep(600)
# End-while-loop
logfile = './cal.log'
logging.basicConfig(filename=logfile, filemode='a+',
level=logging.INFO,
format='%(levelname)-6s: %(asctime)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
loop = asyncio.get_event_loop()
loop.run_until_complete(auto_run())
loop.close() parser = argparse.ArgumentParser(description='Start the report service')
parser.add_argument('--config-file', type=str, default='./cal_conf.yaml')
parser.add_argument('--log-file', type=str, default='./report.log')
parser.add_argument('--logging', type=str, default="INFO",
choices=['INFO', 'DEBUG', 'ERROR'])
if __name__ == "__main__":
args = vars(parser.parse_args())
conf_file = args["config_file"]
with open(conf_file, "r") as f:
config = yaml.load(f.read())
logfile = args["log_file"]
fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(filename=logfile, filemode='a+',
level=getattr(logging, args['logging']),
format='%(levelname)-6s: %(asctime)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
logfile = './report.log'
logging.basicConfig(filename=logfile, filemode='a+',
level=logging.INFO,
format='%(levelname)-6s: %(asctime)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
loop = asyncio.get_event_loop()
loop.run_until_complete(auto_run())
loop.close()
...@@ -7,7 +7,7 @@ import yaml ...@@ -7,7 +7,7 @@ import yaml
import zmq import zmq
import zmq.asyncio import zmq.asyncio
logfile = './cal.log' logfile = './report.log'
logging.basicConfig(filename=logfile, filemode='a+', logging.basicConfig(filename=logfile, filemode='a+',
level=logging.INFO, level=logging.INFO,
format='%(levelname)-6s: %(asctime)s %(message)s', format='%(levelname)-6s: %(asctime)s %(message)s',
......
...@@ -104,7 +104,7 @@ async def cp_with_conc_lim(f, path, sem): ...@@ -104,7 +104,7 @@ async def cp_with_conc_lim(f, path, sem):
:param f: the main file with it's current path. :param f: the main file with it's current path.
:param path: the path, where f is copied to. :param path: the path, where f is copied to.
:param sem: Semaphore is a variable that control :param sem: Semaphore is a variable that controls
access to common resources. access to common resources.
""" """
async with sem: async with sem:
...@@ -166,7 +166,6 @@ async def server_runner(config): ...@@ -166,7 +166,6 @@ async def server_runner(config):
# reports config file # reports config file
req_cfg = {} req_cfg = {}
#Begin-IF response dict or list
if isinstance(response, dict): if isinstance(response, dict):
req_cfg = response req_cfg = response
elif isinstance(response, list): elif isinstance(response, list):
...@@ -184,23 +183,19 @@ async def server_runner(config): ...@@ -184,23 +183,19 @@ async def server_runner(config):
else: else:
logging.error(Errors.REQUEST_MALFORMED.format(response)) logging.error(Errors.REQUEST_MALFORMED.format(response))
continue continue
#End-IF response dict or list
logging.info('Requested Configuration: {}'.format(req_cfg)) logging.info('Requested Configuration: {}'.format(req_cfg))
# Begin-asyncio-function(do_action)
async def do_action(cfg): async def do_action(cfg):
logging.info('Run plot production') logging.info('Run plot production')
all_new_files = [] all_new_files = []
# Begin-for-loop over instruments
for instr_name, instrument in cfg.items(): for instr_name, instrument in cfg.items():
if instr_name == "GLOBAL": if instr_name == "GLOBAL":
continue continue
# Begin-for-loop over detectors
for det_name, detector in instrument.items(): for det_name, detector in instrument.items():
logging.info('Process detector: {}'.format(det_name)) logging.info('Process detector: {}'.format(det_name))
...@@ -239,7 +234,6 @@ async def server_runner(config): ...@@ -239,7 +234,6 @@ async def server_runner(config):
det_new_files = {} det_new_files = {}
# Begin-for-loop over new-figures
for f in figures: for f in figures:
const = f.split('/')[-1].split('_')[0] const = f.split('/')[-1].split('_')[0]
fpath = '{}/{}/{}'.format(path, det_name, const) fpath = '{}/{}/{}'.format(path, det_name, const)
...@@ -253,14 +247,11 @@ async def server_runner(config): ...@@ -253,14 +247,11 @@ async def server_runner(config):
sem = asyncio.Semaphore(50) sem = asyncio.Semaphore(50)
all_new_files.append( all_new_files.append(
'{}/{}'.format(fpath, f.split('/')[-1])) '{}/{}'.format(fpath, f.split('/')[-1]))
# End-for-loop over new-figures
await asyncio.gather(*[cp_with_conc_lim(k, v, sem) await asyncio.gather(*[cp_with_conc_lim(k, v, sem)
for k, v in det_new_files.items()]) for k, v in det_new_files.items()])
logging.info('Figures Copied') logging.info('Figures Copied')
# End-for-loop over detectors
# End-for-loop over instruments
asyncio.ensure_future( asyncio.ensure_future(
push_figures(cfg['GLOBAL']['git']['repo-local'], push_figures(cfg['GLOBAL']['git']['repo-local'],
...@@ -268,7 +259,6 @@ async def server_runner(config): ...@@ -268,7 +259,6 @@ async def server_runner(config):
logging.info('All done') logging.info('All done')
return return
# End-asyncio-function(do_action)
# TODO delete out-folder # TODO delete out-folder
...@@ -280,7 +270,7 @@ async def server_runner(config): ...@@ -280,7 +270,7 @@ async def server_runner(config):
parser = argparse.ArgumentParser(description='Start the report service') parser = argparse.ArgumentParser(description='Start the report service')
parser.add_argument('--config-file', type=str, default='./cal_conf.yaml') parser.add_argument('--config-file', type=str, default='./cal_conf.yaml')
parser.add_argument('--log-file', type=str, default='./cal.log') parser.add_argument('--log-file', type=str, default='./report.log')
parser.add_argument('--logging', type=str, default="INFO", parser.add_argument('--logging', type=str, default="INFO",
choices=['INFO', 'DEBUG', 'ERROR']) choices=['INFO', 'DEBUG', 'ERROR'])
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment