diff --git a/bin/slurm_calibrate.sh b/bin/slurm_calibrate.sh
index a600fa7eaf2e97da42144b7c9a213f66cb9cadc3..5786f7ce5737ed6a3be7a2d33658a1e359d5d331 100755
--- a/bin/slurm_calibrate.sh
+++ b/bin/slurm_calibrate.sh
@@ -22,6 +22,7 @@
 echo "detector: $detector"
 echo "caltype: $caltype"
 echo "cluster_cores: $cluster_cores"
 echo "job ID: ${SLURM_JOB_ID:-none}"
+echo "hostname: $(hostname)"
 
 export CAL_NOTEBOOK_NAME="$notebook"
diff --git a/bin/slurm_finalize.sh b/bin/slurm_finalize.sh
index 63a5f496df2e2e027a9c7587cc52dd97b768f48c..7524d2b19a7718effeab2243e542fb20f227e1cd 100755
--- a/bin/slurm_finalize.sh
+++ b/bin/slurm_finalize.sh
@@ -13,6 +13,7 @@
 echo "Python path: $python_path"
 echo "Correction temp dir: $temp_dir"
 echo "finalize script: $finalize_script"
 echo "job ID: ${SLURM_JOB_ID:-none}"
+echo "hostname: $(hostname)"
-# set-up enviroment
+# set up environment
 source /etc/profile.d/modules.sh
diff --git a/webservice/config/serve_overview.yaml b/webservice/config/serve_overview.yaml
index 535c04ba680f639313af73ffdbd3b8078e3e1fbf..2da8f8cbc71c7b0a5328ef7ba2ccce12c366b9c3 100644
--- a/webservice/config/serve_overview.yaml
+++ b/webservice/config/serve_overview.yaml
@@ -10,7 +10,6 @@ templates:
 
 shell-commands:
   total-jobs: "sinfo -p exfel -o %A --noheader"
-  job-nodes: "squeue --me -o %i,%N --noheader"
   tail-log: "tail -5000 web.log"
   tail-log-monitor: "tail -5000 monitor.log"
 
diff --git a/webservice/job_monitor.py b/webservice/job_monitor.py
index b9a6b5fcbce62a4e4b3b862368b51a486bc1b528..ef521e6da67b3c3b9d3ef8347989362a5219b7da 100644
--- a/webservice/job_monitor.py
+++ b/webservice/job_monitor.py
@@ -30,6 +30,7 @@ log = logging.getLogger(__name__)
 STATES_FINISHED = {  # https://slurm.schedmd.com/squeue.html#lbAG
     'BOOT_FAIL', 'CANCELLED', 'COMPLETED', 'DEADLINE', 'FAILED',
     'OUT_OF_MEMORY', 'SPECIAL_EXIT', 'TIMEOUT',
+    'NA',  # Unknown (used internally if job ID missing)
 }
 STATE_ABBREVS = {
     'PENDING': 'PD',
@@ -67,7 +68,7 @@ def slurm_status(filter_user=True):
     :return: a dictionary indexed by slurm jobid and containing a tuple
-             of (status, run time) as values.
+             of (status, run time, hostname) as values.
     """
-    cmd = ["squeue", "--states=all", "--format=%i %T %M"]
+    cmd = ["squeue", "--states=all", "--format=%i %T %M %N"]
     if filter_user:
         cmd += ["--me"]
     res = run(cmd, stdout=PIPE, stderr=PIPE)
@@ -76,9 +77,10 @@
     statii = {}
     for r in rlines[1:]:
         try:
-            jobid, status, runtime = r.split()
+            # %N is empty while a job is still pending, so pad the fields
+            jobid, status, runtime, hostname = (r.split() + [''])[:4]
             jobid = jobid.strip()
-            statii[jobid] = status, runtime
+            statii[jobid] = status, runtime, hostname
         except ValueError:  # not enough values to unpack in split
             pass
     return statii
@@ -170,7 +172,7 @@
         Newly completed executions are present with an empty list.
         """
         jobs_to_check = self.job_db.execute(
-            "SELECT job_id, exec_id FROM slurm_jobs WHERE finished = 0"
+            "SELECT job_id, exec_id, hostname FROM slurm_jobs WHERE finished = 0"
         ).fetchall()
         if not jobs_to_check:
             log.debug("No unfinished jobs to check")
@@ -189,22 +191,28 @@
             execn_ongoing_jobs = ongoing_jobs_by_exn.setdefault(r['exec_id'], [])
 
             if str(r['job_id']) in statii:
-                # statii contains jobs which are still going (from squeue)
-                slstatus, runtime = statii[str(r['job_id'])]
+                # Jobs which are pending, running, or recently finished (from squeue)
+                # Jobs stay for >= 150s after finishing, so we should always see them.
+                slstatus, runtime, hostname = statii[str(r['job_id'])]
             else:
-                # These jobs have finished (successfully or otherwise)
+                # Fallback: get job info from sacct
                 _, runtime, slstatus = slurm_job_status(r['job_id'])
+                # We *don't* take hostname from sacct; with some GPFS issues
+                # the job may not get recorded, and we don't want to overwrite
+                # a hostname we previously got from squeue.
+                hostname = r['hostname']
 
             finished = slstatus in STATES_FINISHED
 
             if not finished:
                 short_state = STATE_ABBREVS.get(slstatus, slstatus)
                 execn_ongoing_jobs.append(f"{short_state}-{runtime}")
 
-            updates.append((finished, runtime, slstatus, r['job_id']))
+            updates.append((finished, runtime, slstatus, hostname, r['job_id']))
 
         with time_db_transaction(self.job_db, 'Update jobs'):
             self.job_db.executemany(
-                "UPDATE slurm_jobs SET finished=?, elapsed=?, status=? WHERE job_id = ?",
+                "UPDATE slurm_jobs SET "
+                "finished=?, elapsed=?, status=?, hostname=? WHERE job_id = ?",
                 updates
             )
diff --git a/webservice/serve_overview.py b/webservice/serve_overview.py
index 3ce5f3da8664624012a5a9d8423118bce70c9767..7053f6273adecb78df9f90bdc1cebb0605b94f87 100644
--- a/webservice/serve_overview.py
+++ b/webservice/serve_overview.py
@@ -39,7 +39,6 @@ class RequestHandler(BaseHTTPRequestHandler):
     def init_config(self):
         self.total_jobs_cmd = config["shell-commands"]["total-jobs"]
         self.tail_log_cmd = config["shell-commands"]["tail-log"]
-        self.job_nodes_cmd = config["shell-commands"]["job-nodes"]
         self.run_candidates = config["run-candidates"]
 
         self.templates = {}
@@ -186,7 +185,7 @@
         )
 
         c = self.jobs_db.execute(
-            "SELECT job_id, status, elapsed, det_type, proposal, run, action "
+            "SELECT job_id, status, elapsed, hostname, det_type, proposal, run, action "
             "FROM slurm_jobs INNER JOIN executions USING (exec_id) "
             "INNER JOIN requests USING (req_id) "
             "WHERE finished = 0"
@@ -195,14 +194,7 @@
 
         now = datetime.now()
         running_jobs = {}
-        # Mapping of job ID (str) to node for running jobs.
-        job_nodes = dict([
-            x.split(",") for x in check_output(
-                self.job_nodes_cmd, shell=True).decode("utf-8").split("\n")
-            if x
-        ])
-
-        for job_id, status, elapsed, det, proposal, run, act in c:
+        for job_id, status, elapsed, job_node, det, proposal, run, act in c:
             key = f'{proposal}/r{int(run):04d}/{det}/{act}'
             flg = "Q" if status in {"QUEUE", "PENDING"} else "R"
             rjobs = running_jobs.setdefault(key, [])
@@ -211,7 +203,7 @@
                 flg,
                 f'{status[0]}-{elapsed}',
                 datetime_to_grafana(now - elapsed_to_timedelta(elapsed)),
-                job_nodes.get(str(job_id), '')
+                job_node
             ))
 
         tmpl = self.templates["running-jobs"]
diff --git a/webservice/webservice.py b/webservice/webservice.py
index 54e94ddd0810af7bbeb8e862f910f2f044b87706..0766b82cfe47a3f033521a21ff99330dcd8d18ef 100644
--- a/webservice/webservice.py
+++ b/webservice/webservice.py
@@ -90,7 +90,8 @@
             exec_id REFERENCES executions(exec_id),
             status,
             finished,
-            elapsed
+            elapsed,
+            hostname
         );
         CREATE INDEX IF NOT EXISTS job_by_exec ON slurm_jobs(exec_id);
         CREATE INDEX IF NOT EXISTS job_by_finished ON slurm_jobs(finished);
@@ -488,7 +489,7 @@ async def run_action(job_db, cmd, mode, proposal, run, exec_id) -> str:
             jobs.append((int(jobid.strip()), exec_id))
     with time_db_transaction(job_db, 'Insert jobs'):
         job_db.executemany(
-            "INSERT INTO slurm_jobs VALUES (?, ?, 'PENDING', 0, '0')",
+            "INSERT INTO slurm_jobs VALUES (?, ?, 'PENDING', 0, '0', '')",
             jobs
         )
 
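
A deployment note, not part of the patch itself: the webservice.py schema appears to be applied only on first creation (the visible CREATE INDEX statements use IF NOT EXISTS, and the table creation presumably works the same way), so a job database created before this change would lack the new hostname column, and the SELECT/UPDATE statements above would then fail with "no such column: hostname". Below is a minimal one-off migration sketch using Python's standard-library sqlite3; the database filename is an assumption for illustration, not taken from this diff.

    import sqlite3

    conn = sqlite3.connect("webservice_jobs.sqlite")  # assumed path to the job DB
    # Rows from PRAGMA table_info are (cid, name, type, notnull, dflt_value, pk)
    columns = {row[1] for row in conn.execute("PRAGMA table_info(slurm_jobs)")}
    if "hostname" not in columns:
        # Existing rows get NULL for the new column; new inserts write '' as above
        conn.execute("ALTER TABLE slurm_jobs ADD COLUMN hostname")
        conn.commit()
    conn.close()

SQLite's ALTER TABLE ... ADD COLUMN appends the column without rewriting existing rows, so this is safe to run once against a live database between webservice restarts.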