diff --git a/src/calng/CrystfelRunner.py b/src/calng/CrystfelRunner.py index fe7be1876bd1aef847fd0ed09dee65ab3aea92a1..78b5a64db14ffc48f7526d02d77072fe39d35ad9 100644 --- a/src/calng/CrystfelRunner.py +++ b/src/calng/CrystfelRunner.py @@ -17,7 +17,9 @@ from karabo.bound import ( OVERWRITE_ELEMENT, SLOT_ELEMENT, STRING_ELEMENT, + UINT64_ELEMENT, VECTOR_STRING_ELEMENT, + Hash, PythonDevice, State, Unit, @@ -80,23 +82,42 @@ class CrystfelRunner(PythonDevice): .setNewDefaultValue(State.INIT) .commit(), + SLOT_ELEMENT(expected) + .key("startCrystfel") + .allowedStates(State.STOPPED) + .commit(), + + SLOT_ELEMENT(expected) + .key("stopCrystfel") + .allowedStates(State.STARTED) + .commit(), + NODE_ELEMENT(expected) - .key("crystfelProcess") + .key("crystfelStats") .commit(), - STRING_ELEMENT(expected) - .key("crystfelProcess.state") - .setSpecialDisplayType("State") + UINT64_ELEMENT(expected) + .key("crystfelStats.images") .readOnly() - .initialValue("STOPPED") + .initialValue(0) .commit(), - SLOT_ELEMENT(expected) - .key("crystfelProcess.start") + UINT64_ELEMENT(expected) + .key("crystfelStats.hits") + .readOnly() + .initialValue(0) .commit(), - SLOT_ELEMENT(expected) - .key("crystfelProcess.stop") + UINT64_ELEMENT(expected) + .key("crystfelStats.indexed") + .readOnly() + .initialValue(0) + .commit(), + + UINT64_ELEMENT(expected) + .key("crystfelStats.crystals") + .readOnly() + .initialValue(0) .commit(), NODE_ELEMENT(expected) @@ -248,8 +269,8 @@ class CrystfelRunner(PythonDevice): self._crystfel_proc = None self._tail_proc = None - self.KARABO_SLOT(self.crystfelProcess_start) - self.KARABO_SLOT(self.crystfelProcess_stop) + self.KARABO_SLOT(self.startCrystfel) + self.KARABO_SLOT(self.stopCrystfel) self.KARABO_SLOT(self.trainQueue_reset) self.registerInitialFunction(self._initialization) @@ -289,21 +310,21 @@ class CrystfelRunner(PythonDevice): "doesn't specify peak list path" ) - self.updateState(State.STOPPED) self._serve_to_crystfel() - self.crystfelProcess_start() + self.updateState(State.STOPPED) + self.startCrystfel() - def crystfelProcess_start(self): + def startCrystfel(self): self._run_crystfel() - self.set("crystfelProcess.state", "STARTED") + self.updateState(State.STARTED) - def crystfelProcess_stop(self): + def stopCrystfel(self): self._tail_proc.terminate() self._crystfel_proc.terminate() - self.set("crystfelProcess.state", "STOPPED") + self.updateState(State.STOPPED) def trainQueue_reset(self): - self._crystfel_queue = queue.Queue(maxlen=self.get("trainQueue.capacity")) + self._crystfel_queue = queue.Queue(maxsize=self.get("trainQueue.capacity")) self.set("trainQueue.fullness", 0) def input_handler(self, input_channel): @@ -364,7 +385,7 @@ class CrystfelRunner(PythonDevice): self._crystfel_queue.put_nowait(msgpack.packb([payload])) self.set( "trainQueue.fullness", - len(self._crystfel_queue) / self._crystfel_queue.maxlen * 100 + len(self._crystfel_queue) / self._crystfel_queue.maxsize * 100 ) def _serve_to_crystfel(self): @@ -375,11 +396,11 @@ class CrystfelRunner(PythonDevice): msg = self._crystfel_socket.recv() assert msg == b"next" self._crystfel_socket.send( - self._crystfel_queue.get(block=True, timeout=5) + self._crystfel_queue.get(block=True) ) self.set( "trainQueue.fullness", - len(self._crystfel_queue) / self._crystfel_queue.maxlen * 100 + len(self._crystfel_queue) / self._crystfel_queue.maxsize * 100 ) threading.Thread(target=server, daemon=True).start() @@ -429,8 +450,20 @@ class CrystfelRunner(PythonDevice): ) def handle_stderr(): + stderr_re = re.compile( + r"(?P<images>\d+) images processed, (?P<hits>\d+) hits" + r".*? (?P<indexed>\d+) indexable" + r".*? (?P<crystals>\d+) crystals" + ) for line in self._crystfel_proc.stderr: - self.log.INFO(f"crystfel stderr: {line.rstrip()}") + if (match := stderr_re.match(line)) is None: + print(line) + else: + self.set( + "crystfelStats", + Hash(*itertools.chain.from_iterable(match.groupdict().items())) + ) + #self.log.INFO(f"crystfel stderr: {line.rstrip()}") def handle_stdout(): for line in self._crystfel_proc.stdout: