diff --git a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb index b96d1b285ddc4a816153c4b4b0eae6657a419af4..69c91d7de01bf7fa5769fc9282a0a2949effb9a9 100644 --- a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb +++ b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb @@ -30,7 +30,7 @@ "run = 11 # runs to process, required\n", "\n", "karabo_id = \"MID_DET_AGIPD1M-1\" # karabo karabo_id\n", - "karabo_da = [-1] # data aggregators\n", + "karabo_da = ['-1'] # a list of data aggregators names, Default [-1] for selecting all data aggregators\n", "receiver_id = \"{}CH0\" # inset for receiver devices\n", "path_template = 'RAW-R{:04d}-{}-S{:05d}.h5' # the template to use to access data\n", "h5path = 'INSTRUMENT/{}/DET/{}:xtdf/' # path in the HDF5 file to images\n", @@ -228,7 +228,7 @@ "print(f\"Instrument {instrument}\")\n", "print(f\"Detector instance {dinstance}\")\n", "\n", - "if karabo_da[0] == -1:\n", + "if karabo_da[0] == '-1':\n", " if modules[0] == -1:\n", " modules = list(range(16))\n", " karabo_da = [\"AGIPD{:02d}\".format(i) for i in modules]\n", diff --git a/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb b/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb index d4299cd575ba43756c838cf74e06ac438e1dd8c6..a10165106967ae33e7c44eac14e902df68af3ba2 100644 --- a/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb +++ b/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb @@ -34,7 +34,7 @@ "run_low = 169 # run number in which low gain data was recorded, required\n", "\n", "karabo_id = \"SPB_DET_AGIPD1M-1\" # karabo karabo_id\n", - "karabo_da = [-1] # data aggregators\n", + "karabo_da = ['-1'] # a list of data aggregators names, Default [-1] for selecting all data aggregators\n", "receiver_id = \"{}CH0\" # inset for receiver devices\n", "path_template = 'RAW-R{:04d}-{}-S{:05d}.h5' # the template to use to access data\n", "h5path = '/INSTRUMENT/{}/DET/{}:xtdf/image' # path in the HDF5 file to images\n", @@ -190,7 +190,7 @@ }, "outputs": [], "source": [ - "if karabo_da[0] == -1:\n", + "if karabo_da[0] == '-1':\n", " if modules[0] == -1:\n", " modules = list(range(16))\n", " karabo_da = [\"AGIPD{:02d}\".format(i) for i in modules]\n", diff --git a/notebooks/DSSC/Characterize_DSSC_Darks_NBC.ipynb b/notebooks/DSSC/Characterize_DSSC_Darks_NBC.ipynb index 35926c6e6a8d98a3c5c6718f2adba4594b02a03a..51a27c7409880050a615b4e16206375f2fa013c1 100644 --- a/notebooks/DSSC/Characterize_DSSC_Darks_NBC.ipynb +++ b/notebooks/DSSC/Characterize_DSSC_Darks_NBC.ipynb @@ -32,7 +32,7 @@ "run = 1497 # run number in which data was recorded, required\n", "\n", "karabo_id = \"SCS_DET_DSSC1M-1\" # karabo karabo_id\n", - "karabo_da = [-1] # data aggregators\n", + "karabo_da = ['-1'] # a list of data aggregators names, Default [-1] for selecting all data aggregators\n", "receiver_id = \"{}CH0\" # inset for receiver devices\n", "path_template = 'RAW-R{:04d}-{}-S{:05d}.h5' # the template to use to access data\n", "h5path = '/INSTRUMENT/{}/DET/{}:xtdf/image' # path in the HDF5 file to images\n", @@ -102,7 +102,7 @@ "h5path_idx = h5path_idx.format(karabo_id, receiver_id)\n", "\n", "\n", - "if karabo_da[0] == -1:\n", + "if karabo_da[0] == '-1':\n", " if modules[0] == -1:\n", " modules = list(range(16))\n", " karabo_da = [\"DSSC{:02d}\".format(i) for i in modules]\n", diff --git a/notebooks/DSSC/DSSC_Correct_and_Verify.ipynb b/notebooks/DSSC/DSSC_Correct_and_Verify.ipynb index 79fa9bf4fd768c06fa6607abfea6bb7b6d85c99d..5509fd54af89411a9edf2d78c635f17dd7205788 100644 --- a/notebooks/DSSC/DSSC_Correct_and_Verify.ipynb +++ b/notebooks/DSSC/DSSC_Correct_and_Verify.ipynb @@ -30,7 +30,7 @@ "run = 9987 #runs to process, required\n", "\n", "karabo_id = \"SCS_DET_DSSC1M-1\" # karabo karabo_id\n", - "karabo_da = [-1] # data aggregators\n", + "karabo_da = ['-1'] # a list of data aggregators names, Default [-1] for selecting all data aggregators\n", "receiver_id = \"{}CH0\" # inset for receiver devices\n", "path_template = 'RAW-R{:04d}-{}-S{:05d}.h5' # the template to use to access data\n", "h5path = 'INSTRUMENT/{}/DET/{}:xtdf/image' # path in the HDF5 file to images\n", @@ -106,7 +106,7 @@ "h5path_idx = h5path_idx.format(karabo_id, receiver_id)\n", "\n", "\n", - "if karabo_da[0] == -1:\n", + "if karabo_da[0] == '-1':\n", " if modules[0] == -1:\n", " modules = list(range(16))\n", " karabo_da = [\"DSSC{:02d}\".format(i) for i in modules]\n", @@ -1017,7 +1017,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.7.6" + "version": "3.6.7" } }, "nbformat": 4, diff --git a/notebooks/LPD/LPDChar_Darks_NBC.ipynb b/notebooks/LPD/LPDChar_Darks_NBC.ipynb index 11ed8987c1435aefbe7951d3ed715fe6c9d1289b..0b9c384fb00ff7fba60f53a2425e9676d520f572 100644 --- a/notebooks/LPD/LPDChar_Darks_NBC.ipynb +++ b/notebooks/LPD/LPDChar_Darks_NBC.ipynb @@ -65,7 +65,7 @@ "run_low = 114 # run number in which low gain data was recorded, required\n", "\n", "karabo_id = \"FXE_DET_LPD1M-1\" # karabo karabo_id\n", - "karabo_da = [-1] # data aggregators\n", + "karabo_da = ['-1'] # a list of data aggregators names, Default [-1] for selecting all data aggregators\n", "receiver_id = \"{}CH0\" # inset for receiver devices\n", "path_template = 'RAW-R{:04d}-{}-S{:05d}.h5' # the template to use to access data\n", "h5path = '/INSTRUMENT/{}/DET/{}:xtdf/image' # path in the HDF5 file to images\n", @@ -149,7 +149,7 @@ "cells = np.arange(max_cells)\n", "gain_names = ['High', 'Medium', 'Low']\n", " \n", - "if karabo_da[0] == -1:\n", + "if karabo_da[0] == '-1':\n", " if modules[0] == -1:\n", " modules = list(range(16))\n", " karabo_da = ['LPD{:02d}'.format(i) for i in modules]\n", diff --git a/notebooks/LPD/LPD_Correct_and_Verify.ipynb b/notebooks/LPD/LPD_Correct_and_Verify.ipynb index 0c731274ea8e865cb24d3a337cff600dd5b1418f..776c2d6fa2ff20ca7583354c9fe406d822f280f1 100644 --- a/notebooks/LPD/LPD_Correct_and_Verify.ipynb +++ b/notebooks/LPD/LPD_Correct_and_Verify.ipynb @@ -28,11 +28,11 @@ "run = 270 # runs to process, required\n", "\n", "karabo_id = \"FXE_DET_LPD1M-1\" # karabo karabo_id\n", - "karabo_da = [-1] # data aggregators\n", + "karabo_da = ['-1'] # a list of data aggregators names, Default [-1] for selecting all data aggregators\n", "receiver_id = \"{}CH0\" # inset for receiver devices\n", "path_template = 'RAW-R{:04d}-{}-S{:05d}.h5' # the template to use to access data\n", - "h5path = '/INSTRUMENT/{}/DET/{}:xtdf/image' # path in the HDF5 file to images\n", - "h5path_idx = '/INDEX/{}/DET/{}:xtdf/image' # path in the HDF5 file to images\n", + "h5path = '/INSTRUMENT/{}/DET/{}:xtdf/' # path in the HDF5 file to images\n", + "h5path_idx = '/INDEX/{}/DET/{}:xtdf/' # path in the HDF5 file to images\n", "\n", "use_dir_creation_date = True # use the creation date of the directory for database time derivation\n", "cal_db_interface = \"tcp://max-exfl016:8015#8020\" # the database interface to use\n", @@ -88,7 +88,7 @@ "if sequences[0] == -1:\n", " sequences = None\n", "\n", - "if karabo_da[0] == -1:\n", + "if karabo_da[0] == '-1':\n", " if modules[0] == -1:\n", " modules = list(range(16))\n", " karabo_da = ['LPD{:02d}'.format(i) for i in modules]\n", @@ -843,7 +843,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.7.6" + "version": "3.6.7" } }, "nbformat": 4, diff --git a/notebooks/ePix/Characterize_Darks_ePix_NBC.ipynb b/notebooks/ePix100/Characterize_Darks_ePix100_NBC.ipynb similarity index 100% rename from notebooks/ePix/Characterize_Darks_ePix_NBC.ipynb rename to notebooks/ePix100/Characterize_Darks_ePix100_NBC.ipynb diff --git a/notebooks/ePix/Correction_ePix_NBC.ipynb b/notebooks/ePix100/Correction_ePix100_NBC.ipynb similarity index 100% rename from notebooks/ePix/Correction_ePix_NBC.ipynb rename to notebooks/ePix100/Correction_ePix100_NBC.ipynb diff --git a/notebooks/ePix/Characterize_Darks_ePix10K_NBC.ipynb b/notebooks/ePix10K/Characterize_Darks_ePix10K_NBC.ipynb similarity index 100% rename from notebooks/ePix/Characterize_Darks_ePix10K_NBC.ipynb rename to notebooks/ePix10K/Characterize_Darks_ePix10K_NBC.ipynb diff --git a/notebooks/ePix/Correction_ePix10K_NBC.ipynb b/notebooks/ePix10K/Correction_ePix10K_NBC.ipynb similarity index 100% rename from notebooks/ePix/Correction_ePix10K_NBC.ipynb rename to notebooks/ePix10K/Correction_ePix10K_NBC.ipynb diff --git a/webservice/manual_launch.py b/webservice/manual_launch.py index dd038780044426decbc6e2081bb6a8d2c96f44dd..bf07427f2ba6a40c53f5884ed84234f016cab754 100644 --- a/webservice/manual_launch.py +++ b/webservice/manual_launch.py @@ -8,7 +8,7 @@ import zmq parser = argparse.ArgumentParser( description='Request dark characterization. Will wait on data transfers to complete first!') parser.add_argument('--proposal', type=str, help='The proposal number, without leading p, but with leading zeros') -parser.add_argument('--instrument', type=str, choices=["SPB", "MID", "FXE", "SCS", "SQS", "HED", "DETLAB"], help='The instrument') +parser.add_argument('--instrument', type=str, choices=["SPB", "MID", "FXE", "SCS", "SQS", "HED", "DETLAB", "CALLAB"], help='The instrument') parser.add_argument('--cycle', type=str, help='The facility cycle') parser.add_argument('--run', type=int, help='Run number as an integer') parser.add_argument('--mdc-id', type=int, help='Run id from MDC') diff --git a/webservice/request_darks.py b/webservice/request_darks.py index 866560774b44f0f8b34f38ebed379a6464a9b017..b9847f1b411ac95ff016f21fc587f2411f3172c1 100644 --- a/webservice/request_darks.py +++ b/webservice/request_darks.py @@ -9,11 +9,15 @@ parser = argparse.ArgumentParser( parser.add_argument('--proposal', type=str, help='The proposal number, without leading p, but with leading zeros') # noqa parser.add_argument('--instrument', type=str, - choices=["SPB", "MID", "FXE", "SCS", "DETLAB", "SQS", "HED"], + choices=["SPB", "MID", "FXE", "SCS", "DETLAB", "SQS", "HED", "CALLAB"], help='The instrument') -parser.add_argument('--detectors', type=str, nargs='*', - help='A list of detectors to process, default ["all"]', +parser.add_argument('--karabo-ids', type=str, nargs='*', + help='A list of karabo-id, default ["all"]', default=['all']) +parser.add_argument('--karabo-das', type=str, nargs='*', + help='A list of karabo-da to process, default ["all"]', + default=['all']) + parser.add_argument('--cycle', type=str, help='The facility cycle') parser.add_argument('--run-high', type=str, help='Run number of high gain data as an integer') @@ -22,9 +26,7 @@ parser.add_argument('--run-med', type=str, parser.add_argument('--run-low', type=str, help='Run number of low gain data as an integer') parser.add_argument('--run', type=str, help='Run number as an integer') -parser.add_argument('--reservation', type=str, help='Reservation to run on. ' - 'Do not use this command, ' - 'as it has no effect anymore.') #noqa + # default is to use configured reservations parser.add_argument('--bkg', action='store_true', help='Background mode: exit script after requesting dark.') @@ -38,7 +40,8 @@ con = socket.connect("tcp://max-exfl016:5555") uuid = str(datetime.now().timestamp().as_integer_ratio()[0]) parm_list = ["dark", uuid, "SASEX", args["instrument"], args["cycle"], - args["proposal"], ','.join(args["detectors"])] + args["proposal"], ','.join(args["karabo_ids"]), + ','.join(args["karabo_das"])] if "run_high" in args and args["run_high"]: parm_list += ["(\"run-high\", \"{}\")".format(args["run_high"])] @@ -49,11 +52,6 @@ if "run_low" in args and args["run_low"]: if "run" in args and args["run"]: parm_list += ["(\"run\", \"{}\")".format(args["run"])] -# Avoid giving a reservation parameter after the ITDM changes -# for giving xcal high priority by default. -#if "reservation" in args and args["reservation"]: - #parm_list += ["(\"reservation\", \"{}\")".format(args["reservation"])] - msg = "','".join(parm_list) socket.send("['{}']".format(msg).encode()) resp = socket.recv_multipart()[0] diff --git a/webservice/serve_overview.py b/webservice/serve_overview.py index ab34a06f60adb21fccc20dccdf79b0a9d41edf39..3863271e454400c8dad8329e6193a4a5caf7e11d 100644 --- a/webservice/serve_overview.py +++ b/webservice/serve_overview.py @@ -101,7 +101,7 @@ class RequestHandler(BaseHTTPRequestHandler): del pars["request_dark"] for k, v in pars.items(): par_list.append('--{}'.format(str(k).replace("_", "-"))) - if k == 'detectors': + if k == 'karabo_ids': for det in v.split(","): par_list.append('{}'.format(det)) elif k == 'proposal': @@ -125,8 +125,8 @@ class RequestHandler(BaseHTTPRequestHandler): detectors = list(cal_config['dark'][pars['instrument']].keys()) det_list = [] - if 'detectors' in pars: - det_list = pars['detectors'].split(",") + if 'karabo_ids' in pars: + det_list = pars['karabo_ids'].split(",") det_names = [] for d in detectors: @@ -136,8 +136,8 @@ class RequestHandler(BaseHTTPRequestHandler): det_names.append(["", d]) run_names = [] - run1_det = ["FASTCCD", "EPIX", "DSSC", "PNCCD"] - run3_det = ["LPD", "AGIPD", "JUNGFRAU"] + run1_det = ["FASTCCD", "FCCD", "EPIX", "DSSC", "PNCCD"] + run3_det = ["LPD", "AGIPD", "JUNGFRAU", "JF"] msg = '' if any(y in x for x in det_list for y in run1_det): run_names = ['run'] diff --git a/webservice/templates/request_dark.html b/webservice/templates/request_dark.html index efd129e37a7a273a0011d2a4a72e7f5716b617b0..c18afd84b8bae13b9925209486f4d88414f2ec12 100644 --- a/webservice/templates/request_dark.html +++ b/webservice/templates/request_dark.html @@ -37,7 +37,7 @@ function loadCheckbox(command) { msg += "&cycle="+cycle msg += "&proposal="+proposal - msg += "&detectors="+det_list + msg += "&karabo_ids="+det_list msg += run_list if (command=='request'){ if (!isInt(cycle) || cycle.length!=6){ diff --git a/webservice/webservice.py b/webservice/webservice.py index fdb65cd81f45aaa1715ddb8feb0fa303ba041139..fdab5233937d65e4aacad7b1f540b191ea77cd9a 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -17,7 +17,7 @@ import zmq.asyncio import zmq.auth.thread from dateutil import parser as timeparser from git import Repo, InvalidGitRepositoryError -from messages import Errors, Success +from messages import Errors, Success, MDC from metadata_client.metadata_client import MetadataClient @@ -400,7 +400,7 @@ async def run_action(job_db, cmd, mode, proposal, run, rid): now=datetime.now().isoformat(), det=cmd[3], act=cmd[4])) job_db.commit() - logging.debug(" ".join(cmd)) + logging.debug((" ".join(cmd)).replace(',', '').replace("'", "") ) if "DARK" in cmd: return Success.START_CHAR.format(proposal, run) else: @@ -415,13 +415,65 @@ async def run_action(job_db, cmd, mode, proposal, run, rid): else: logging.debug(Success.START_CORRECTION_SIM.format(proposal, run)) - logging.debug(cmd) + logging.debug((" ".join(cmd)).replace(',', '').replace("'", "")) if "DARK" in cmd: return Success.START_CHAR_SIM.format(proposal, run) else: return Success.START_CORRECTION_SIM.format(proposal, run) +async def wait_on_transfer(rpath, max_tries=300): + """ + Wait on data files to be transferred to Maxwell + + :param rpath: Folder, which contains data files migrated to Maxwell + :param max_tries: Maximum number of checks if files are transferred + :return: True if files are transferred + """ + # dcache files are assumed migrated + if 'pnfs' in os.path.realpath(rpath): + return True + rstr = None + ret = None + tries = 0 + + # wait until folder gets created + while not os.path.exists(rpath): + if tries > max_tries: + return False + tries += 1 + await asyncio.sleep(10) + + # wait until files are migrated + while rstr is None or 'status="online"' in rstr or 'status="Online"' in rstr or ret.returncode != 0: # noqa + await asyncio.sleep(10) + ret = subprocess.run(["getfattr", "-n", "user.status", rpath], + stdout=subprocess.PIPE) + rstr = ret.stdout.decode() + if tries > max_tries: + return False + tries += 1 + return ret.returncode == 0 + + +async def check_files(in_folder, runs, karabo_das): + """ + Check if files for given karabo-das exists for given runs + + :param in_folder: Input folder + :param runs: List of runs + :param karabo_das: List of karabo-das + :return: True if files are there + """ + files_exists = True + for runnr in runs: + rpath = "{}/r{:04d}/".format(in_folder, int(runnr)) + fl = glob.glob(f"{rpath}/*.h5") + if not any(y in x for x in fl for y in karabo_das): + files_exists = False + return files_exists + + async def server_runner(config, mode): """ The main server loop @@ -491,8 +543,7 @@ async def server_runner(config, mode): socket.send(Errors.REQUEST_FAILED.encode()) continue - if len( - response) < 2: # catch parseable but malformed requests + if len(response) < 2: # catch parseable but malformed requests logging.error(Errors.REQUEST_MALFORMED.format(response)) socket.send(Errors.REQUEST_MALFORMED.format(response).encode()) continue @@ -501,7 +552,7 @@ async def server_runner(config, mode): if action not in ["correct", 'dark', 'query-rid', 'upload-yaml', 'update_conf']: # only handle known actions - logging.warn(Errors.UNKNOWN_ACTION.format(action)) + logging.warning(Errors.UNKNOWN_ACTION.format(action)) socket.send(Errors.UNKNOWN_ACTION.format(action).encode()) continue @@ -514,10 +565,8 @@ async def server_runner(config, mode): async def do_action(action, payload): in_folder = None - out_folder = None run_mapping = {} priority = None - req_res = None if action in ['update_conf']: updated_config = None @@ -546,21 +595,20 @@ async def server_runner(config, mode): "is requested. Checking files..." logging.info(msg.format(runnr, instrument, proposal)) if action == 'dark': - (rid, sase, instrument, cycle, proposal, - det_list) = payload[:6] + (rid, sase, instrument, cycle, proposal, karabo_ids, + karabo_das) = payload[:7] msg = "Dark characterization for {} at {} " \ "is requested. Checking files..." - logging.info(msg.format(det_list, instrument)) - det_list = det_list.split(',') - runs = payload[6:] # can be many + logging.info(msg.format(karabo_ids, instrument)) + karabo_ids = karabo_ids.split(',') + karabo_das = karabo_das.split(',') + runs = payload[7:] # can be many for i, run in enumerate(runs): erun = eval(run) if isinstance(erun, (list, tuple)): typ, runnr = erun if typ == "reservation": - req_res = runnr continue - runnr = runnr.replace("r", "") run_mapping[typ] = runnr wait_runs.append(runnr) @@ -569,21 +617,24 @@ async def server_runner(config, mode): wait_runs.append(erun) proposal = proposal.replace("p", "") proposal = "{:06d}".format(int(proposal)) - specific_conf_file = "{}/{}/{}.yaml".format( + + # Read calibration configuration from yaml + conf_file = "{}/{}/{}.yaml".format( config['config-repo']['local-path'], cycle, proposal) - if os.path.exists(specific_conf_file): - with open(specific_conf_file, "r") as f: - pconf = yaml.load(f.read(), Loader=yaml.FullLoader)[action] - else: - default_file = "{}/default.yaml".format( + if not os.path.exists(conf_file): + conf_file = "{}/default.yaml".format( config['config-repo']['local-path']) - with open(default_file, "r") as f: - pconf = yaml.load(f.read(), Loader=yaml.FullLoader)[action] - if instrument not in pconf: - socket.send(Errors.NOT_CONFIGURED.encode()) - logging.info( - 'Instrument {} in unknown'.format(instrument)) - return + + with open(conf_file, "r") as f: + pconf_full = yaml.load(f.read(), + Loader=yaml.FullLoader) + data_conf = pconf_full['data-mapping'] + if instrument in pconf_full[action]: + pconf = pconf_full[action][instrument] + else: + socket.send(Errors.NOT_CONFIGURED.encode()) + logging.info(f'Instrument {instrument} is unknown') + return in_folder = config[action]['in-folder'].format( instrument=instrument, cycle=cycle, proposal=proposal) @@ -600,137 +651,78 @@ async def server_runner(config, mode): socket.send(msg.encode()) return + # Check if all files for given runs are transferred all_transfers = [] for runnr in wait_runs: rpath = "{}/r{:04d}/".format(in_folder, int(runnr)) - - async def wait_on_transfer(): - if 'pnfs' in os.path.realpath( - rpath): # dcache files are assumed migrated - return True - rstr = None - ret = None - max_tries = 300 # 3000s - tries = 0 - while not os.path.exists(rpath): - await asyncio.sleep(10) - # await asyncio.sleep(1) - while rstr is None or 'status="online"' in rstr or 'status="Online"' in rstr or ret.returncode != 0: # noqa - await asyncio.sleep(10) - ret = subprocess.run( - ["getfattr", "-n", "user.status", rpath], - stdout=subprocess.PIPE) - rstr = ret.stdout.decode() - print(rstr) - if tries > max_tries: - return False - tries += 1 - - return ret.returncode == 0 - - transfer_complete = await wait_on_transfer() - print("Transfer complete: ", transfer_complete) + transfer_complete = await wait_on_transfer(rpath) all_transfers.append(transfer_complete) if not transfer_complete: logging.error( Errors.TRANSFER_EVAL_FAILED.format(proposal, runnr)) - msg = "Timeout waiting for migration. Contact det-support@xfel.eu" if action == 'correct': response = mdc.update_run_api(rid, { 'flg_cal_data_status': 'NA', - 'cal_pipeline_reply': msg}) + 'cal_pipeline_reply': MDC.MIGRATION_TIMEOUT}) if response.status_code != 200: logging.error(Errors.MDC_RESPONSE.format(response)) - print("All transfers", all(all_transfers)) if not all(all_transfers): logging.error(Errors.TRANSFER_EVAL_FAILED.format(proposal, ",".join( wait_runs))) return - print("Now doing: {}".format(action)) + logging.debug(f"Now doing: {action}") ts = datetime.now().strftime('%y%m%d_%H%M%S') if action == 'dark': - print("Running dark cal") - status = [] detectors = {} out_folder = config[action]['out-folder'].format( instrument=instrument, cycle=cycle, proposal=proposal, runs="_".join(wait_runs)) - for runnr in wait_runs: - rpath = "{}/r{:04d}/".format(in_folder, int(runnr)) + # Run over all available detectors + if karabo_ids[0] == 'all': + karabo_ids = list(pconf.keys()) - for detector, dconfig in pconf[instrument].items(): - # check if we find files according - # to mapping in raw run folder - if detector not in det_list and det_list[0] != 'all': - continue - fl = glob.glob( - "{}/RAW-*{}*.h5".format(rpath, dconfig["inset"])) - if len(fl): - thisconf = copy.copy(dconfig) - thisconf["in-folder"] = in_folder - thisconf["out-folder"] = '/'.join((out_folder, - detector.replace('-', '_'))) - - report_to = 'Dark_' + \ - detector.replace('-', '_') + '_' + ts - thisconf["report-to"] = report_to.lower() - - # don't need this for xfel-calibrate - del thisconf["inset"] - detectors[detector] = thisconf + # Prepare configs for all requested detectors + for karabo_id in karabo_ids: - else: - logging.info("File list for {} at {} is empty" - .format(detector, "{}/RAW-*{}*.h5" - .format(rpath, dconfig["inset"]))) - if len(detectors) == 0: - logging.warn(Errors.NOTHING_TO_DO.format(rpath)) - - print("Detectors:", detectors) - for detector, dconfig in detectors.items(): - if "-" in detector: - detector, _ = detector.split("-") - priority = '1' - - if detector.upper() in ["JUNGFRAU", "FASTCCD", "PNCCD", - "EPIX", "EPIX10K"]: - priority = '0' - - # --slurm-scheduling is the --nice slurm command - # to increase priority by having a lower number. - cmd = config['dark']['cmd'].format( - detector=detector, - sched_prio=str(config[action]['sched-prio']), - priority=priority, - action=action, instrument=instrument, - cycle=cycle, proposal=proposal, - runs="_r".join(wait_runs) - ).split() + if karabo_das[0] == 'all': + karabo_das = data_conf[karabo_id]["karabo-da"] - # Avoid giving a reservation parameter after the - # ITDM changes for giving xcal high priority by default - #if req_res: - # cmd += ['--reservation', req_res] + # Check if any files for given karabo-das exists + if await check_files(in_folder, wait_runs, karabo_das): + thisconf = copy.copy(data_conf[karabo_id]) - run_config = [] - for typ, run in run_mapping.items(): - if "no_mapping" in typ: - run_config.append(run) - else: - dconfig[typ] = run - if len(run_config): - dconfig["runs"] = ",".join(run_config) + if (karabo_id in pconf and + isinstance(pconf[karabo_id], dict)): + thisconf.update(copy.copy(pconf[karabo_id])) - cmd = await parse_config(cmd, dconfig) + thisconf["in-folder"] = in_folder + thisconf["out-folder"] = '/'.join((out_folder, + karabo_id.replace('-', '_'))) + thisconf["karabo-id"] = karabo_id + thisconf["karabo-da"] = karabo_das + + run_config = [] + for typ, run in run_mapping.items(): + if "no_mapping" in typ: + run_config.append(run) + else: + thisconf[typ] = run + if len(run_config): + thisconf["runs"] = ",".join(run_config) - ret = await run_action(job_db, cmd, mode, proposal, - wait_runs[0], 0) - status.append(ret) + detectors[karabo_id] = thisconf + else: + logging.warning("File list for {} at {} is empty" + .format(karabo_id, "{}/*.h5" + .format(rpath))) + + if len(detectors) == 0: + logging.warning(Errors.NOTHING_TO_DO.format(rpath)) if action == 'correct': runnr = wait_runs[0] @@ -740,61 +732,59 @@ async def server_runner(config, mode): instrument=instrument, cycle=cycle, proposal=proposal, run='r{:04d}'.format(int(runnr))) + # Prepare configs for all detectors in run + fl = glob.glob(f"{rpath}/*.h5") corr_file_list = set() - copy_file_list = set(glob.glob("{}/*.h5".format(rpath))) + copy_file_list = set(fl) detectors = {} - for detector, dconfig in pconf[instrument].items(): + for karabo_id in pconf: + dconfig = data_conf[karabo_id] # check if we find files according to mapping in raw run folder - fl = glob.glob( - "{}/RAW-*{}*.h5".format(rpath, dconfig["inset"])) - if len(fl): - corr_file_list = corr_file_list.union(set(fl)) + if any(y in x for x in fl for y in dconfig['karabo-da']): + for karabo_da in dconfig['karabo-da']: + tfl = glob.glob(f"{rpath}/*{karabo_da}*.h5") + corr_file_list = corr_file_list.union(set(tfl)) thisconf = copy.copy(dconfig) + if isinstance(pconf[karabo_id], dict): + thisconf.update(copy.copy(pconf[karabo_id])) thisconf["in-folder"] = in_folder thisconf["out-folder"] = out_folder - report_to = 'Correct_' + \ - detector.replace('-', '_') + '_' + ts - thisconf["report-to"] = report_to.lower() - + thisconf["karabo-id"] = karabo_id thisconf["run"] = runnr - del thisconf[ - "inset"] # don't need this for xfel-calibrate - detectors[detector] = thisconf + if priority: + thisconf["priority"] = str(priority) + + detectors[karabo_id] = thisconf copy_file_list = copy_file_list.difference(corr_file_list) - print(detectors) - asyncio.ensure_future( - copy_untouched_files(copy_file_list, out_folder, runnr)) + asyncio.ensure_future(copy_untouched_files(copy_file_list, out_folder, runnr)) if len(detectors) == 0: - logging.warn(Errors.NOTHING_TO_DO.format(rpath)) - msg = "Nothing to calibrate for this run, copied raw data only" + logging.warning(Errors.NOTHING_TO_DO.format(rpath)) + msg = MDC.NOTHING_TO_DO response = mdc.update_run_api(rid, {'flg_cal_data_status': 'NA', 'cal_pipeline_reply': msg}) if response.status_code != 200: logging.error(Errors.MDC_RESPONSE.format(response)) return - status = [] - for detector, dconfig in detectors.items(): - if "-" in detector: - detector, _ = detector.split("-") - - # --slurm-scheduling is the --nice slurm command - # to increase priority by having a lower number. - cmd = config['correct']['cmd'].format( + + if action in ['dark', 'correct']: + # run xfel_calibrate + for karabo_id, dconfig in detectors.items(): + detector = dconfig['detector-type'] + del dconfig['detector-type'] + cmd = config[action]['cmd'].format( detector=detector, sched_prio=str(config[action]['sched-prio']), action=action, instrument=instrument, cycle=cycle, proposal=proposal, - runs="_r".join(wait_runs) + runs="_r".join(wait_runs), + time_stamp=ts, + det_instance=karabo_id ).split() cmd = await parse_config(cmd, dconfig) - - if priority: - cmd += ["--priority", str(priority)] - ret = await run_action(job_db, cmd, mode, proposal, - runnr, rid) - status.append(ret) + ret = await run_action(job_db, cmd, mode, proposal, runnr, + rid if action == 'correct' else 0) if action == 'upload-yaml': sase, instrument, cycle, proposal, this_yaml = payload diff --git a/webservice/webservice.yaml b/webservice/webservice.yaml index 40b0f1f3502d27823eec93094e1f1eaad720de03..54d59f49f1537b996b6e9347c3c7d415204d7b21 100644 --- a/webservice/webservice.yaml +++ b/webservice/webservice.yaml @@ -30,6 +30,7 @@ correct: python -m xfel_calibrate.calibrate {detector} CORRECT --slurm-scheduling {sched_prio} --slurm-name {action}_{instrument}_{detector}_{cycle}_p{proposal}_r{runs} + --report-to {action}_{det_instance}_{time_stamp} --cal-db-timeout 300000 dark: @@ -41,4 +42,5 @@ dark: --priority {priority} --slurm-scheduling {sched_prio} --slurm-name {action}_{instrument}_{detector}_{cycle}_p{proposal}_r{runs} + --report-to /gpfs/exfel/d/cal/caldb_store/xfel/reports/{detector}/{instrument}/{det_instance}/{action}/{action}_{proposal}_{runs}_{time_stamp} --db-output diff --git a/xfel_calibrate/finalize.py b/xfel_calibrate/finalize.py index fd099370b079b8c4c31bf419c14ada252e889ffa..86b198458480f726bd27ca32acdfa7f90002baab 100644 --- a/xfel_calibrate/finalize.py +++ b/xfel_calibrate/finalize.py @@ -272,6 +272,7 @@ def make_report(run_path, tmp_path, out_path, project, author, version, "can be inspected at: {}".format(run_path)) return print("Moving report to final location: {}".format(out_path)) + makedirs(out_path, exist_ok=True) copy('{}/_build/latex/{}.pdf'.format(run_path, report_name), out_path) temp_dirs = glob(f'{tmp_path}/*/') diff --git a/xfel_calibrate/notebooks.py b/xfel_calibrate/notebooks.py index b814077b51b6e0d5c3913795105024b9887a0143..d831746da479589011250e580928cbefde180df1 100644 --- a/xfel_calibrate/notebooks.py +++ b/xfel_calibrate/notebooks.py @@ -42,7 +42,6 @@ notebooks = { "cluster cores": 8}, }, }, - "AGIPD64K": { "DARK": { "notebook": @@ -52,7 +51,6 @@ notebooks = { "cluster cores": 4}, }, }, - "LPD": { "DARK": { "notebook": "notebooks/LPD/LPDChar_Darks_NBC.ipynb", @@ -158,7 +156,7 @@ notebooks = { "DARK": { "notebook": "notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb", # noqa - "concurrency": {"parameter": None, + "concurrency": {"parameter": "karabo_da", "default concurrency": None, "cluster cores": 4}, }, @@ -173,14 +171,14 @@ notebooks = { }, "EPIX": { "DARK": { - "notebook": "notebooks/ePix/Characterize_Darks_ePix_NBC.ipynb", + "notebook": "notebooks/ePix100/Characterize_Darks_ePix_NBC.ipynb", "concurrency": {"parameter": None, "default concurrency": None, "cluster cores": 4}, }, "CORRECT": { - "notebook": "notebooks/ePix/Correction_ePix_NBC.ipynb", + "notebook": "notebooks/ePix100/Correction_ePix_NBC.ipynb", "concurrency": {"parameter": "sequences", "default concurrency": [-1], "use function": "balance_sequences", @@ -189,14 +187,14 @@ notebooks = { }, "EPIX100": { "DARK": { - "notebook": "notebooks/ePix/Characterize_Darks_ePix_NBC.ipynb", + "notebook": "notebooks/ePix100/Characterize_Darks_ePix100_NBC.ipynb", "concurrency": {"parameter": None, "default concurrency": None, "cluster cores": 4}, }, "CORRECT": { - "notebook": "notebooks/ePix/Correction_ePix_NBC.ipynb", + "notebook": "notebooks/ePix100/Correction_ePix100_NBC.ipynb", "concurrency": {"parameter": "sequences", "default concurrency": [-1], "use function": "balance_sequences", @@ -205,14 +203,14 @@ notebooks = { }, "EPIX10K": { "DARK": { - "notebook": "notebooks/ePix/Characterize_Darks_ePix10K_NBC.ipynb", + "notebook": "notebooks/ePix10K/Characterize_Darks_ePix10K_NBC.ipynb", "concurrency": {"parameter": None, "default concurrency": None, "cluster cores": 4}, }, "CORRECT": { - "notebook": "notebooks/ePix/Correction_ePix10K_NBC.ipynb", + "notebook": "notebooks/ePix10K/Correction_ePix10K_NBC.ipynb", "concurrency": {"parameter": "sequences", "default concurrency": [-1], "use function": "balance_sequences",