"""Monitor calibration jobs in Slurm and send status updates"""
import argparse
import json
import locale
import logging
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from subprocess import run, PIPE
from kafka import KafkaProducer
from kafka.errors import KafkaError
try:
from .config import webservice as config
from .messages import MDC, Errors, MigrationError, Success
from .webservice import init_job_db, init_md_client
except ImportError:
from config import webservice as config
from messages import MDC, Errors, MigrationError, Success
from webservice import init_job_db, init_md_client
log = logging.getLogger(__name__)


class NoOpProducer:
    """Fills in for Kafka producer object when setting that up fails"""
    def send(self, topic, value):
        pass


def init_kafka_producer(config):
    try:
        return KafkaProducer(
            bootstrap_servers=config['kafka']['brokers'],
            value_serializer=lambda d: json.dumps(d).encode('utf-8'),
            max_block_ms=2000,  # Don't get stuck trying to send Kafka messages
        )
    except KafkaError:
        log.warning("Problem initialising Kafka producer; "
                    "Kafka notifications will not be sent.", exc_info=True)
        return NoOpProducer()
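

# Example: with the JSON value_serializer above, a call such as
#     producer.send('correction-status', {'event': 'correction_complete'})
# puts b'{"event": "correction_complete"}' on the topic. The topic name
# here is hypothetical; the real one comes from config['kafka']['topic'].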


def slurm_status(filter_user=True):
    """ Return the status of slurm jobs by calling squeue

    :param filter_user: set to True to list only jobs from the current user
    :return: a dictionary indexed by slurm jobid and containing a tuple
             of (status, run time) as values.
    """
    cmd = ["squeue"]
    if filter_user:
        cmd += ["--me"]  # only list jobs belonging to the current user

    res = run(cmd, stdout=PIPE)
    if res.returncode == 0:
        rlines = res.stdout.decode().split("\n")
        statii = {}
        for r in rlines[1:]:
            try:
                jobid, _, _, _, status, runtime, _, _ = r.split()
                jobid = jobid.strip()
                statii[jobid] = status, runtime
            except ValueError:  # not enough values to unpack in split
                pass
        return statii
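

# A sketch of the squeue output slurm_status() parses (values hypothetical):
#
#     JOBID PARTITION  NAME  USER ST  TIME NODES NODELIST(REASON)
#     12345     exfel  corr  xcal  R  1:02     1 max-exfl001
#
# squeue's default 8-column format matches the unpacking above, so this
# row yields statii == {'12345': ('R', '1:02')}.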


def slurm_job_status(jobid):
    """ Return the status of a single slurm job by calling sacct

    :param jobid: Slurm job Id
    :return: (job id, elapsed time, state), or ("NA", "NA", "NA") if
             the job is not found.
    """
    cmd = ["sacct", "-j", str(jobid), "--format=JobID,Elapsed,state"]

    res = run(cmd, stdout=PIPE)
    if res.returncode == 0:
        rlines = res.stdout.decode().split("\n")

        log.debug("Job {} state {}".format(jobid, rlines[2].split()))
        if len(rlines[2].split()) == 3:
            return rlines[2].replace("+", "").split()

    return "NA", "NA", "NA"


def update_job_db(config):
    """ Update the job database and send out updates to MDC

    :param config: configuration parsed from webservice YAML
    """
    log.info("Starting config db handling")
    conn = init_job_db(config)
    mdc = init_md_client(config)
    kafka_prod = init_kafka_producer(config)
    kafka_topic = config['kafka']['topic']
    time_interval = int(config['web-service']['job-update-interval'])

    while True:
        statii = slurm_status()
        # Check that slurm is giving proper feedback
        if statii is None:
            time.sleep(time_interval)
            continue
        try:
            c = conn.cursor()
            c.execute("SELECT * FROM jobs WHERE status IN ('R', 'PD', 'CG')")
            combined = {}
            log.debug("SLURM info {}".format(statii))

            for r in c.fetchall():
                rid, jobid, proposal, run, status, _time, det, action = r
                log.debug("DB info {}".format(r))

                cflg, cstatus, *_ = combined.setdefault((rid, action), (
                    [], [], proposal, run, det
                ))
                if jobid in statii:
                    slstatus, runtime = statii[jobid]
                    query = "UPDATE jobs SET status=?, time=? WHERE jobid LIKE ?"
                    c.execute(query, (slstatus, runtime, jobid))

                    cflg.append('R')
                    cstatus.append("{}-{}".format(slstatus, runtime))
                else:
                    _, sltime, slstatus = slurm_job_status(jobid)
                    query = "UPDATE jobs SET status=? WHERE jobid LIKE ?"
                    c.execute(query, (slstatus, jobid))

                    if slstatus == 'COMPLETED':
                        cflg.append("A")
                    else:
                        cflg.append("NA")
                    cstatus.append(slstatus)
            conn.commit()
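
            # Example of what `combined` holds at this point (values
            # hypothetical): one job still running and one completed, for
            # run 5 of proposal 2601 on detector 'AGIPD':
            #
            #     {(42, 'CORRECT'): (['R', 'A'], ['R-1:02', 'COMPLETED'],
            #                        2601, 5, 'AGIPD')}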
            flg_order = {"R": 2, "A": 1, "NA": 0}
            dark_flags = {'NA': 'E', 'R': 'IP', 'A': 'F'}
            for rid, action in combined:
                if int(rid) == 0:  # this job was not submitted from MyMDC
                    continue
                flgs, statii, proposal, run, det = combined[rid, action]
                # sort by least done status
                flg = max(flgs, key=lambda i: flg_order[i])
                if flg != 'R':
                    log.info(
                        "Jobs finished - action: %s, run id: %s, status: %s",
                        action, rid, flg,
                    )
                    if action == 'CORRECT':
                        try:
                            kafka_prod.send(kafka_topic, {
                                'event': 'correction_complete',
                                'proposal': proposal,
                                'run': run,
                                'detector': det,
                                'success': (flg == 'A'),  # A for Available
                            })
                        except KafkaError:
                            log.warning("Error sending Kafka notification",
                                        exc_info=True)

                if all(s.startswith('PD-') for s in statii):
                    # Avoid swamping myMdC with updates for jobs still pending.
                    log.debug(
                        "No update for action %s, rid %s: jobs pending",
                        action, rid
                    )
                    continue

                msg = "\n".join(statii)
                msg_debug = f"Update MDC {rid}, {msg}"
                log.debug(msg_debug.replace('\n', ', '))

                if action == 'CORRECT':
                    data = {'flg_cal_data_status': flg,
                            'cal_pipeline_reply': msg}
                    if flg != 'R':
                        data['cal_last_end_at'] = datetime.now(tz=timezone.utc).isoformat()
                    response = mdc.update_run_api(rid, data)
                else:  # action == 'DARK', sent via myMdC's dark run API
                    data = {'dark_run': {'flg_status': dark_flags[flg],
                                         'calcat_feedback': msg}}
                    response = mdc.update_dark_run_api(rid, data)

                if response.status_code != 200:
                    log.error("Failed to update MDC for action %s, rid %s",
                              action, rid)
                    log.error(Errors.MDC_RESPONSE.format(response))
        except Exception:
            log.error("Failure to update job DB", exc_info=True)
        time.sleep(time_interval)
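

# Note: update_job_db() loops forever, polling Slurm every
# config['web-service']['job-update-interval'] seconds; it is meant to run
# as a long-lived service process (started from main() below).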


def main(argv=None):
    if argv is None:
        argv = sys.argv[1:]

    # Ensure files are opened as UTF-8 by default, regardless of environment.
    locale.setlocale(locale.LC_CTYPE, ('en_US', 'UTF-8'))

    parser = argparse.ArgumentParser(
        description='Monitor calibration jobs in Slurm and update myMdC'
    )
    parser.add_argument('--config-file', type=str, default=None)
    parser.add_argument('--log-file', type=str, default='./monitor.log')
    parser.add_argument(
        '--log-level', type=str, default="INFO", choices=['INFO', 'DEBUG', 'ERROR']  # noqa
    )
    args = parser.parse_args(argv)

    if args.config_file is not None:
        config.configure(includes_for_dynaconf=[Path(args.config_file).absolute()])

    fmt = '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] %(message)s'  # noqa
    logging.basicConfig(
        filename=args.log_file,
        level=getattr(logging, args.log_level),
        format=fmt,
    )
    update_job_db(config)


if __name__ == "__main__":
    main()