diff --git a/webservice/webservice.py b/webservice/webservice.py index 4f0089235272a314b3e4c3ec87323a5b77fbf9cd..fdb65cd81f45aaa1715ddb8feb0fa303ba041139 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -288,54 +288,64 @@ async def update_job_db(config): logging.info("Starting config db handling") conn = await init_job_db(config) mdc = await init_md_client(config) + time_interval = int(config['web-service']['job-update-interval']) while True: statii = await slurm_status() - c = conn.cursor() - c.execute("SELECT * FROM jobs WHERE status IN ('R', 'PD', 'CG') ") - combined = {} - logging.debug("SLURM info {}".format(statii)) - - for r in c.fetchall(): - rid, jobid, proposal, run, status, time, _, _ = r - logging.debug("DB info {}".format(r)) - - cflg, cstatus = combined.get(rid, ([], [])) - if jobid in statii: - slstatus, runtime = statii[jobid] - query = "UPDATE jobs SET status='{status}', time='{runtime}' WHERE jobid LIKE '{jobid}'" # noqa - c.execute(query.format(status=slstatus, - runtime=runtime, - jobid=jobid)) - - cflg.append('R') - cstatus.append("{}-{}".format(slstatus, runtime)) - else: - _, sltime, slstatus = await slurm_job_status(jobid) - query = "UPDATE jobs SET status='{slstatus}' WHERE jobid LIKE '{jobid}'" # noqa - c.execute(query.format(jobid=jobid, slstatus=slstatus)) - - if slstatus == 'COMPLETED': - cflg.append("A") + # Check that slurm is giving proper feedback + if statii is None: + await asyncio.sleep(time_interval) + continue + try: + c = conn.cursor() + c.execute("SELECT * FROM jobs WHERE status IN ('R', 'PD', 'CG') ") + combined = {} + logging.debug("SLURM info {}".format(statii)) + + for r in c.fetchall(): + rid, jobid, proposal, run, status, time, _, _ = r + logging.debug("DB info {}".format(r)) + + cflg, cstatus = combined.get(rid, ([], [])) + if jobid in statii: + slstatus, runtime = statii[jobid] + query = "UPDATE jobs SET status='{status}', time='{runtime}' WHERE jobid LIKE '{jobid}'" # noqa + c.execute(query.format(status=slstatus, + runtime=runtime, + jobid=jobid)) + + cflg.append('R') + cstatus.append("{}-{}".format(slstatus, runtime)) else: - cflg.append("NA") - cstatus.append(slstatus) - combined[rid] = cflg, cstatus - conn.commit() - - flg_order = {"R": 2, "A": 1, "NA": 0} - for rid, value in combined.items(): - if int(rid) == 0: - continue - flgs, statii = value - flg = max(flgs, key=lambda i: flg_order[i]) - msg = "\n".join(statii) - logging.debug("Update MDC {}, {}".format(rid, - msg.replace('\n', ', '))) - response = mdc.update_run_api(rid, {'flg_cal_data_status': flg, - 'cal_pipeline_reply': msg}) - if response.status_code != 200: - logging.error(Errors.MDC_RESPONSE.format(response)) - await asyncio.sleep(int(config['web-service']['job-update-interval'])) + _, sltime, slstatus = await slurm_job_status(jobid) + query = "UPDATE jobs SET status='{slstatus}' WHERE jobid LIKE '{jobid}'" # noqa + c.execute(query.format(jobid=jobid, slstatus=slstatus)) + + if slstatus == 'COMPLETED': + cflg.append("A") + else: + cflg.append("NA") + cstatus.append(slstatus) + combined[rid] = cflg, cstatus + conn.commit() + + flg_order = {"R": 2, "A": 1, "NA": 0} + for rid, value in combined.items(): + if int(rid) == 0: + continue + flgs, statii = value + flg = max(flgs, key=lambda i: flg_order[i]) + msg = "\n".join(statii) + msg_debug = f"Update MDC {rid}, {msg}" + logging.debug(msg_debug.replace('\n', ', ')) + response = mdc.update_run_api(rid, {'flg_cal_data_status': flg, + 'cal_pipeline_reply': msg}) + if response.status_code != 200: + logging.error(Errors.MDC_RESPONSE.format(response)) + except Exception as e: + e = str(e) + logging.error(f"Failure to update job DB: {e}") + + await asyncio.sleep(time_interval) async def copy_untouched_files(file_list, out_folder, run):