diff --git a/webservice/job_monitor.py b/webservice/job_monitor.py
index 802d74596d365fb6f98d3c88efe3a809c1299ac7..195a9f7372868791b49650b406c40260bfd05258 100644
--- a/webservice/job_monitor.py
+++ b/webservice/job_monitor.py
@@ -84,116 +84,217 @@ def slurm_job_status(jobid):
     return "NA", "NA", "NA"
 
 
-def update_job_db(config):
-    """ Update the job database and send out updates to MDC
+class JobsMonitor:
+    def __init__(self, config):
+        log.info("Starting jobs monitor")
+        self.job_db = init_job_db(config)
+        self.mdc = init_md_client(config)
+        self.kafka_prod = init_kafka_producer(config)
+        self.kafka_topic = config['kafka']['topic']
+        self.time_interval = int(config['web-service']['job-update-interval'])
+
+    def run(self):
+        while True:
+            try:
+                self.do_updates()
+            except Exception:
+                log.error("Failure to update job DB", exc_info=True)
+            time.sleep(self.time_interval)
+
+    def do_updates(self):
+        ongoing_jobs_by_exn = self.get_updates_by_exec_id()
+        # ^ dict grouping statuses of unfinished jobs by execution ID:
+        # {12345: ['R-5:41', 'PD-0:00', ...]}
+        # Newly completed executions are present with an empty list.
+
+        # For executions still running, regroup the jobs by request
+        # (by run, for correction requests):
+        reqs_still_going = {}
+        for exec_id, running_jobs_info in ongoing_jobs_by_exn.items():
+            if running_jobs_info:
+                req_id = self.job_db.execute(
+                    "SELECT req_id FROM executions WHERE exec_id = ?", (exec_id,)
+                ).fetchone()['req_id']
+                reqs_still_going.setdefault(req_id, []).extend(running_jobs_info)
+
+        # For executions that have finished, send out notifications, and
+        # check if the whole request (several executions) has finished.
+        reqs_finished = set()
+        for exec_id, running_jobs_info in ongoing_jobs_by_exn.items():
+            if not running_jobs_info:
+                req_id = self.process_execution_finished(exec_id)
+
+                if req_id not in reqs_still_going:
+                    reqs_finished.add(req_id)
+
+        # Now send updates for all requests which hadn't already finished
+        # by the last time this ran:
+
+        for req_id, running_jobs_info in reqs_still_going.items():
+            self.process_request_still_going(req_id, running_jobs_info)
+
+        for req_id in reqs_finished:
+            self.process_request_finished(req_id)
+
+    def get_updates_by_exec_id(self):
+        c = self.job_db.cursor()
+        c.execute("SELECT job_id, exec_id FROM slurm_jobs WHERE finished = 0")
 
-    :param config: configuration parsed from webservice YAML
-    """
-    log.info("Starting jobs monitor")
-    conn = init_job_db(config)
-    mdc = init_md_client(config)
-    kafka_prod = init_kafka_producer(config)
-    kafka_topic = config['kafka']['topic']
-    time_interval = int(config['web-service']['job-update-interval'])
-
-    while True:
         statii = slurm_status()
         # Check that slurm is giving proper feedback
         if statii is None:
-            time.sleep(time_interval)
-            continue
-        try:
-            c = conn.cursor()
-            c.execute("SELECT * FROM jobs WHERE status IN ('R', 'PD', 'CG') ")
-            combined = {}
-            log.debug(f"SLURM info {statii}")
-
-            for r in c.fetchall():
-                rid, jobid, proposal, run, status, _time, det, action = r
-                log.debug(f"DB info {r}")
-
-                cflg, cstatus, *_ = combined.setdefault((rid, action), (
-                    [], [], proposal, run, det
-                ))
-                if jobid in statii:
-                    slstatus, runtime = statii[jobid]
-                    query = "UPDATE jobs SET status=?, time=? WHERE jobid LIKE ?"
-                    c.execute(query, (slstatus, runtime, jobid))
-
-                    cflg.append('R')
-                    cstatus.append(f"{slstatus}-{runtime}")
-                else:
-                    _, sltime, slstatus = slurm_job_status(jobid)
-                    query = "UPDATE jobs SET status=? WHERE jobid LIKE ?"
-                    c.execute(query, (slstatus, jobid))
-
-                    if slstatus == 'COMPLETED':
-                        cflg.append("A")
-                    else:
-                        cflg.append("NA")
-                    cstatus.append(slstatus)
-            conn.commit()
-
-            flg_order = {"R": 2, "A": 1, "NA": 0}
-            dark_flags = {'NA': 'E', 'R': 'IP', 'A': 'F'}
-            for rid, action in combined:
-                if int(rid) == 0:  # this job was not submitted from MyMDC
-                    continue
-                flgs, statii, proposal, run, det = combined[rid, action]
-                # sort by least done status
-                flg = max(flgs, key=lambda i: flg_order[i])
-                if flg != 'R':
-                    log.info(
-                        "Jobs finished - action: %s, run id: %s, status: %s",
-                        action, rid, flg,
-                    )
-                    if action == 'CORRECT':
-                        try:
-                            kafka_prod.send(kafka_topic, {
-                                'event': 'correction_complete',
-                                'proposal': proposal,
-                                'run': run,
-                                'detector': det,
-                                'success': (flg == 'A'),  # A for Available
-                            })
-                        except KafkaError:
-                            log.warning("Error sending Kafka notification",
-                                        exc_info=True)
-
-                if all(s.startswith('PD-') for s in statii):
-                    # Avoid swamping myMdC with updates for jobs still pending.
-                    log.debug(
-                        "No update for action %s, rid %s: jobs pending",
-                        action, rid
-                    )
-                    continue
-
-                msg = "\n".join(statii)
-                msg_debug = f"Update MDC {rid}, {msg}"
-                log.debug(msg_debug.replace('\n', ', '))
-
-                if action == 'CORRECT':
-                    data = {'flg_cal_data_status': flg,
-                            'cal_pipeline_reply': msg}
-                    if flg != 'R':
-                        data['cal_last_end_at'] = datetime.now(tz=timezone.utc).isoformat()
-                    response = mdc.update_run_api(rid, data)
-
-                else:  # action == 'DARK' but it's dark_request
-                    data = {'dark_run': {'flg_status': dark_flags[flg],
-                                         'calcat_feedback': msg}}
-                    response = mdc.update_dark_run_api(rid, data)
-
-                if response.status_code != 200:
-                    log.error("Failed to update MDC for action %s, rid %s",
-                              action, rid)
-                    log.error(Errors.MDC_RESPONSE.format(response))
-        except Exception:
-            log.error("Failure to update job DB", exc_info=True)
-
-        time.sleep(time_interval)
-
-
+            return {}
+        log.debug(f"SLURM info {statii}")
+
+        ongoing_jobs_by_exn = {}
+        for r in c.fetchall():
+            log.debug(f"DB info {r}")
+            execn_ongoing_jobs = ongoing_jobs_by_exn.setdefault(r['exec_id'], [])
+
+            if r['job_id'] in statii:
+                # statii contains jobs which are still going (from squeue)
+                slstatus, runtime = statii[r['job_id']]
+                finished = False
+                execn_ongoing_jobs.append(f"{slstatus}-{runtime}")
+
+            else:
+                # These jobs have finished (successfully or otherwise)
+                _, runtime, slstatus = slurm_job_status(r['job_id'])
+                finished = True
+
+            c.execute(
+                "UPDATE slurm_jobs SET finished=?, elapsed=?, status=? WHERE job_id = ?",
+                (finished, runtime, slstatus, r['job_id'])
+            )
+
+        self.job_db.commit()
+        return ongoing_jobs_by_exn
+
+    def process_request_still_going(self, req_id, running_jobs_info):
+        """Send myMdC updates for a request with jobs still running/pending"""
+        mymdc_id, action = self.job_db.execute(
+            "SELECT mymdc_id, action FROM requests WHERE req_id = ?",
+            (req_id,)
+        ).fetchone()
+
+        if all(s.startswith('PD-') for s in running_jobs_info):
+            # Avoid swamping myMdC with updates for jobs still pending.
+            log.debug("No update for %s request with mymdc id %s: jobs pending",
+                      action, mymdc_id)
+            return
+
+        msg = "\n".join(running_jobs_info)
+        log.debug("Update MDC for %s, %s: %s",
+                  action, mymdc_id, ', '.join(running_jobs_info)
+        )
+
+        if action == 'CORRECT':
+            self.mymdc_update_run(mymdc_id, msg)
+        else:  # action == 'DARK'
+            self.mymdc_update_dark(mymdc_id, msg)
+
+    def process_execution_finished(self, exec_id):
+        """Send notification that one execution has finished"""
+        statuses = [row['status'] for row in self.job_db.execute(
+            "SELECT status FROM slurm_jobs WHERE exec_id = ?", (exec_id,)
+        ).fetchall()]
+        success = set(statuses) == {'COMPLETED'}
+        r = self.job_db.execute(
+            "SELECT det_type, karabo_id, req_id, proposal, run, action "
+            "FROM executions JOIN requests USING (req_id) "
+            "WHERE exec_id = ?",
+            (exec_id,)
+        ).fetchone()
+        with self.job_db:
+            self.job_db.execute(
+                "UPDATE executions SET success = ? WHERE exec_id = ?",
+                (success, exec_id)
+            )
+        log.info("Execution finished: %s for (p%s, r%s, %s), success=%s",
+                 r['action'], r['proposal'], r['run'], r['karabo_id'], success)
+
+        if r['action'] == 'CORRECT':
+            try:
+                self.kafka_prod.send(self.kafka_topic, {
+                    'event': 'correction_complete',
+                    'proposal': r['proposal'],
+                    'run': r['run'],
+                    'detector': r['det_type'],
+                    'karabo_id': r['karabo_id'],
+                    'success': success,
+                })
+            except KafkaError:
+                log.warning("Error sending Kafka notification",
+                            exc_info=True)
+
+        return r['req_id']
+
+    def process_request_finished(self, req_id):
+        """Send notifications that a request has finished"""
+        exec_successes = {row['success'] for row in self.job_db.execute(
+            "SELECT success FROM executions WHERE req_id = ?", (req_id,)
+        ).fetchall()}
+        success = (exec_successes == {1})
+
+        r = self.job_db.execute(
+            "SELECT * FROM requests WHERE req_id = ?", (req_id,)
+        ).fetchone()
+        log.info(
+            "Jobs finished - action: %s, myMdC id: %s, success: %s",
+            r['action'], r['mymdc_id'], success,
+        )
+        if r['action'] == 'CORRECT':
+            try:
+                self.kafka_prod.send(self.kafka_topic, {
+                    'event': 'run_corrections_complete',
+                    'proposal': r['proposal'],
+                    'run': r['run'],
+                    'success': success,
+                })
+            except KafkaError:
+                log.warning("Error sending Kafka notification",
+                            exc_info=True)
+
+        if success:
+            msg = "Calibration jobs succeeded"
+        else:
+            # Count failed jobs
+            job_statuses = [row['status'] for row in self.job_db.execute(
+                "SELECT slurm_jobs.status FROM slurm_jobs "
+                "INNER JOIN executions USING (exec_id) "
+                "INNER JOIN requests USING (req_id) "
+                "WHERE req_id = ?", (req_id,)
+            ).fetchall()]
+            n_failed = sum(s != 'COMPLETED' for s in job_statuses)
+            msg = f"{n_failed}/{len(job_statuses)} calibration jobs failed"
+
+        log.debug("Update MDC for %s, %s: %s", r['action'], r['mymdc_id'], msg)
+
+        if r['action'] == 'CORRECT':
+            status = 'A' if success else 'NA'  # Not-/Available
+            self.mymdc_update_run(r['mymdc_id'], msg, status)
+        else:  # r['action'] == 'DARK'
+            status = 'F' if success else 'E'  # Finished/Error
+            self.mymdc_update_dark(r['mymdc_id'], msg, status)
+
+    def mymdc_update_run(self, run_id, msg, status='R'):
+        data = {'flg_cal_data_status': status,
+                'cal_pipeline_reply': msg}
+        if status != 'R':
+            data['cal_last_end_at'] = datetime.now(tz=timezone.utc).isoformat()
+        response = self.mdc.update_run_api(run_id, data)
+        if response.status_code != 200:
+            log.error("Failed to update MDC run id %s", run_id)
+            log.error(Errors.MDC_RESPONSE.format(response))
+
+    def mymdc_update_dark(self, dark_run_id, msg, status='IP'):
+        data = {'dark_run': {'flg_status': status,
+                             'calcat_feedback': msg}}
+        response = self.mdc.update_dark_run_api(dark_run_id, data)
+
+        if response.status_code != 200:
+            log.error("Failed to update MDC dark run id %s", dark_run_id)
+            log.error(Errors.MDC_RESPONSE.format(response))
 
 
 def main(argv=None):
     # Ensure files are opened as UTF-8 by default, regardless of environment.
@@ -218,7 +319,7 @@ def main(argv=None):
         level=getattr(logging, args.log_level), format=fmt
     )
 
-    update_job_db(config)
+    JobsMonitor(config).run()
 
 
 if __name__ == "__main__":
diff --git a/webservice/webservice.py b/webservice/webservice.py
index 27407f6fdd2eccccdb63af1cee1d1154f380beaa..c9cf1a1d2a7014a4d56ec4a8e96a7c0dab1c24c3 100644
--- a/webservice/webservice.py
+++ b/webservice/webservice.py
@@ -45,10 +45,36 @@ def init_job_db(config):
     """
     logging.info("Initializing database")
     conn = sqlite3.connect(config['web-service']['job-db'])
-    conn.execute(
-        "CREATE TABLE IF NOT EXISTS "
-        "jobs(rid, jobid, proposal, run, status, time, det, act)"
-    )
+    conn.execute("PRAGMA foreign_keys = ON")
+
+    conn.executescript("""
+        CREATE TABLE IF NOT EXISTS requests(
+            req_id INTEGER PRIMARY KEY,
+            mymdc_id,
+            proposal,
+            run,
+            action,
+            timestamp
+        );
+        CREATE TABLE IF NOT EXISTS executions(
+            exec_id INTEGER PRIMARY KEY,
+            req_id REFERENCES requests(req_id),
+            det_type,
+            karabo_id,
+            success
+        );
+        CREATE INDEX IF NOT EXISTS exec_by_req ON executions(req_id);
+        CREATE TABLE IF NOT EXISTS slurm_jobs(
+            job_id INTEGER PRIMARY KEY,
+            exec_id REFERENCES executions(exec_id),
+            status,
+            finished,
+            elapsed
+        );
+        CREATE INDEX IF NOT EXISTS job_by_exec ON slurm_jobs(exec_id);
+        CREATE INDEX IF NOT EXISTS job_by_finished ON slurm_jobs(finished);
+    """)
+    conn.row_factory = sqlite3.Row
     return conn
 
 
@@ -227,37 +253,20 @@ async def run_proc_async(cmd: List[str]) -> Tuple[Optional[int], bytes, bytes]:
 
 
 def query_rid(conn, rid) -> bytes:
-    c = conn.cursor()
-    c.execute("SELECT * FROM jobs WHERE rid LIKE ?", (rid,))
-    combined = {}
+    c = conn.execute(
+        "SELECT job_id, status FROM slurm_jobs "
+        "INNER JOIN executions USING (exec_id) "
+        "INNER JOIN requests USING (req_id) "
+        "WHERE mymdc_id = ?", (rid,)
+    )
+    statuses = []
     for r in c.fetchall():
-        rid, jobid, proposal, run, status, time_, _ = r
-        logging.debug(
-            "Job {}, proposal {}, run {} has status {}".format(jobid,
-                                                               proposal,
-                                                               run,
-                                                               status))
-        cflg, cstatus = combined.get(rid, ([], []))
-        if status in ['R', 'PD']:
-            flg = 'R'
-        elif status == 'NA':
-            flg = 'NA'
-        else:
-            flg = 'A'
-
-        cflg.append(flg)
-        cstatus.append(status)
-        combined[rid] = cflg, cstatus
+        logging.debug("Job %s has status %s", r['job_id'], r['status'])
+        statuses.append(r['status'])
 
-    flg_order = {"R": 2, "A": 1, "NA": 0}
-    msg = ""
-    for rid, value in combined.items():
-        flgs, statii = value
-        flg = max(flgs, key=lambda i: flg_order[i])
-        msg += "\n".join(statii)
-    if msg == "":
-        msg = 'NA'
-    return msg.encode()
+    if statuses:
+        return "\n".join(statuses).encode()
+    return b'NA'
 
 
 def parse_config(cmd: List[str], config: Dict[str, Any]) -> List[str]:
@@ -289,7 +298,7 @@ def parse_config(cmd: List[str], config: Dict[str, Any]) -> List[str]:
     return cmd
 
 
-async def run_action(job_db, cmd, mode, proposal, run, rid) -> str:
+async def run_action(job_db, cmd, mode, proposal, run, exec_id) -> str:
     """Run action command (CORRECT or DARK).
 
     :param job_db: jobs database
@@ -298,7 +307,7 @@ async def run_action(job_db, cmd, mode, proposal, run, rid) -> str:
                  but the command will be logged
     :param proposal: proposal the command was issued for
     :param run: run the command was issued for
-    :param rid: run id in the MDC
+    :param exec_id: Execution ID in the local jobs database
 
     Returns a formatted Success or Error message indicating outcome of the
     execution.
@@ -327,17 +336,11 @@ async def run_action(job_db, cmd, mode, proposal, run, rid) -> str:
 
         jobs = []
         for jobid in jobids.split(','):
-            jobs.append((rid,
-                         jobid.strip(),
-                         proposal,
-                         run,
-                         datetime.now().isoformat(),
-                         cmd[3],
-                         cmd[4])
-                        )
+            jobs.append((int(jobid.strip()), exec_id))
         c.executemany(
-            "INSERT INTO jobs VALUES (?, ?, ?, ?, 'PD', ?, ?, ?)",
-            jobs)
+            "INSERT INTO slurm_jobs VALUES (?, ?, 'PD', 0, 0)",
+            jobs
+        )
         job_db.commit()
 
     else:  # mode == "sim"
@@ -821,6 +824,13 @@ class ActionsServer:
         proposal = self._normalise_proposal_num(proposal)
         pconf_full = self.load_proposal_config(cycle, proposal)
 
+        with self.job_db:
+            cur = self.job_db.execute(
+                "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)",
+                (rid, proposal, runnr, request_time)
+            )
+            req_id = cur.lastrowid
+
         _orca_passthrough(
             proposal_number=proposal,
             runs=[runnr],
@@ -906,7 +916,7 @@ class ActionsServer:
             return
 
         ret, _ = await self.launch_jobs(
-            [runnr], rid, detectors, 'correct', instrument, cycle, proposal,
+            [runnr], req_id, detectors, 'correct', instrument, cycle, proposal,
             request_time,
         )
         await update_mdc_status(self.mdc, 'correct', rid, ret)
@@ -958,6 +968,13 @@ class ActionsServer:
             karabo_id=karabo_id,
         )
 
+        with self.job_db:
+            cur = self.job_db.execute(
+                "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'DARK', ?)",
+                (rid, proposal, wait_runs[-1], request_time)
+            )
+            req_id = cur.lastrowid
+
         pconf_full = self.load_proposal_config(cycle, proposal)
 
         data_conf = pconf_full['data-mapping']
@@ -1039,7 +1056,7 @@ class ActionsServer:
         detectors = {karabo_id: thisconf}
 
         ret, report_path = await self.launch_jobs(
-            runs, rid, detectors, 'dark', instrument, cycle, proposal,
+            runs, req_id, detectors, 'dark', instrument, cycle, proposal,
            request_time
         )
         await update_mdc_status(self.mdc, 'dark_request', rid, ret)
@@ -1127,7 +1144,7 @@ class ActionsServer:
             return yaml.load(f.read(), Loader=yaml.FullLoader)
 
     async def launch_jobs(
-        self, run_nrs, rid, detectors, action, instrument, cycle, proposal,
+        self, run_nrs, req_id, detectors, action, instrument, cycle, proposal,
         request_time
     ) -> Tuple[str, List[str]]:
         report = []
@@ -1155,11 +1172,18 @@ class ActionsServer:
                 request_time=request_time
             ).split()
 
+            with self.job_db:
+                cur = self.job_db.execute(
+                    "INSERT INTO executions VALUES (NULL, ?, ?, ?, NULL)",
+                    (req_id, detector, karabo_id)
+                )
+                exec_id = cur.lastrowid
+
             cmd = parse_config(cmd, dconfig)
 
             # TODO: Add detector info in returned run action status.
             ret.append(await run_action(
                 self.job_db, cmd, self.mode,
-                proposal, run_nrs[-1], rid
+                proposal, run_nrs[-1], exec_id
             ))
 
             if '--report-to' in cmd[:-1]:
                 report_idx = cmd.index('--report-to') + 1
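
For reference, here is a minimal sketch (not part of the patch) of how the three new tables relate: one requests row per myMdC request, one executions row per detector launched for that request, and one slurm_jobs row per Slurm job. The schema copies init_job_db() above; the IDs, proposal, run number and statuses are invented for illustration.

import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
conn.executescript("""
    CREATE TABLE requests(
        req_id INTEGER PRIMARY KEY, mymdc_id, proposal, run, action, timestamp);
    CREATE TABLE executions(
        exec_id INTEGER PRIMARY KEY, req_id REFERENCES requests(req_id),
        det_type, karabo_id, success);
    CREATE TABLE slurm_jobs(
        job_id INTEGER PRIMARY KEY, exec_id REFERENCES executions(exec_id),
        status, finished, elapsed);
""")

# One myMdC request (hypothetical IDs) ...
cur = conn.execute(
    "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)",
    (4321, '900113', 42, '2021-06-01T12:00:00'),
)
req_id = cur.lastrowid

# ... spawns one execution per detector ...
cur = conn.execute(
    "INSERT INTO executions VALUES (NULL, ?, ?, ?, NULL)",
    (req_id, 'agipd', 'SPB_DET_AGIPD1M-1'),
)
exec_id = cur.lastrowid

# ... which submits Slurm jobs, initially pending and not finished.
conn.executemany(
    "INSERT INTO slurm_jobs VALUES (?, ?, 'PD', 0, 0)",
    [(1000001, exec_id), (1000002, exec_id)],
)
conn.commit()

# The monitor's polling query: all jobs not yet marked finished.
for row in conn.execute("SELECT job_id, exec_id FROM slurm_jobs WHERE finished = 0"):
    print(row['job_id'], row['exec_id'])

Because success is recorded per execution, process_request_finished() only needs to see success = 1 on every execution row for the request; the per-job statuses are consulted again only to compose the failure message sent to myMdC.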