Skip to content
Snippets Groups Projects
Commit 4f8bce30 authored by Danilo Ferreira de Lima's avatar Danilo Ferreira de Lima
Browse files

Merged output parser.

parent 42a7ce6e
No related branches found
No related tags found
2 merge requests!59Interface CrystFEL with Karabo and allow automatic parameter tunning with rcrystfel,!53Train picker arbiter kernel
......@@ -22,6 +22,7 @@ from karabo.bound import (
FLOAT_ELEMENT,
VECTOR_STRING_ELEMENT,
VECTOR_FLOAT_ELEMENT,
NDARRAY_ELEMENT,
Hash,
PythonDevice,
State,
......@@ -137,48 +138,38 @@ class CrystfelRunner(PythonDevice):
.initialValue(0)
.commit(),
NODE_ELEMENT(expected)
.key("crystfelDetailedStats")
.commit(),
UINT64_ELEMENT(expected)
.key("crystfelDetailedStats.nHits")
.key("crystfelStats.reflections")
.readOnly()
.initialValue(0)
.commit(),
UINT64_ELEMENT(expected)
.key("crystfelDetailedStats.nCrystals")
.readOnly()
.initialValue(0)
.commit(),
UINT64_ELEMENT(expected)
.key("crystfelDetailedStats.nReflections")
FLOAT_ELEMENT(expected)
.key("crystfelStats.hitRate")
.readOnly()
.initialValue(0)
.commit(),
FLOAT_ELEMENT(expected)
.key("crystfelDetailedStats.hitRate")
.key("crystfelStats.indexRate")
.readOnly()
.initialValue(0)
.commit(),
FLOAT_ELEMENT(expected)
.key("crystfelDetailedStats.indexRate")
VECTOR_FLOAT_ELEMENT(expected)
.key("crystfelStats.detCentre")
.readOnly()
.initialValue(0)
.commit(),
VECTOR_FLOAT_ELEMENT(expected)
.key("crystfelDetailedStats.detCentre")
.key("crystfelStats.detCentreUnc")
.readOnly()
.initialValue(0)
.commit(),
VECTOR_FLOAT_ELEMENT(expected)
.key("crystfelDetailedStats.detCentreUnc")
NDARRAY_ELEMENT(expected)
.key("crystfelStats.cell")
.readOnly()
.initialValue(0)
.commit(),
......@@ -477,13 +468,24 @@ class CrystfelRunner(PythonDevice):
self.get("crystfelArgs.crystfelBinPath") +
":" + env.get("PATH", "")
)
# we must have a stream output!
# if empty, set it to something and delete after we are done reading it
streamOutputPath = self.get("crystfelArgs.streamOutputPath")
deleteOutput = False
if len(streamOutputPath) == 0:
# TODO: Is /tmp ok?
# the goal is only to prevent
user = env["USER"]
name = self.get("deviceId").replace("/", "_")
streamOutputPath = f"/tmp/{user}/out_{name}.stream"
deleteOutput = True
args = [
"indexamajig",
f"--zmq-input=tcp://localhost:{self.get('crystfelArgs.zmqPort')}",
"--zmq-request=next",
"--data-format=msgpack",
"-o",
self.get("crystfelArgs.streamOutputPath"),
streamOutputPath,
"-g",
self.get("crystfelArgs.geometryPath"),
"--no-mask-data",
......@@ -498,6 +500,7 @@ class CrystfelRunner(PythonDevice):
args.extend(reparse_fix_comma_mess(self.get("crystfelArgs.misc")))
self.set("crystfelArgs.commandline", " ".join(args))
# TODO: cd somewhere for the indexamajig folder?
self.log.INFO("Spawning indexamajig")
self._crystfel_proc = subprocess.Popen(
......@@ -510,34 +513,9 @@ class CrystfelRunner(PythonDevice):
env=env,
)
def handle_stderr():
stderr_re = re.compile(
r"(?P<images>\d+) images processed, (?P<hits>\d+) hits"
r".*? (?P<indexed>\d+) indexable"
r".*? (?P<crystals>\d+) crystals"
)
for line in self._crystfel_proc.stderr:
if (match := stderr_re.match(line)) is None:
print(line)
else:
self.set(
"crystfelStats",
Hash(*itertools.chain.from_iterable(match.groupdict().items()))
)
#self.log.INFO(f"crystfel stderr: {line.rstrip()}")
def handle_stdout():
for line in self._crystfel_proc.stdout:
self.log.INFO(f"crystfel stdout: {line.rstrip()}")
threading.Thread(target=handle_stderr, daemon=True).start()
threading.Thread(target=handle_stdout, daemon=True).start()
time.sleep(1)
# TODO: do something with stream file
# do something with stream file
self._tail_proc = subprocess.Popen(
["tail", "-F", "-n", "+0", self.get("crystfelArgs.streamOutputPath")],
["tail", "-F", "-n", "+0", streamOutputPath],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=False,
......@@ -545,87 +523,81 @@ class CrystfelRunner(PythonDevice):
bufsize=1,
)
def handle_tail():
do_print = False
buffered = []
for line in self._tail_proc.stdout:
if line.strip() == "----- Begin chunk -----":
do_print = True
continue
elif line.strip() == "----- End chunk -----":
do_print = False
if buffered:
self.log.INFO(f"From stream file:\n{buffered}")
buffered = []
continue
if not do_print:
continue
if any(
line.startswith(prefix)
for prefix in (
"hit", "indexed_by", "n_indexing_tries", "num_peaks"
)
):
buffered.append(line)
threading.Thread(target=handle_tail, daemon=True).start()
def collect_details():
"""This goes deeper in the output file structure to collect more information.
It is probably not often essential."""
n_data = self["crystfelStats.images"]
results = dict()
reading_chunk = False
n_reflections = list()
cell = list()
det_centre = list()
for line in self._tail_proc.stdout:
if reading_chunk:
if line.startswith('----- End chunk -----'):
reading_chunk = False
elif line.startswith('num_reflections'):
n_reflections[-1] = int(line.split(" = ")[1])
if "Cell parameters" in line:
# Cell parameters 7.96703 7.95613 3.81936 nm, \
# 90.15754 90.52371 90.09970 deg
lsplit = line.split()
a, b, c = [float(i) for i in lsplit[2:5]]
alpha, beta, gamma = [float(i) for i in lsplit[6:9]]
cell.append([a, b, c, alpha, beta, gamma])
if "predict_refine/det_shift" in line:
# predict_refine/det_shift x = -0.013 y = -0.113 mm
lsplit = line.split()
det_x = float(lsplit[-5])*1e-3
det_y = float(lsplit[-2])*1e-3
det_centre += [np.array([det_x, det_y])]
elif line.startswith('----- Begin chunk -----'):
reading_chunk = True
n_reflections.append(0)
n_reflections = np.array(n_reflections)
results['nHits'] = len(n_reflections)
results['nCrystals'] = len(np.where(n_reflections > 0)[0])
results['nReflections'] = np.sum(n_reflections)
if n_data <= 0:
results['hitRate'] = 0.0
results['indexRate'] = 0.0
else:
results['hitRate'] = results['n_hits'] / n_data
results['indexRate'] = results['n_crystals'] / n_data
if len(det_centre) != 0:
det_centre = np.stack(det_centre, axis=0)
results['detCentre'] = np.mean(det_centre, axis=0).tolist()
results['detCentreUnc'] = np.std(det_centre, axis=0).tolist()
else:
results['detCentre'] = [0.0, 0.0]
results['detCentreUnc'] = [0.0, 0.0]
try:
# handle output stream
reading_chunk = False
n_reflections = list()
cell = list()
det_centre = list()
for line in self._tail_proc.stdout:
if reading_chunk:
if line.startswith('----- End chunk -----'):
reading_chunk = False
elif line.startswith('num_reflections'):
n_reflections[-1] = int(line.split(" = ")[1])
if "Cell parameters" in line:
# Cell parameters 7.96703 7.95613 3.81936 nm, \
# 90.15754 90.52371 90.09970 deg
lsplit = line.split()
a, b, c = [float(i) for i in lsplit[2:5]]
alpha, beta, gamma = [float(i) for i in lsplit[6:9]]
cell.append(np.array([a, b, c, alpha, beta, gamma]))
if "predict_refine/det_shift" in line:
# predict_refine/det_shift x = -0.013 y = -0.113 mm
lsplit = line.split()
det_x = float(lsplit[-5])*1e-3
det_y = float(lsplit[-2])*1e-3
det_centre += [np.array([det_x, det_y])]
elif line.startswith('----- Begin chunk -----'):
reading_chunk = True
n_reflections.append(0)
n_reflections = np.array(n_reflections)
results['reflections'] = np.sum(n_reflections)
results['cell'] = np.stack(cell, axis=0)
# handle std. error output
stderr_re = re.compile(
r"(?P<images>\d+) images processed, (?P<hits>\d+) hits"
r".*? (?P<indexed>\d+) indexable"
r".*? (?P<crystals>\d+) crystals"
)
for line in self._crystfel_proc.stderr:
if (match := stderr_re.match(line)) is not None:
results.update(match.groupdict())
n_data = results["images"]
if n_data <= 0:
results['hitRate'] = 0.0
results['indexRate'] = 0.0
else:
results['hitRate'] = results['hits'] / n_data
results['indexRate'] = results['crystals'] / n_data
for k, v in results.items():
if len(det_centre) != 0:
det_centre = np.stack(det_centre, axis=0)
results['detCentre'] = np.mean(det_centre, axis=0).tolist()
results['detCentreUnc'] = np.std(det_centre, axis=0).tolist()
else:
results['detCentre'] = [0.0, 0.0]
results['detCentreUnc'] = [0.0, 0.0]
finally:
# if an exception happens, output what we gathered so far
self.set(
f"crystfelDetailedStats.{k}", v
f"crystfelStats", Hash(*itertools.chain.from_iterable(results.items()))
)
# ... and elete output stream file if requested:
if deleteOutput:
os.remove(streamOutputPath)
threading.Thread(target=collect_details, daemon=True).start()
utils.add_unsafe_get(CrystfelRunner)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment