# This file is part of CFELPyUtils.
#
# CFELPyUtils is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# CFELPyUtils is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with OnDA. If
# not, see <http://www.gnu.org/licenses/>.
#
# Copyright 2014-2019 Deutsches Elektronen-Synchrotron DESY,
# a research centre of the Helmholtz Association.
"""
CrystFEL utilities.
This module contains Python reimplementations of some functions from the `CrystFEL
<http://www.desy.de/~twhite/crystfel>`_ software package.
"""
from __future__ import absolute_import, division, print_function
import collections
import copy
import math
import re
from typing import Any, Dict, List, Union, Tuple # pylint: disable=unused-import
from future.utils import viewitems
def _assplode_algebraic(value):
# type: (str) -> List[str]
# Reimplementation of assplode_algegraic from libcrystfel/src/detector.c.
items = [item for item in re.split("([+-])", string=value.strip()) if item != ""]
if items and items[0] not in ("+", "-"):
items.insert(0, "+")
return ["".join((items[x], items[x + 1])) for x in range(0, len(items), 2)]
def _dir_conv(direction_x, direction_y, direction_z, value):
# type: (float, float, float, str) -> List[float]
# Reimplementation of dir_conv from libcrystfel/src/detector.c.
direction = [direction_x, direction_y, direction_z]
items = _assplode_algebraic(value)
if not items:
raise RuntimeError("Invalid direction: {}.".format(value))
for item in items:
axis = item[-1]
if axis != "x" and axis != "y" and axis != "z":
raise RuntimeError("Invalid Symbol: {} (must be x, y or z).".format(axis))
if item[:-1] == "+":
value = "1.0"
elif item[:-1] == "-":
value = "-1.0"
else:
value = item[:-1]
if axis == "x":
direction[0] = float(value)
elif axis == "y":
direction[1] = float(value)
elif axis == "z":
direction[2] = float(value)
return direction
def _set_dim_structure_entry(key, value, panel):
# type: (str, str, Dict[str, Any]) -> None
# Reimplementation of set_dim_structure_entry from libcrystfel/src/events.c.
if panel["dim_structure"] is not None:
dim = panel["dim_structure"]
else:
dim = []
try:
dim_index = int(key[3])
except IndexError:
raise RuntimeError("'dim' must be followed by a number, e.g. 'dim0')")
except ValueError:
raise RuntimeError("Invalid dimension number {}".format(key[3]))
if dim_index > len(dim) - 1:
for _ in range(len(dim), dim_index + 1):
dim.append(None)
if value == "ss" or value == "fs" or value == "%":
dim[dim_index] = value
elif value.isdigit():
dim[dim_index] = int(value)
else:
raise RuntimeError("Invalid dim entry: {}.".format(value))
panel["dim_structure"] = dim
def _parse_field_for_panel(key, value, panel):
# type: (str, str, Dict[str, Any]) -> None
# Reimplementation of parse_field_for_panel from libcrystfel/src/detector.c.
if key == "min_fs":
panel["origin_min_fs"] = int(value)
panel["min_fs"] = int(value)
elif key == "max_fs":
panel["origin_max_fs"] = int(value)
panel["max_fs"] = int(value)
elif key == "min_ss":
panel["origin_min_ss"] = int(value)
panel["min_ss"] = int(value)
elif key == "max_ss":
panel["origin_max_ss"] = int(value)
panel["max_ss"] = int(value)
elif key == "corner_x":
panel["cnx"] = float(value)
elif key == "corner_y":
panel["cny"] = float(value)
elif key == "rail_direction":
try:
panel["rail_x"], panel["rail_y"], panel["rail_z"] = _dir_conv(
direction_x=panel["rail_x"],
direction_y=panel["rail_y"],
direction_z=panel["rail_z"],
value=value,
)
except RuntimeError as exc:
raise RuntimeError("Invalid rail direction. ", exc)
elif key == "clen_for_centering":
panel["clen_for_centering"] = float(value)
elif key == "adu_per_eV":
panel["adu_per_eV"] = float(value)
elif key == "adu_per_photon":
panel["adu_per_photon"] = float(value)
elif key == "rigid_group":
panel["rigid_group"] = value
elif key == "clen":
try:
panel["clen"] = float(value)
panel["clen_from"] = None
except ValueError:
panel["clen"] = -1
panel["clen_from"] = value
elif key == "data":
if not value.startswith("/"):
raise RuntimeError("Invalid data location: {}".format(value))
panel["data"] = value
elif key == "mask":
if not value.startswith("/"):
raise RuntimeError("Invalid data location: {}".format(value))
panel["mask"] = value
elif key == "mask_file":
panel["mask_file"] = value
elif key == "saturation_map":
panel["saturation_map"] = value
elif key == "saturation_map_file":
panel["saturation_map_file"] = value
elif key == "coffset":
panel["coffset"] = float(value)
elif key == "res":
panel["res"] = float(value)
elif key == "max_adu":
panel["max_adu"] = value
elif key == "badrow_direction":
if value == "x":
panel["badrow"] = "f"
elif value == "y":
panel["badrow"] = "s"
elif value == "f":
panel["badrow"] = "f"
elif value == "s":
panel["badrow"] = "s"
elif value == "-":
panel["badrow"] = "-"
else:
print("badrow_direction must be x, t, f, s, or '-'")
print("Assuming '-'.")
panel["badrow"] = "-"
elif key == "no_index":
panel["no_index"] = bool(value)
elif key == "fs":
try:
panel["fsx"], panel["fsy"], panel["fsz"] = _dir_conv(
direction_x=panel["fsx"],
direction_y=panel["fsy"],
direction_z=panel["fsz"],
value=value,
)
except RuntimeError as exc:
raise RuntimeError("Invalid fast scan direction.", exc)
elif key == "ss":
try:
panel["ssx"], panel["ssy"], panel["ssz"] = _dir_conv(
direction_x=panel["ssx"],
direction_y=panel["ssy"],
direction_z=panel["ssz"],
value=value,
)
except RuntimeError as exc:
raise RuntimeError("Invalid slow scan direction.", exc)
elif key.startswith("dim"):
_set_dim_structure_entry(key=key, value=value, panel=panel)
else:
RuntimeError("Unrecognised field: {}".format(key))
def _parse_toplevel(key, value, detector, beam, panel):
# type: (str, str, Dict[str, Any], Dict[str, Any], Dict[str, Any]) -> None
# Reimplementation of parse_toplevel from libcrystfel/src/detector.c.
if key == "mask_bad":
try:
detector["mask_bad"] = int(value)
except ValueError:
detector["mask_bad"] = int(value, base=16)
elif key == "mask_good":
try:
detector["mask_good"] = int(value)
except ValueError:
detector["mask_good"] = int(value, base=16)
elif key == "coffset":
panel["coffset"] = float(value)
elif key == "photon_energy":
if value.startswith("/"):
beam["photon_energy"] = 0.0
beam["photon_energy_from"] = value
else:
beam["photon_energy"] = float(value)
beam["photon_energy_from"] = None
elif key == "photon_energy_scale":
beam["photon_energy_scale"] = float(value)
elif key == "peak_info_location":
detector["peak_info_location"] = value
elif key.startswith("rigid_group") and not key.startswith("rigid_group_collection"):
detector["rigid_groups"][key[12:]] = value.split(",")
elif key.startswith("rigid_group_collection"):
detector["rigid_group_collections"][key[23:]] = value.split(",")
else:
_parse_field_for_panel(key=key, value=value, panel=panel)
def _check_bad_fsss(bad_region, is_fsss):
# type: (Dict[str, Any], int) -> None
# Reimplementation of check_bad_fsss from libcrystfel/src/detector.c.
if bad_region["is_fsss"] == 99:
bad_region["is_fsss"] = is_fsss
return
if is_fsss != bad_region["is_fsss"]:
raise RuntimeError("You can't mix x/y and fs/ss in a bad region")
return
def _parse_field_bad(key, value, bad):
# type: (str, str, Dict[str, Any]) -> None
# Reimplementation of parse_field_bad from libcrystfel/src/detector.c.
if key == "min_x":
bad["min_x"] = float(value)
_check_bad_fsss(bad_region=bad, is_fsss=False)
elif key == "max_x":
bad["max_x"] = float(value)
_check_bad_fsss(bad_region=bad, is_fsss=False)
elif key == "min_y":
bad["min_y"] = float(value)
_check_bad_fsss(bad_region=bad, is_fsss=False)
elif key == "max_y":
bad["max_y"] = float(value)
_check_bad_fsss(bad_region=bad, is_fsss=False)
elif key == "min_fs":
bad["min_fs"] = int(value)
_check_bad_fsss(bad_region=bad, is_fsss=True)
elif key == "max_fs":
bad["max_fs"] = int(value)
_check_bad_fsss(bad_region=bad, is_fsss=True)
elif key == "min_ss":
bad["min_ss"] = int(value)
_check_bad_fsss(bad_region=bad, is_fsss=True)
elif key == "max_ss":
bad["max_ss"] = int(value)
_check_bad_fsss(bad_region=bad, is_fsss=True)
elif key == "panel":
bad["panel"] = value
else:
raise RuntimeError("Unrecognised field: {}".format(key))
return
def _check_point(
name, # type: str
panel, # type: Dict[str, Any]
fs_, # type: int
ss_, # type: int
min_d, # type: float
max_d, # type: float
detector, # type: Dict[str, Any]
):
# type: (...) -> Tuple[float, float]
# Reimplementation of check_point from libcrystfel/src/detector.c.
xs_ = fs_ * panel["fsx"] + ss_ * panel["ssx"]
ys_ = fs_ * panel["fsy"] + ss_ * panel["ssy"]
rx_ = (xs_ + panel["cnx"]) / panel["res"]
ry_ = (ys_ + panel["cny"]) / panel["res"]
dist = math.sqrt(rx_ * rx_ + ry_ * ry_)
if dist > max_d:
detector["furthest_out_panel"] = name
detector["furthest_out_fs"] = fs_
detector["furthest_out_ss"] = ss_
max_d = dist
elif dist < min_d:
detector["furthest_in_panel"] = name
detector["furthest_in_fs"] = fs_
detector["furthest_in_ss"] = ss_
min_d = dist
return min_d, max_d
def _find_min_max_d(detector):
# type: (Dict[str, Any]) -> None
# Reimplementation of find_min_max_d from libcrystfel/src/detector.c.
min_d = float("inf")
max_d = 0.0
for name, panel in detector["panels"].items():
min_d, max_d = _check_point(
name=name,
panel=panel,
fs_=0,
ss_=0,
min_d=min_d,
max_d=max_d,
detector=detector,
)
min_d, max_d = _check_point(
name=name,
panel=panel,
fs_=panel["w"],
ss_=0,
min_d=min_d,
max_d=max_d,
detector=detector,
)
min_d, max_d = _check_point(
name=name,
panel=panel,
fs_=0,
ss_=panel["h"],
min_d=min_d,
max_d=max_d,
detector=detector,
)
min_d, max_d = _check_point(
name=name,
panel=panel,
fs_=panel["w"],
ss_=panel["h"],
min_d=min_d,
max_d=max_d,
detector=detector,
)
[docs]def load_crystfel_geometry(filename):
# type: (str) -> Dict[str, Any]
"""
Loads a CrystFEL geometry file.
This function is a reimplementation of the get_detector_geometry_2 function from
CrystFEL. It reads information from a CrystFEL geometry file.
For a full documentation of the CrystFEL geometry format, see the relevant `man
page <http://www.desy.de/~twhite/crystfel/manual-crystfel_geometry.html>`_.
The function returns a dictionary with the geometry information.
* The CrystFEL geometry file uses a key/value language. The keys in the returned
dictionary match the keys in the geometry file.
* The dictionary values store the corresponding values.
* The code of this function is currently synchronized with the code of the function
'get_detector_geometry_2' in CrystFEL at commit 41a8fa9819010.
Arguments:
filename (str): the absolute or relative path to a CrystFEL geometry file.
Returns:
Dict[str, Any]: a dictionary with the geometry information loaded from the
file.
"""
beam = {"photon_energy": 0.0, "photon_energy_from": None, "photon_energy_scale": 1}
detector = {
"panels": collections.OrderedDict(),
"bad": collections.OrderedDict(),
"mask_good": 0,
"mask_bad": 0,
"rigid_groups": {},
"rigid_group_collections": {},
}
default_panel = {
"cnx": None,
"cny": None,
"clen": None,
"coffset": 0.0,
"res": -1.0,
"badrow": "-",
"no_index": False,
"fsx": 1.0,
"fsy": 0.0,
"fsz": 0.0,
"ssx": 0.0,
"ssy": 1.0,
"ssz": 0.0,
"rail_x": None,
"rail_y": None,
"rail_z": None,
"clen_for_centering": None,
"adu_per_eV": None,
"adu_per_photon": None,
"max_adu": float("inf"),
"mask": None,
"mask_file": None,
"satmap": None,
"satmap_file": None,
"data": None,
"dim_structure": None,
}
default_bad_region = {
"min_x": None,
"max_x": None,
"min_y": None,
"max_y": None,
"min_fs": 0,
"max_fs": 0,
"min_ss": 0,
"max_ss": 0,
"is_fsss": 99,
}
default_dim = ["ss", "fs"]
fhandle = open(filename, mode="r")
fhlines = fhandle.readlines()
for line in fhlines:
if line.startswith(";"):
continue
line_without_comments = line.strip().split(";")[0]
line_items = re.split(pattern="([ \t])", string=line_without_comments)
line_items = [item for item in line_items if item not in ("", " ", "\t")]
if len(line_items) < 3:
continue
value = "".join(line_items[2:])
if line_items[1] != "=":
continue
path = re.split("(/)", line_items[0])
path = [item for item in path if item not in "/"]
if len(path) < 2:
_parse_toplevel(
key=line_items[0],
value=value,
detector=detector,
beam=beam,
panel=default_panel,
)
continue
curr_bad = None
curr_panel = None
if path[0].startswith("bad"):
if path[0] in detector["bad"]:
curr_bad = detector["bad"][path[0]]
else:
curr_bad = copy.deepcopy(default_bad_region)
detector["bad"][path[0]] = curr_bad
else:
if path[0] in detector["panels"]:
curr_panel = detector["panels"][path[0]]
else:
curr_panel = copy.deepcopy(default_panel)
detector["panels"][path[0]] = curr_panel
if curr_panel is not None:
_parse_field_for_panel(key=path[1], value=value, panel=curr_panel)
else:
_parse_field_bad(key=path[1], value=value, bad=curr_bad)
if not detector["panels"]:
raise RuntimeError("No panel descriptions in geometry file.")
num_placeholders_in_panels = None
for panel in detector["panels"].values():
if panel["dim_structure"] is not None:
curr_num_placeholders = panel["dim_structure"].count("%")
else:
curr_num_placeholders = 0
if num_placeholders_in_panels is None:
num_placeholders_in_panels = curr_num_placeholders
else:
if curr_num_placeholders != num_placeholders_in_panels:
raise RuntimeError(
"All panels' data and mask entries must have the same number of "
"placeholders."
)
num_placeholders_in_masks = None
for panel in detector["panels"].values():
if panel["mask"] is not None:
curr_num_placeholders = panel["mask"].count("%")
else:
curr_num_placeholders = 0
if num_placeholders_in_masks is None:
num_placeholders_in_masks = curr_num_placeholders
else:
if curr_num_placeholders != num_placeholders_in_masks:
raise RuntimeError(
"All panels' data and mask entries must have the same number of "
"placeholders."
)
if num_placeholders_in_masks > num_placeholders_in_panels:
raise RuntimeError(
"Number of placeholders in mask cannot be larger the number than for data."
)
dim_length = None
for panel_name, panel in viewitems(detector["panels"]):
if panel["dim_structure"] is None:
panel["dim_structure"] = copy.deepcopy(default_dim)
found_ss = 0
found_fs = 0
found_placeholder = 0
for dim_index, entry in enumerate(panel["dim_structure"]):
if entry is None:
raise RuntimeError(
"Dimension {} for panel {} is undefined.".format(
dim_index, panel_name
)
)
elif entry == "ss":
found_ss += 1
elif entry == "fs":
found_fs += 1
elif entry == "%":
found_placeholder += 1
if found_ss != 1:
raise RuntimeError(
"Exactly one slow scan dim coordinate is needed (found {} for panel "
"{}).".format(found_ss, panel_name)
)
if found_fs != 1:
raise RuntimeError(
"Exactly one fast scan dim coordinate is needed (found {} for panel "
"{}).".format(found_fs, panel_name)
)
if found_placeholder > 1:
raise RuntimeError(
"Only one placeholder dim coordinate is allowed. Maximum one "
"placeholder dim coordinate is allowed (found {} for panel {})".format(
found_placeholder, panel_name
)
)
if dim_length is None:
dim_length = len(panel["dim_structure"])
elif dim_length != len(panel["dim_structure"]):
raise RuntimeError(
"Number of dim coordinates must be the same for all panels."
)
if dim_length == 1:
raise RuntimeError("Number of dim coordinates must be at least two.")
for panel_name, panel in viewitems(detector["panels"]):
if panel["origin_min_fs"] < 0:
raise RuntimeError(
"Please specify the minimum fs coordinate for panel {}.".format(
panel_name
)
)
if panel["origin_max_fs"] < 0:
raise RuntimeError(
"Please specify the maximum fs coordinate for panel {}.".format(
panel_name
)
)
if panel["origin_min_ss"] < 0:
raise RuntimeError(
"Please specify the minimum ss coordinate for panel {}.".format(
panel_name
)
)
if panel["origin_max_ss"] < 0:
raise RuntimeError(
"Please specify the maximum ss coordinate for panel {}.".format(
panel_name
)
)
if panel["cnx"] is None:
raise RuntimeError(
"Please specify the corner X coordinate for panel {}.".format(
panel_name
)
)
if panel["clen"] is None and panel["clen_from"] is None:
raise RuntimeError(
"Please specify the camera length for panel {}.".format(panel_name)
)
if panel["res"] < 0:
raise RuntimeError(
"Please specify the resolution or panel {}.".format(panel_name)
)
if panel["adu_per_eV"] is None and panel["adu_per_photon"] is None:
raise RuntimeError(
"Please specify either adu_per_eV or adu_per_photon for panel "
"{}.".format(panel_name)
)
if panel["clen_for_centering"] is None and panel["rail_x"] is not None:
raise RuntimeError(
"You must specify clen_for_centering if you specify the rail "
"direction (panel {})".format(panel_name)
)
if panel["rail_x"] is None:
panel["rail_x"] = 0.0
panel["rail_y"] = 0.0
panel["rail_z"] = 1.0
if panel["clen_for_centering"] is None:
panel["clen_for_centering"] = 0.0
panel["w"] = panel["origin_max_fs"] - panel["origin_min_fs"] + 1
panel["h"] = panel["origin_max_ss"] - panel["origin_min_ss"] + 1
for bad_region_name, bad_region in viewitems(detector["bad"]):
if bad_region["is_fsss"] == 99:
raise RuntimeError(
"Please specify the coordinate ranges for bad region {}.".format(
bad_region_name
)
)
for group in detector["rigid_groups"]:
for name in detector["rigid_groups"][group]:
if name not in detector["panels"]:
raise RuntimeError(
"Cannot add panel to rigid_group. Panel not found: {}".format(name)
)
for group_collection in detector["rigid_group_collections"]:
for name in detector["rigid_group_collections"][group_collection]:
if name not in detector["rigid_groups"]:
raise RuntimeError(
"Cannot add rigid_group to collection. Rigid group not "
"found: {}".format(name)
)
for panel in detector["panels"].values():
d__ = panel["fsx"] * panel["ssy"] - panel["ssx"] * panel["fsy"]
if d__ == 0.0:
raise RuntimeError("Panel {} transformation is singluar.")
panel["xfs"] = panel["ssy"] / d__
panel["yfs"] = panel["ssx"] / d__
panel["xss"] = panel["fsy"] / d__
panel["yss"] = panel["fsx"] / d__
_find_min_max_d(detector)
fhandle.close()
return detector