diff --git a/webservice/serve_overview.py b/webservice/serve_overview.py index 0b3da01db26254209fe4412eb2673910be8dcd12..de67c0c1964e2927c1f6532912392507bad23ded 100644 --- a/webservice/serve_overview.py +++ b/webservice/serve_overview.py @@ -399,17 +399,18 @@ class RequestHandler(BaseHTTPRequestHandler): port=port) conn = sqlite3.connect(config['web-service']['job-db']).cursor() - conn.execute("SELECT * FROM jobs") + conn.execute("SELECT * FROM jobs WHERE status IN ('R', 'PD', 'CG')") running_jobs = {} for r in conn.fetchall(): - rid, jobid, proposal, run, flg, status = r + rid, jobid, proposal, run, status, time, det, act = r run = int(run) - if "QUEUE" in status or "PD" in status: + key = '{}/r{:04d}/{}/{}'.format(proposal, run, det, act) + flg = "R" + if status in ["QUEUE", "PD"]: flg = "Q" - rjobs = running_jobs.get("r{:04d}".format(run), - {'proposal': proposal, 'statii': []}) - rjobs["statii"].append((flg, status)) - running_jobs["r{:04d}".format(run)] = rjobs + rjobs = running_jobs.get(key, []) + rjobs.append((flg, '{}-{}'.format(status, time))) + running_jobs[key] = rjobs tmpl = self.templates["running-jobs"] running_jobs_r = Template(tmpl).render(running_jobs=running_jobs) diff --git a/webservice/templates/running_jobs.html b/webservice/templates/running_jobs.html index 485d882fbbe5038fe9145611e9562ce1728ad2a0..3bec3d63c3cbce2a6f6efc435e8d596fa68df719 100644 --- a/webservice/templates/running_jobs.html +++ b/webservice/templates/running_jobs.html @@ -2,9 +2,9 @@ <h2>Running calibration jobs</h2> <dl> {% for run, data in running_jobs.items() %} - <dt>{{ data['proposal'] }}/{{ run }}</dt> + <dt>{{ run }}</dt> <dd> - {% for status in data['statii'] %} + {% for status in data %} <span class="status, {{ status[0] }}">{{ status[1] }}</span> {% endfor %} </dd> diff --git a/webservice/webservice.py b/webservice/webservice.py index d20cdadb240b367494fdc234d9f58e79e0057601..d1d5eb98afdac1d1be92c58adc53b632f19eb925 100644 --- a/webservice/webservice.py +++ 
b/webservice/webservice.py @@ -39,7 +39,7 @@ async def init_job_db(config): c.execute("SELECT * FROM jobs") except: logging.info("Creating initial job database") - c.execute("CREATE TABLE jobs(rid, id, proposal, run, flg, status)") + c.execute("CREATE TABLE jobs(rid, jobid, proposal, run, status, time, det, act)") # noqa return conn @@ -212,30 +212,55 @@ async def slurm_status(filter_user=True): return None +async def slurm_job_status(jobid): + """ Return the status of slurm job + + :param jobid: Slurm job Id + :return: JobID, Elapsed, State. + """ + cmd = ["sacct", "-j", str(jobid), "--format=JobID,Elapsed,state"] + + ret = subprocess.run(cmd, stdout=subprocess.PIPE) + if ret.returncode == 0: + rlines = ret.stdout.decode().split("\n") + + logging.debug("Job {} state {}".format(jobid, rlines[2].split())) + if len(rlines[2].split()) == 3: + return rlines[2].replace("+", "").split() + return "NA", "NA", "NA" + + async def query_rid(conn, socket, rid): c = conn.cursor() c.execute("SELECT * FROM jobs WHERE rid LIKE '{}'".format(rid)) combined = {} for r in c.fetchall(): - rid, jobid, proposal, run, flg, status = r + rid, jobid, proposal, run, status, time_, _, _ = r logging.debug( "Job {}, proposal {}, run {} has status {}".format(jobid, proposal, run, status)) cflg, cstatus = combined.get(rid, ([], [])) + if status in ['R', 'PD']: + flg = 'R' + elif status == 'NA': + flg = 'NA' + else: + flg = 'A' + cflg.append(flg) cstatus.append(status) combined[rid] = cflg, cstatus - flg_order = {"R": 2, "A": 1, "NA": 0} + flg_order = {"R": 2, "A": 1, "NA": 0} msg = "" for rid, value in combined.items(): flgs, statii = value flg = max(flgs, key=lambda i: flg_order[i]) msg += "\n".join(statii) if msg == "": - msg = "DONE" + msg = 'NA' socket.send(msg.encode()) @@ -266,52 +291,46 @@ async def update_job_db(config): while True: statii = await slurm_status() c = conn.cursor() - c.execute("SELECT * FROM jobs") + c.execute("SELECT * FROM jobs WHERE status IN ('R', 'PD', 'CG') ") combined = 
{} + logging.debug("SLURM info {}".format(statii)) + for r in c.fetchall(): + rid, jobid, proposal, run, status, time, _, _ = r + logging.debug("DB info {}".format(r)) - rid, jobid, proposal, run, flg, status = r + cflg, cstatus = combined.get(rid, ([], [])) if jobid in statii: slstatus, runtime = statii[jobid] - query = "UPDATE jobs SET status='{status} - {runtime}' WHERE id LIKE '{jobid}'" # noqa + query = "UPDATE jobs SET status='{status}', time='{runtime}' WHERE jobid LIKE '{jobid}'" # noqa c.execute(query.format(status=slstatus, runtime=runtime, jobid=jobid)) - elif not "QUEUED" in status: - c.execute("DELETE FROM jobs WHERE id LIKE '{jobid}'".format( - jobid=jobid)) - cflg, cstatus = combined.get(rid, ([], [])) - cflg.append("A") - cstatus.append('DONE') - combined[rid] = cflg, cstatus + + cflg.append('R') + cstatus.append("{}-{}".format(slstatus, runtime)) else: - # check for timed out jobs - _, start_time = status.split(": ") - dt = datetime.now() - timeparser.parse(start_time) - if dt.total_seconds() > config['web-service']['job-timeout']: - c.execute( - "DELETE FROM jobs WHERE id LIKE '{jobid}'".format( - jobid=jobid)) - cflg, cstatus = combined.get(rid, ([], [])) - cflg.append("R") - cstatus.append('PENDING SLURM SCHEDULING / TIMED OUT?') - combined[rid] = cflg, cstatus - conn.commit() - c.execute("SELECT * FROM jobs") - for r in c.fetchall(): - rid, jobid, proposal, run, flg, status = r - cflg, cstatus = combined.get(rid, ([], [])) - cflg.append(flg) - cstatus.append(status) + _, sltime, slstatus = await slurm_job_status(jobid) + query = "UPDATE jobs SET status='{slstatus}' WHERE jobid LIKE '{jobid}'" # noqa + c.execute(query.format(jobid=jobid, slstatus=slstatus)) + + if slstatus == 'COMPLETED': + cflg.append("A") + else: + cflg.append("NA") + cstatus.append(slstatus) combined[rid] = cflg, cstatus - flg_order = {"R": 2, "A": 1, "NA": 0} + conn.commit() + flg_order = {"R": 2, "A": 1, "NA": 0} for rid, value in combined.items(): - if int(rid)==0: + if 
int(rid) == 0: continue flgs, statii = value flg = max(flgs, key=lambda i: flg_order[i]) msg = "\n".join(statii) + logging.debug("Update MDC {}, {}".format(rid, + msg.replace('\n', ', '))) response = mdc.update_run_api(rid, {'flg_cal_data_status': flg, 'cal_pipeline_reply': msg}) if response.status_code != 200: @@ -360,13 +379,14 @@ async def run_correction(conn, cmd, mode, proposal, run, rid): # enter jobs in job db c = conn.cursor() rstr = ret.stdout.decode() - query = "INSERT INTO jobs VALUES ('{rid}', '{jobid}', '{proposal}', '{run}', 'R', 'QUEUED: {now}')" # noqa + query = "INSERT INTO jobs VALUES ('{rid}', '{jobid}', '{proposal}', '{run}', 'PD', '{now}', '{det}', '{act}')" # noqa for r in rstr.split("\n"): if "Submitted job:" in r: _, jobid = r.split(":") c.execute(query.format(rid=rid, jobid=jobid.strip(), proposal=proposal, run=run, - now=datetime.now().isoformat())) + now=datetime.now().isoformat(), + det=cmd[3], act=cmd[4])) conn.commit() logging.debug(" ".join(cmd)) if "DARK" in cmd: @@ -508,6 +528,9 @@ async def server_runner(config, mode): priority) = payload runnr = runnr.replace("r", "") wait_runs = [runnr] + msg = "Correction of run {} at {}({}) " \ + "is requested. Checking files..." + logging.info(msg.format(runnr, instrument, proposal)) if action == 'dark': (rid, sase, instrument, cycle, proposal, det_list) = payload[:6]