diff --git a/src/calng/CrystfelRunner.py b/src/calng/CrystfelRunner.py index ee320719a359597d28ef815b058ac65d887b7854..d0daa9f4af57940dd06723a087c1f8601d7ef924 100644 --- a/src/calng/CrystfelRunner.py +++ b/src/calng/CrystfelRunner.py @@ -22,6 +22,7 @@ from karabo.bound import ( FLOAT_ELEMENT, VECTOR_STRING_ELEMENT, VECTOR_FLOAT_ELEMENT, + NDARRAY_ELEMENT, Hash, PythonDevice, State, @@ -137,48 +138,38 @@ class CrystfelRunner(PythonDevice): .initialValue(0) .commit(), - NODE_ELEMENT(expected) - .key("crystfelDetailedStats") - .commit(), - UINT64_ELEMENT(expected) - .key("crystfelDetailedStats.nHits") + .key("crystfelStats.reflections") .readOnly() .initialValue(0) .commit(), - UINT64_ELEMENT(expected) - .key("crystfelDetailedStats.nCrystals") - .readOnly() - .initialValue(0) - .commit(), - - UINT64_ELEMENT(expected) - .key("crystfelDetailedStats.nReflections") + FLOAT_ELEMENT(expected) + .key("crystfelStats.hitRate") .readOnly() .initialValue(0) .commit(), FLOAT_ELEMENT(expected) - .key("crystfelDetailedStats.hitRate") + .key("crystfelStats.indexRate") .readOnly() .initialValue(0) .commit(), - FLOAT_ELEMENT(expected) - .key("crystfelDetailedStats.indexRate") + VECTOR_FLOAT_ELEMENT(expected) + .key("crystfelStats.detCentre") .readOnly() .initialValue(0) .commit(), VECTOR_FLOAT_ELEMENT(expected) - .key("crystfelDetailedStats.detCentre") + .key("crystfelStats.detCentreUnc") .readOnly() .initialValue(0) .commit(), - VECTOR_FLOAT_ELEMENT(expected) - .key("crystfelDetailedStats.detCentreUnc") + NDARRAY_ELEMENT(expected) + .key("crystfelStats.cell") .readOnly() .initialValue(0) .commit(), @@ -477,13 +468,24 @@ class CrystfelRunner(PythonDevice): self.get("crystfelArgs.crystfelBinPath") + ":" + env.get("PATH", "") ) + # we must have a stream output! + # if empty, set it to something and delete after we are done reading it + streamOutputPath = self.get("crystfelArgs.streamOutputPath") + deleteOutput = False + if len(streamOutputPath) == 0: + # TODO: Is /tmp ok? 
+ # the goal is only to prevent + user = env["USER"] + name = self.get("deviceId").replace("/", "_") + streamOutputPath = f"/tmp/{user}/out_{name}.stream" + deleteOutput = True args = [ "indexamajig", f"--zmq-input=tcp://localhost:{self.get('crystfelArgs.zmqPort')}", "--zmq-request=next", "--data-format=msgpack", "-o", - self.get("crystfelArgs.streamOutputPath"), + streamOutputPath, "-g", self.get("crystfelArgs.geometryPath"), "--no-mask-data", @@ -498,6 +500,7 @@ class CrystfelRunner(PythonDevice): args.extend(reparse_fix_comma_mess(self.get("crystfelArgs.misc"))) self.set("crystfelArgs.commandline", " ".join(args)) + # TODO: cd somewhere for the indexamajig folder? self.log.INFO("Spawning indexamajig") self._crystfel_proc = subprocess.Popen( @@ -510,34 +513,9 @@ class CrystfelRunner(PythonDevice): env=env, ) - def handle_stderr(): - stderr_re = re.compile( - r"(?P<images>\d+) images processed, (?P<hits>\d+) hits" - r".*? (?P<indexed>\d+) indexable" - r".*? (?P<crystals>\d+) crystals" - ) - for line in self._crystfel_proc.stderr: - if (match := stderr_re.match(line)) is None: - print(line) - else: - self.set( - "crystfelStats", - Hash(*itertools.chain.from_iterable(match.groupdict().items())) - ) - #self.log.INFO(f"crystfel stderr: {line.rstrip()}") - - def handle_stdout(): - for line in self._crystfel_proc.stdout: - self.log.INFO(f"crystfel stdout: {line.rstrip()}") - - threading.Thread(target=handle_stderr, daemon=True).start() - threading.Thread(target=handle_stdout, daemon=True).start() - - time.sleep(1) - - # TODO: do something with stream file + # do something with stream file self._tail_proc = subprocess.Popen( - ["tail", "-F", "-n", "+0", self.get("crystfelArgs.streamOutputPath")], + ["tail", "-F", "-n", "+0", streamOutputPath], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False, @@ -545,87 +523,81 @@ class CrystfelRunner(PythonDevice): bufsize=1, ) - def handle_tail(): - do_print = False - buffered = [] - for line in self._tail_proc.stdout: - if 
def parse_stream_lines(lines):
    """Collect per-chunk statistics from the lines of a CrystFEL stream file.

    Only lines between a ``----- Begin chunk -----`` marker and the next
    ``----- End chunk -----`` marker are inspected.  A line that carries a
    known keyword but cannot be parsed is skipped, so that one malformed
    line does not abort the whole scan.

    Parameters:
        lines: iterable of text lines (for example a pipe opened in text
            mode).

    Returns:
        A tuple ``(n_reflections, cells, det_shifts)`` where

        * ``n_reflections`` holds one int per chunk: the value of the last
          ``num_reflections = N`` line seen in that chunk, or 0 if none.
          NOTE(review): a chunk with several crystals presumably has
          several such lines and only the last one is kept - confirm that
          this is intended.
        * ``cells`` holds one length-6 array ``[a, b, c, alpha, beta,
          gamma]`` per ``Cell parameters`` line.
        * ``det_shifts`` holds one length-2 array ``[x, y]`` per
          ``predict_refine/det_shift`` line, each value scaled by 1e-3
          (the sample line reports mm, so presumably this yields metres -
          confirm).
    """
    n_reflections = []
    cells = []
    det_shifts = []
    in_chunk = False
    for line in lines:
        if not in_chunk:
            if line.startswith("----- Begin chunk -----"):
                in_chunk = True
                n_reflections.append(0)
            continue
        if line.startswith("----- End chunk -----"):
            in_chunk = False
            continue
        try:
            if line.startswith("num_reflections"):
                n_reflections[-1] = int(line.split(" = ")[1])
            elif "Cell parameters" in line:
                # Cell parameters 7.96703 7.95613 3.81936 nm, \
                #   90.15754 90.52371 90.09970 deg
                fields = line.split()
                params = [float(v) for v in fields[2:5] + fields[6:9]]
                if len(params) == 6:
                    cells.append(np.array(params))
            elif "predict_refine/det_shift" in line:
                # predict_refine/det_shift x = -0.013 y = -0.113 mm
                fields = line.split()
                det_x = float(fields[-5]) * 1e-3
                det_y = float(fields[-2]) * 1e-3
                det_shifts.append(np.array([det_x, det_y]))
        except (IndexError, ValueError):
            # Truncated or otherwise unparsable line: ignore it.
            continue
    return n_reflections, cells, det_shifts


def parse_progress_lines(lines):
    """Return the counters of the last indexamajig progress line in *lines*.

    A progress line starts with ``<N> images processed, <N> hits`` followed
    by ``<N> indexable`` and ``<N> crystals``.

    Parameters:
        lines: iterable of text lines.

    Returns:
        A dict with the integer values for ``images``, ``hits``,
        ``indexed`` and ``crystals`` taken from the last matching line, or
        an empty dict when no line matches.
    """
    progress_re = re.compile(
        r"(?P<images>\d+) images processed, (?P<hits>\d+) hits"
        r".*? (?P<indexed>\d+) indexable"
        r".*? (?P<crystals>\d+) crystals"
    )
    latest = {}
    for line in lines:
        if (match := progress_re.match(line)) is not None:
            # groupdict() yields strings; the rates below need numbers.
            latest = {key: int(value) for key, value in match.groupdict().items()}
    return latest


def collect_details():
    """Summarise the stream file and stderr output into ``crystfelStats``.

    Reads the tailed stream file until its pipe is exhausted, then reads
    the stderr pipe of the indexamajig process, and publishes whatever was
    gathered under the ``crystfelStats`` key.  Afterwards the stream file
    is removed if ``deleteOutput`` is set.

    This is a closure: ``self``, ``deleteOutput`` and ``streamOutputPath``
    come from the enclosing scope, as do the two parsing helpers above.

    NOTE(review): ``tail -F`` does not exit on its own, so the first loop
    presumably only ends once the tail process is terminated elsewhere -
    confirm.  stderr is drained only after that; if indexamajig writes
    more than the pipe can buffer in the meantime it could block - confirm.
    """
    results = {}
    try:
        # handle output stream
        n_reflections, cells, det_shifts = parse_stream_lines(
            self._tail_proc.stdout
        )
        results["reflections"] = int(sum(n_reflections))
        if cells:
            # np.stack raises ValueError on an empty list, so the key is
            # simply left untouched when no cell was reported.
            results["cell"] = np.stack(cells, axis=0)

        # handle std. error output
        progress = parse_progress_lines(self._crystfel_proc.stderr)
        results.update(progress)

        n_data = progress.get("images", 0)
        if n_data <= 0:
            results["hitRate"] = 0.0
            results["indexRate"] = 0.0
        else:
            results["hitRate"] = progress["hits"] / n_data
            results["indexRate"] = progress["crystals"] / n_data

        if det_shifts:
            stacked = np.stack(det_shifts, axis=0)
            results["detCentre"] = np.mean(stacked, axis=0).tolist()
            results["detCentreUnc"] = np.std(stacked, axis=0).tolist()
        else:
            results["detCentre"] = [0.0, 0.0]
            results["detCentreUnc"] = [0.0, 0.0]
    finally:
        try:
            # if an exception happens, output what we gathered so far
            self.set(
                "crystfelStats",
                Hash(*itertools.chain.from_iterable(results.items())),
            )
        finally:
            # ... and delete the output stream file if requested; the
            # nested finally makes sure this runs even if set() fails
            if deleteOutput:
                try:
                    os.remove(streamOutputPath)
                except FileNotFoundError:
                    # indexamajig never created the file: nothing to do
                    pass