Newer
Older
# This file is part of cfelpyutils.
#
# cfelpyutils is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# cfelpyutils is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with cfelpyutils. If not, see <http://www.gnu.org/licenses/>.
"""
Utilities for interoperability with data formats used in the CrystFEL
software package.
Valerio Mariani
committed
Exports:
Functions:
load_crystfel_geometry: a python reimplementation of the
get_detector_geometry_2 function from CrystFEL.
from __future__ import (absolute_import, division, print_function,
unicode_literals)
Valerio Mariani
committed
import collections
import copy
import math
def _assplode_algebraic(value):
Valerio Mariani
committed
# Reimplementation of assplode_algegraic from
# libcrystfel/src/detector.c.
items = [
Valerio Mariani
committed
for item in re.split('([+-])', string=value.strip())
Valerio Mariani
committed
]
Valerio Mariani
committed
items.insert(0, '+')
Valerio Mariani
committed
return [
''.join((items[x], items[x + 1]))
for x in range(0, len(items), 2)
Valerio Mariani
committed
]
def _dir_conv(direction_x, direction_y, direction_z, value):
Valerio Mariani
committed
# Reimplementation of dir_conv from
# libcrystfel/src/detector.c.
direction = [
direction_x,
direction_y,
direction_z
Valerio Mariani
committed
]
items = _assplode_algebraic(value)
if not items:
raise RuntimeError("Invalid direction: {}.".format(value))
for item in items:
axis = item[-1]
if axis != 'x' and axis != 'y' and axis != 'z':
Valerio Mariani
committed
raise RuntimeError(
"Invalid Symbol: {} (must be x, y or z).".format(axis)
Valerio Mariani
committed
)
def _set_dim_structure_entry(key, value, panel):
Valerio Mariani
committed
# Reimplementation of set_dim_structure_entry from
# libcrystfel/src/events.c.
if panel['dim_structure'] is not None:
dim = panel['dim_structure']
else:
dim = []
dim_index = int(key[3])
if dim_index > len(dim) - 1:
for _ in range(len(dim), dim_index + 1):
dim.append(None)
if value == 'ss' or value == 'fs' or value == '%':
dim[dim_index] = value
elif value.isdigit():
dim[dim_index] = int(value)
else:
raise RuntimeError("Invalid dim entry: {}.".format(value))
def _parse_field_for_panel(key, value, panel):
Valerio Mariani
committed
# Reimplementation of parse_field_for_panel from
# libcrystfel/src/detector.c.
if key == 'min_fs':
panel['origin_min_fs'] = int(value)
panel['min_fs'] = int(value)
elif key == 'max_fs':
panel['origin_max_fs'] = int(value)
panel['max_fs'] = int(value)
elif key == 'min_ss':
panel['origin_min_ss'] = int(value)
panel['min_ss'] = int(value)
elif key == 'max_ss':
panel['origin_max_ss'] = int(value)
panel['max_ss'] = int(value)
elif key == 'corner_x':
panel['cnx'] = float(value)
elif key == 'corner_y':
panel['cny'] = float(value)
elif key == 'rail_direction':
try:
Valerio Mariani
committed
panel['rail_x'], panel['rail_y'], panel['rail_z'] = _dir_conv(
direction_x=panel['rail_x'],
direction_y=panel['rail_y'],
direction_z=panel['rail_z'],
value=value
)
except RuntimeError as exc:
raise RuntimeError("Invalid rail direction. ", exc)
elif key == 'clen_for_centering':
panel['clen_for_centering'] = float(value)
elif key == 'adu_per_eV':
panel['adu_per_eV'] = float(value)
elif key == 'adu_per_photon':
panel['adu_per_photon'] = float(value)
elif key == 'rigid_group':
panel['rigid_group'] = value
elif key == 'clen':
try:
panel['clen'] = float(value)
panel['clen_from'] = None
except ValueError:
panel['clen'] = -1
panel['clen_from'] = value
elif key == 'data':
if not value.startswith("/"):
raise RuntimeError("Invalid data location: {}".format(value))
panel['data'] = value
elif key == 'mask':
if not value.startswith("/"):
raise RuntimeError("Invalid data location: {}".format(value))
panel['mask'] = value
elif key == 'mask_file':
panel['mask_file'] = value
elif key == 'saturation_map':
panel['saturation_map'] = value
elif key == 'saturation_map_file':
panel['saturation_map_file'] = value
elif key == 'coffset':
panel['coffset'] = float(value)
elif key == 'res':
panel['res'] = float(value)
elif key == 'max_adu':
panel['max_adu'] = value
elif key == 'badrow_direction':
if value == 'x':
panel['badrow'] = 'f'
elif value == 'y':
panel['badrow'] = 's'
elif value == 'f':
panel['badrow'] = 'f'
elif value == 's':
panel['badrow'] = 's'
elif value == '-':
panel['badrow'] = '-'
print("badrow_direction must be x, t, f, s, or \'-\'")
print("Assuming \'-\'.")
elif key == 'no_index':
panel['no_index'] = bool(value)
elif key == 'fs':
try:
Valerio Mariani
committed
panel['fsx'], panel['fsy'], panel['fsz'] = _dir_conv(
direction_x=panel['fsx'],
direction_y=panel['fsy'],
direction_z=panel['fsz'],
value=value
)
except RuntimeError as exc:
raise RuntimeError("Invalid fast scan direction.", exc)
Valerio Mariani
committed
panel['ssx'], panel['ssy'], panel['ssz'] = _dir_conv(
direction_x=panel['ssx'],
direction_y=panel['ssy'],
direction_z=panel['ssz'],
value=value
)
except RuntimeError as exc:
raise RuntimeError("Invalid slow scan direction.", exc)
elif key.startswith("dim"):
Valerio Mariani
committed
_set_dim_structure_entry(
key=key,
value=value,
panel=panel
)
raise RuntimeError("Unrecognised field: {}".format(key))
Valerio Mariani
committed
def _parse_toplevel(key, value, detector, beam, panel):
# Reimplementation of parse_toplevel from
# libcrystfel/src/detector.c.
if key == 'mask_bad':
try:
detector['mask_bad'] = int(value)
except ValueError:
Valerio Mariani
committed
detector['mask_bad'] = int(x=value, base=16)
elif key == 'mask_good':
try:
detector['mask_good'] = int(value)
except ValueError:
Valerio Mariani
committed
detector['mask_good'] = int(x=value, base=16)
elif key == 'coffset':
panel['coffset'] = float(value)
elif key == 'photon_energy':
if value.startswith('/'):
beam['photon_energy'] = 0.0
beam['photon_energy_from'] = value
beam['photon_energy'] = float(value)
beam['photon_energy_from'] = None
elif key == 'photon_energy_scale':
beam['photon_energy_scale'] = float(value)
elif key == 'peak_info_location':
detector['peak_info_location'] = value
Valerio Mariani
committed
elif (
key.startswith("rigid_group") and not
key.startswith("rigid_group_collection")
Valerio Mariani
committed
):
detector['rigid_groups'][key[12:]] = value.split(',')
elif key.startswith("rigid_group_collection"):
detector['rigid_group_collections'][key[23:]] = value.split(',')
else:
Valerio Mariani
committed
_parse_field_for_panel(
key=key,
value=value,
panel=panel
)
def _check_bad_fsss(bad_region, is_fsss):
Valerio Mariani
committed
# Reimplementation of check_bad_fsss from
# libcrystfel/src/detector.c.
if bad_region['is_fsss'] == 99:
bad_region['is_fsss'] = is_fsss
return
if is_fsss != bad_region['is_fsss']:
raise RuntimeError("You can't mix x/y and fs/ss in a bad region")
return
def _parse_field_bad(key, value, bad):
Valerio Mariani
committed
# Reimplementation of parse_field_bad from
# libcrystfel/src/detector.c.
bad['min_x'] = float(value)
Valerio Mariani
committed
_check_bad_fsss(bad_region=bad, is_fsss=False)
bad['max_x'] = float(value)
Valerio Mariani
committed
_check_bad_fsss(bad_region=bad, is_fsss=False)
bad['min_y'] = float(value)
Valerio Mariani
committed
_check_bad_fsss(bad_region=bad, is_fsss=False)
bad['max_y'] = float(value)
Valerio Mariani
committed
_check_bad_fsss(bad_region=bad, is_fsss=False)
bad['min_fs'] = int(value)
Valerio Mariani
committed
_check_bad_fsss(bad_region=bad, is_fsss=True)
bad['max_fs'] = int(value)
Valerio Mariani
committed
_check_bad_fsss(bad_region=bad, is_fsss=True)
bad['min_ss'] = int(value)
Valerio Mariani
committed
_check_bad_fsss(bad_region=bad, is_fsss=True)
bad['max_ss'] = int(value)
Valerio Mariani
committed
_check_bad_fsss(bad_region=bad, is_fsss=True)
elif key == 'panel':
bad['panel'] = value
else:
raise RuntimeError("Unrecognised field: {}".format(key))
Valerio Mariani
committed
def _check_point(name,
panel,
fs_,
ss_,
min_d,
max_d,
detector):
Valerio Mariani
committed
# Reimplementation of check_point from
# libcrystfel/src/detector.c.
Valerio Mariani
committed
xs_ = fs_ * panel['fsx'] + ss_ * panel['ssx']
ys_ = fs_ * panel['fsy'] + ss_ * panel['ssy']
rx_ = (xs_ + panel['cnx']) / panel['res']
ry_ = (ys_ + panel['cny']) / panel['res']
dist = math.sqrt(rx_ * rx_ + ry_ * ry_)
if dist > max_d:
detector['furthest_out_panel'] = name
Valerio Mariani
committed
detector['furthest_out_fs'] = fs_
detector['furthest_out_ss'] = ss_
max_d = dist
elif dist < min_d:
detector['furthest_in_panel'] = name
Valerio Mariani
committed
detector['furthest_in_fs'] = fs_
detector['furthest_in_ss'] = ss_
min_d = dist
return min_d, max_d
def _find_min_max_d(detector):
Valerio Mariani
committed
# Reimplementation of find_min_max_d from
# libcrystfel/src/detector.c.
max_d = 0.0
for name, panel in detector['panels'].items():
Valerio Mariani
committed
min_d, max_d = _check_point(
name=name,
panel=panel,
Valerio Mariani
committed
fs_=0,
ss_=0,
Valerio Mariani
committed
min_d=min_d,
max_d=max_d,
detector=detector
)
min_d, max_d = _check_point(
name=name,
panel=panel,
Valerio Mariani
committed
fs_=panel['w'],
ss_=0,
Valerio Mariani
committed
min_d=min_d,
max_d=max_d,
detector=detector
)
Valerio Mariani
committed
min_d, max_d = _check_point(
name=name,
panel=panel,
Valerio Mariani
committed
fs_=0,
ss_=panel['h'],
Valerio Mariani
committed
min_d=min_d,
max_d=max_d,
detector=detector
)
min_d, max_d = _check_point(
name=name,
panel=panel,
Valerio Mariani
committed
fs_=panel['w'],
ss_=panel['h'],
Valerio Mariani
committed
min_d=min_d,
max_d=max_d,
detector=detector
)
def load_crystfel_geometry(filename):
"""
Load a CrystFEL geometry file into a dictionary.
Reimplementation of the get_detector_geometry_2 function from
CrystFEL in python. Return a dictionary with the geometry
information read from the file. Convert entries in the geometry
file to keys in the returned dictionary. For a full documentation
on the CrystFEL geometry format, see:
http://www.desy.de/~twhite/crystfel/manual-crystfel_geometry.html
The code of this function is synced with the code of the function
Valerio Mariani
committed
'get_detector_geometry_2' in CrystFEL at commit
41a8fa9819010fe8ddeb66676fee717f5226c7b8.
Valerio Mariani
committed
filename (str): filename of the geometry file.
Valerio Mariani
committed
Dict: dictionary with the geometry information loaded from the
beam = {
'photon_energy': 0.0,
'photon_energy_from': None,
'photon_energy_scale': 1
}
detector = {
Valerio Mariani
committed
'panels': collections.OrderedDict(),
'bad': collections.OrderedDict(),
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
'mask_good': 0,
'mask_bad': 0,
'rigid_groups': {},
'rigid_group_collections': {}
}
default_panel = {
'cnx': None,
'cny': None,
'clen': None,
'coffset': 0.0,
'res': -1.0,
'badrow': '-',
'no_index': False,
'fsx': 1.0,
'fsy': 0.0,
'fsz': 0.0,
'ssx': 0.0,
'ssy': 1.0,
'ssz': 0.0,
'rail_x': None,
'rail_y': None,
'rail_z': None,
'clen_for_centering': None,
'adu_per_eV': None,
'adu_per_photon': None,
Valerio Mariani
committed
'max_adu': float(x='inf'),
'mask': None,
'mask_file': None,
'satmap': None,
'satmap_file': None,
'data': None,
'dim_structure': None,
'name': ''
}
default_bad_region = {
'min_x': None,
'max_x': None,
'min_y': None,
'max_y': None,
'min_fs': 0,
'max_fs': 0,
'min_ss': 0,
'max_ss': 0,
'is_fsss': 99,
'name': ''
}
default_dim = [
'ss', 'fs'
]
Valerio Mariani
committed
fhandle = open(filename, mode='r')
Valerio Mariani
committed
fhlines = fhandle.readlines()
if line.startswith(";"):
Valerio Mariani
committed
line_without_comments = line.strip().split(';')[0]
Valerio Mariani
committed
line_items = re.split(pattern='([ \t])', string=line_without_comments)
line_items = [
item
for item in line_items
if item not in ('', ' ', '\t')
Valerio Mariani
committed
]
if len(line_items) < 3:
continue
value = ''.join(line_items[2:])
if line_items[1] != '=':
continue
path = re.split('(/)', line_items[0])
path = [item for item in path if item not in '/']
if len(path) < 2:
Valerio Mariani
committed
_parse_toplevel(
key=line_items[0],
value=value,
detector=detector,
beam=beam,
panel=default_panel
)
continue
curr_bad = None
curr_panel = None
if path[0].startswith("bad"):
if path[0] in detector['bad']:
curr_bad = detector['bad'][path[0]]
else:
Valerio Mariani
committed
curr_bad = copy.deepcopy(default_bad_region)
detector['bad'][path[0]] = curr_bad
else:
if path[0] in detector['panels']:
curr_panel = detector['panels'][path[0]]
else:
Valerio Mariani
committed
curr_panel = copy.deepcopy(default_panel)
detector['panels'][path[0]] = curr_panel
if curr_panel is not None:
Valerio Mariani
committed
_parse_field_for_panel(
key=path[1],
value=value,
panel=curr_panel
)
Valerio Mariani
committed
_parse_field_bad(
key=path[1],
value=value,
bad=curr_bad
)
raise RuntimeError("No panel descriptions in geometry file.")
num_placeholders_in_panels = None
for panel in detector['panels'].values():
if panel['dim_structure'] is not None:
curr_num_placeholders = panel['dim_structure'].values().count('%')
else:
curr_num_placeholders = 0
if num_placeholders_in_panels is None:
num_placeholders_in_panels = curr_num_placeholders
else:
if curr_num_placeholders != num_placeholders_in_panels:
Valerio Mariani
committed
raise RuntimeError(
"All panels\' data and mask entries must have the same "
"number of placeholders."
Valerio Mariani
committed
)
num_placeholders_in_masks = None
for panel in detector['panels'].values():
if panel['mask'] is not None:
curr_num_placeholders = panel['mask'].count('%')
else:
curr_num_placeholders = 0
if num_placeholders_in_masks is None:
num_placeholders_in_masks = curr_num_placeholders
else:
if curr_num_placeholders != num_placeholders_in_masks:
Valerio Mariani
committed
raise RuntimeError(
"All panels\' data and mask entries must have the same "
"number of placeholders."
Valerio Mariani
committed
)
if num_placeholders_in_masks > num_placeholders_in_panels:
Valerio Mariani
committed
raise RuntimeError(
"Number of placeholders in mask cannot be larger the number "
"than for data."
Valerio Mariani
committed
)
dim_length = None
for panel in detector['panels'].values():
if panel['dim_structure'] is None:
Valerio Mariani
committed
panel['dim_structure'] = copy.deepcopy(default_dim)
found_ss = False
found_fs = False
found_placeholder = False
for entry in panel['dim_structure']:
if entry is None:
Valerio Mariani
committed
raise RuntimeError(
"Not all dim entries are defined for all panels."
Valerio Mariani
committed
)
elif entry == 'ss':
if found_ss is True:
Valerio Mariani
committed
raise RuntimeError(
"Only one slow scan dim coordinate is allowed."
Valerio Mariani
committed
)
elif entry == 'fs':
if found_fs is True:
Valerio Mariani
committed
raise RuntimeError(
"Only one fast scan dim coordinate is allowed."
Valerio Mariani
committed
)
elif entry == '%':
if found_placeholder is True:
Valerio Mariani
committed
raise RuntimeError(
"Only one placeholder dim coordinate is allowed."
Valerio Mariani
committed
)
else:
found_placeholder = True
if dim_length is None:
dim_length = len(panel['dim_structure'])
elif dim_length != len(panel['dim_structure']):
Valerio Mariani
committed
raise RuntimeError(
"Number of dim coordinates must be the same for all panels."
Valerio Mariani
committed
)
Valerio Mariani
committed
raise RuntimeError(
"Number of dim coordinates must be at least two."
Valerio Mariani
committed
)
for panel in detector['panels'].values():
if panel['origin_min_fs'] < 0:
Valerio Mariani
committed
raise RuntimeError(
"Please specify the minimum fs coordinate for "
"panel {}.".format(panel['name'])
Valerio Mariani
committed
)
Valerio Mariani
committed
raise RuntimeError(
"Please specify the maximum fs coordinate for "
"panel {}.".format(panel['name'])
Valerio Mariani
committed
)
Valerio Mariani
committed
raise RuntimeError(
"Please specify the minimum ss coordinate for "
"panel {}.".format(panel['name'])
Valerio Mariani
committed
)
Valerio Mariani
committed
raise RuntimeError(
"Please specify the maximum ss coordinate for "
"panel {}.".format(panel['name'])
Valerio Mariani
committed
)
Valerio Mariani
committed
raise RuntimeError(
"Please specify the corner X coordinate for "
"panel {}.".format(panel['name'])
Valerio Mariani
committed
)
if panel['clen'] is None and panel['clen_from'] is None:
Valerio Mariani
committed
raise RuntimeError(
"Please specify the camera length for "
"panel {}.".format(panel['name'])
Valerio Mariani
committed
)
Valerio Mariani
committed
raise RuntimeError(
"Please specify the resolution or "
"panel {}.".format(panel['name'])
Valerio Mariani
committed
)
if panel['adu_per_eV'] is None and panel['adu_per_photon'] is None:
Valerio Mariani
committed
raise RuntimeError(
"Please specify either adu_per_eV or adu_per_photon "
"for panel {}.".format(panel['name'])
Valerio Mariani
committed
)
if panel['clen_for_centering'] is None and panel['rail_x'] is not None:
Valerio Mariani
committed
raise RuntimeError(
"You must specify clen_for_centering if you specify the "
"rail direction (panel {})".format(panel['name'])
Valerio Mariani
committed
)
if panel['rail_x'] is None:
panel['rail_x'] = 0.0
panel['rail_y'] = 0.0
panel['rail_z'] = 1.0
if panel['clen_for_centering'] is None:
panel['clen_for_centering'] = 0.0
panel['w'] = panel['origin_max_fs'] - panel['origin_min_fs'] + 1
panel['h'] = panel['origin_max_ss'] - panel['origin_min_ss'] + 1
for bad_region in detector['bad'].values():
if bad_region['is_fsss'] == 99:
Valerio Mariani
committed
raise RuntimeError(
"Please specify the coordinate ranges for bad "
"region {}.".format(bad_region['name'])
Valerio Mariani
committed
)
for name in detector['rigid_groups'][group]:
if name not in detector['panels']:
Valerio Mariani
committed
raise RuntimeError(
"Cannot add panel to rigid_group. Panel not "
"found: {}".format(name)
Valerio Mariani
committed
)
for group_collection in detector['rigid_group_collections']:
for name in detector['rigid_group_collections'][group_collection]:
if name not in detector['rigid_groups']:
Valerio Mariani
committed
raise RuntimeError(
"Cannot add rigid_group to collection. Rigid group "
"not found: {}".format(name)
Valerio Mariani
committed
)
for panel in detector['panels'].values():
Valerio Mariani
committed
d__ = panel['fsx'] * panel['ssy'] - panel['ssx'] * panel['fsy']
if d__ == 0.0:
raise RuntimeError("Panel {} transformation is singluar.")
Valerio Mariani
committed
panel['xfs'] = panel['ssy'] / d__
panel['yfs'] = panel['ssx'] / d__
panel['xss'] = panel['fsy'] / d__
panel['yss'] = panel['fsx'] / d__
Valerio Mariani
committed
fhandle.close()
Valerio Mariani
committed
# The code of this function is synced with the code of the function
# 'get_detector_geometry_2' in CrystFEL at commit
# 41a8fa9819010fe8ddeb66676fee717f5226c7b8