diff --git a/webservice/update_config.py b/webservice/update_config.py new file mode 100644 index 0000000000000000000000000000000000000000..1be16cfa2ca65e266d087d41cd4a78f36dce1d40 --- /dev/null +++ b/webservice/update_config.py @@ -0,0 +1,83 @@ +import argparse +import json +import sys + +import yaml +import zmq + +available_options = { + "AGIPD": {"force-hg-if-below": float, + "blc-noise": bool, + "dont-zero-nans": bool, + "dont-zero-orange": bool, + "max-pulses": int}, +} + +parser = argparse.ArgumentParser( + description='Request update of configuration') +parser.add_argument('--detector', type=str, choices=['AGIPD']) +parser.add_argument('--task', type=str, choices=['correct', 'dark']) +parser.add_argument('--proposal', type=str, + help='The proposal number, without leading p, but with leading zeros') # noqa +parser.add_argument('--instrument', type=str, + choices=["SPB", "MID", "FXE", "SCS", "SQS", "HED", + "DETLAB"], help='The instrument') # noqa +parser.add_argument('--cycle', type=str, help='The facility cycle') +parser.add_argument('--apply', action='store_true') + +# remove help calls as they will cause the argument parser to exit +add_help = False +if "-h" in sys.argv: + sys.argv.remove("-h") + add_help = True +if "--help" in sys.argv: + sys.argv.remove("--help") + add_help = True + +known, remaining = parser.parse_known_args() +args = vars(known) +detector = args["detector"] + +for option, typ in available_options[detector].items(): + parser.add_argument(f"--{option}", type=typ) + +if add_help: + sys.argv.append("--help") + +args = vars(parser.parse_args()) + +task = args['task'] +instrument = args['instrument'] +proposal = args['proposal'] +cycle = args['cycle'] + +if task is None or instrument is None or proposal is None or cycle is None: + print("Need to define all fields") + exit() + +new_conf = {task: {instrument: {detector: {}}}} +for key, value in args.items(): + key = key.replace("_", "-") + if key in available_options[detector] and value is not None: + new_conf[task][instrument][detector][key] = value + +pyaml = yaml.dump(new_conf, default_flow_style=False) + +if not args["apply"]: + print("\n") + print("-" * 80) + print("THIS IS A DRY RUN ONLY, NO CHANGES ARE MADE") + print("\n") + print("-" * 80) + +print(f"Sending the following update: \n {pyaml}") +print("-" * 80) +con = zmq.Context() +socket = con.socket(zmq.REQ) +con = socket.connect("tcp://max-exfl016:5555") +msg = "','".join(["update_conf", "SASEX", args["instrument"], args["cycle"], + args["proposal"], json.dumps(new_conf), str(args["apply"])]) +socket.send("['{}']".format(msg).encode()) +resp = socket.recv_multipart()[0] +print("Configuration now in place is:") +print(resp.decode()) diff --git a/webservice/webservice.py b/webservice/webservice.py index d046b067fa7d5ba1fb066f4517abc7d631b192b4..430d1a2b29a5c954e7c8027cdac19aabb79501b2 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -3,6 +3,7 @@ import asyncio import copy import getpass import glob +import json import logging import os import sqlite3 @@ -112,6 +113,77 @@ async def upload_config(socket, config, yaml, instrument, cycle, proposal): socket.send(Success.UPLOADED_CONFIG.format(cycle, proposal).encode()) +def merge(source, destination): + """ + Deep merge two dictionaries + + :param source: source dictionary to merge into destination + :param destination: destination dictionary which is being merged in + :return: the updated destination dictionary + + Taken from: https://stackoverflow.com/questions/20656135/python-deep-merge-dictionary-data + """ + for key, value in source.items(): + if isinstance(value, dict): + # get node or create one + node = destination.setdefault(key, {}) + merge(value, node) + else: + destination[key] = value + + return destination + + +async def change_config(socket, config, updated_config, instrument, cycle, + proposal, apply=False): + """ + Change the configuration of a proposal + + If no proposal specific configuration yet exists, one is first created + based on the default configuration of the proposal + + Changes are committed to git. + + :param socket: ZMQ socket to send reply on + :param config: repo config as given in YAML config file + :param updated_config: a dictionary containing the updated config + :param instrument: the instrument to change config for + :param cycle: the cycle to change config for + :param proposal: the proposal to change config for + :param apply: set to True to actually commit a change, otherwise a dry-run + is performed + :return: The updated config to the requesting zmq socket + """ + # first check if a proposal specific config exists, if not create one + repo = Repo(config['local-path']) + repo.remote().pull() + prop_dir = os.path.join(repo.working_tree_dir, cycle) + os.makedirs(prop_dir, exist_ok=True) + fpath = "{}/p{:06d}.yaml".format(prop_dir, int(proposal)) + if not os.path.exists(fpath): + with open("{}/default.yaml".format(repo.working_tree_dir), "r") as f: + defconf = yaml.load(f.read()) + subconf = {} + for action, instruments in defconf.items(): + subconf[action]= {} + subconf[action][instrument] = instruments[instrument] + with open(fpath, "w") as wf: + wf.write(yaml.dump(subconf, default_flow_style=False)) + new_conf = None + with open(fpath, "r") as rf: + existing_conf = yaml.load(rf.read()) + new_conf = merge(updated_config, existing_conf) + if apply: + with open(fpath, "w") as wf: + wf.write(yaml.dump(new_conf, default_flow_style=False)) + repo.index.add([fpath]) + repo.index.commit( + "Update to proposal YAML: {}".format(datetime.now().isoformat())) + repo.remote().push() + logging.info(Success.UPLOADED_CONFIG.format(cycle, proposal)) + socket.send(yaml.dump(new_conf, default_flow_style=False).encode()) + + async def slurm_status(filter_user=True): """ Return the status of slurm jobs by calling squeue @@ -362,7 +434,7 @@ async def server_runner(config, mode): action, payload = response[0], response[1:] if action not in ["correct", 'dark', 'query-rid', - 'upload-yaml']: # only handle known actions + 'upload-yaml', 'update_conf']: # only handle known actions logging.warn(Errors.UNKNOWN_ACTION.format(action)) socket.send(Errors.UNKNOWN_ACTION.format(action).encode()) continue @@ -379,6 +451,18 @@ async def server_runner(config, mode): priority = None req_res = None + if action in ['update_conf']: + try: + sase, instrument, cycle, proposal, config_yaml, apply = payload # noqa + updated_config = json.loads(config_yaml) + await change_config(socket, config['config-repo'], + updated_config, instrument, cycle, + proposal, apply.upper()=="TRUE") + except Exception as e: + e = str(e) + logging.error(f"Failure applying config for {proposal}:" + + " {e}: {updated_config}") + if action in ['dark', 'correct']: wait_runs = []