Newer
Older
# This file is part of cfelpyutils.
#
# cfelpyutils is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# cfelpyutils is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with cfelpyutils. If not, see <http://www.gnu.org/licenses/>.
"""
Utilities for interoperability with data formats used in the CrystFEL
software package.
This module currently contains a python reimplementation of the
get_detector_geometry_2 function from CrystFEL.
from __future__ import (absolute_import, division, print_function,
unicode_literals)
Valerio Mariani
committed
import collections
import copy
import math
def _assplode_algebraic(value):
Valerio Mariani
committed
# Reimplementation of assplode_algegraic from
# libcrystfel/src/detector.c.
items = [
item for item in re.split(
pattern='([+-])', string=value.strip()
) if item != ''
]
Valerio Mariani
committed
items.insert(index=0, object='+')
Valerio Mariani
committed
return [
''.join(
iterable=(items[x], items[x + 1])
) for x in range(start=0, stop=len(items), step=2)
]
def _dir_conv(direction_x, direction_y, direction_z, value):
Valerio Mariani
committed
# Reimplementation of dir_conv from
# libcrystfel/src/detector.c.
direction = [
direction_x, direction_y, direction_z
]
items = _assplode_algebraic(value)
raise RuntimeError('Invalid direction: {}.'.format(value))
for item in items:
axis = item[-1]
if axis != 'x' and axis != 'y' and axis != 'z':
Valerio Mariani
committed
raise RuntimeError(
'Invalid Symbol: {} (must be x, y or z).'.format(axis)
)
def _set_dim_structure_entry(key, value, panel):
Valerio Mariani
committed
# Reimplementation of set_dim_structure_entry from
# libcrystfel/src/events.c.
if panel['dim_structure'] is not None:
dim = panel['dim_structure']
else:
dim = []
dim_index = int(key[3])
if dim_index > len(dim) - 1:
Valerio Mariani
committed
for _ in range(start=len(dim), stop=dim_index + 1):
dim.append(None)
if value == 'ss' or value == 'fs' or value == '%':
dim[dim_index] = value
elif value.isdigit():
dim[dim_index] = int(value)
else:
raise RuntimeError('Invalid dim entry: {}.'.format(value))
def _parse_field_for_panel(key, value, panel):
Valerio Mariani
committed
# Reimplementation of parse_field_for_panel from
# libcrystfel/src/detector.c.
if key == 'min_fs':
panel['origin_min_fs'] = int(value)
panel['min_fs'] = int(value)
elif key == 'max_fs':
panel['origin_max_fs'] = int(value)
panel['max_fs'] = int(value)
elif key == 'min_ss':
panel['origin_min_ss'] = int(value)
panel['min_ss'] = int(value)
elif key == 'max_ss':
panel['origin_max_ss'] = int(value)
panel['max_ss'] = int(value)
elif key == 'corner_x':
panel['cnx'] = float(value)
elif key == 'corner_y':
panel['cny'] = float(value)
elif key == 'rail_direction':
try:
Valerio Mariani
committed
panel['rail_x'], panel['rail_y'], panel['rail_z'] = _dir_conv(
direction_x=panel['rail_x'],
direction_y=panel['rail_y'],
direction_z=panel['rail_z'],
value=value
)
except RuntimeError as exc:
raise RuntimeError('Invalid rail direction. ', exc)
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
elif key == 'clen_for_centering':
panel['clen_for_centering'] = float(value)
elif key == 'adu_per_eV':
panel['adu_per_eV'] = float(value)
elif key == 'adu_per_photon':
panel['adu_per_photon'] = float(value)
elif key == 'rigid_group':
panel['rigid_group'] = value
elif key == 'clen':
try:
panel['clen'] = float(value)
panel['clen_from'] = None
except ValueError:
panel['clen'] = -1
panel['clen_from'] = value
elif key == 'data':
if not value.startswith('/'):
raise RuntimeError('Invalid data location: {}'.format(value))
panel['data'] = value
elif key == 'mask':
if not value.startswith('/'):
raise RuntimeError('Invalid data location: {}'.format(value))
panel['mask'] = value
elif key == 'mask_file':
panel['mask_file'] = value
elif key == 'saturation_map':
panel['saturation_map'] = value
elif key == 'saturation_map_file':
panel['saturation_map_file'] = value
elif key == 'coffset':
panel['coffset'] = float(value)
elif key == 'res':
panel['res'] = float(value)
elif key == 'max_adu':
panel['max_adu'] = value
elif key == 'badrow_direction':
if value == 'x':
panel['badrow'] = 'f'
elif value == 'y':
panel['badrow'] = 's'
elif value == 'f':
panel['badrow'] = 'f'
elif value == 's':
panel['badrow'] = 's'
elif value == '-':
panel['badrow'] = '-'
print('badrow_direction must be x, t, f, s, or \'-\'')
print('Assuming \'-\'.')
panel['badrow'] = '-'
elif key == 'no_index':
panel['no_index'] = bool(value)
Valerio Mariani
committed
panel['fsx'], panel['fsy'], panel['fsz'] = _dir_conv(
direction_x=panel['fsx'],
direction_y=panel['fsy'],
direction_z=panel['fsz'],
value=value
)
Valerio Mariani
committed
except RuntimeError as exc:
raise RuntimeError('Invalid fast scan direction. ', exc)
Valerio Mariani
committed
panel['ssx'], panel['ssy'], panel['ssz'] = _dir_conv(
direction_x=panel['ssx'],
direction_y=panel['ssy'],
direction_z=panel['ssz'],
value=value
)
Valerio Mariani
committed
except RuntimeError as exc:
raise RuntimeError('Invalid slow scan direction. ', exc)
Valerio Mariani
committed
_set_dim_structure_entry(
key=key,
value=value,
panel=panel
)
else:
raise RuntimeError('Unrecognised field: {}'.format(key))
Valerio Mariani
committed
def _parse_toplevel(key, value, detector, beam, panel):
# Reimplementation of parse_toplevel from
# libcrystfel/src/detector.c.
if key == 'mask_bad':
try:
detector['mask_bad'] = int(value)
except ValueError:
Valerio Mariani
committed
detector['mask_bad'] = int(x=value, base=16)
elif key == 'mask_good':
try:
detector['mask_good'] = int(value)
except ValueError:
Valerio Mariani
committed
detector['mask_good'] = int(x=value, base=16)
elif key == 'coffset':
panel['coffset'] = float(value)
elif key == 'photon_energy':
if value.startswith('/'):
beam['photon_energy'] = 0.0
beam['photon_energy_from'] = value
beam['photon_energy'] = float(value)
beam['photon_energy_from'] = None
elif key == 'photon_energy_scale':
beam['photon_energy_scale'] = float(value)
elif key == 'peak_info_location':
detector['peak_info_location'] = value
Valerio Mariani
committed
elif (
key.startswith('rigid_group') and not
key.startswith('rigid_group_collection')
):
detector['rigid_groups'][key[12:]] = value.split(',')
elif key.startswith('rigid_group_collection'):
detector['rigid_group_collections'][key[23:]] = value.split(',')
Valerio Mariani
committed
_parse_field_for_panel(
key=key,
value=value,
panel=panel
)
def _check_bad_fsss(bad_region, is_fsss):
Valerio Mariani
committed
# Reimplementation of check_bad_fsss from
# libcrystfel/src/detector.c.
if bad_region['is_fsss'] == 99:
bad_region['is_fsss'] = is_fsss
return
if is_fsss != bad_region['is_fsss']:
raise RuntimeError("You can't mix x/y and fs/ss in a bad region")
return
def _parse_field_bad(key, value, bad):
Valerio Mariani
committed
# Reimplementation of parse_field_bad from
# libcrystfel/src/detector.c.
Valerio Mariani
committed
bad['min_x'] = float(x=value)
_check_bad_fsss(bad_region=bad, is_fsss=False)
Valerio Mariani
committed
bad['max_x'] = float(x=value)
_check_bad_fsss(bad_region=bad, is_fsss=False)
Valerio Mariani
committed
bad['min_y'] = float(x=value)
_check_bad_fsss(bad_region=bad, is_fsss=False)
Valerio Mariani
committed
bad['max_y'] = float(x=value)
_check_bad_fsss(bad_region=bad, is_fsss=False)
Valerio Mariani
committed
bad['min_fs'] = int(x=value)
_check_bad_fsss(bad_region=bad, is_fsss=True)
Valerio Mariani
committed
bad['max_fs'] = int(x=value)
_check_bad_fsss(bad_region=bad, is_fsss=True)
Valerio Mariani
committed
bad['min_ss'] = int(x=value)
_check_bad_fsss(bad_region=bad, is_fsss=True)
Valerio Mariani
committed
bad['max_ss'] = int(x=value)
_check_bad_fsss(bad_region=bad, is_fsss=True)
elif key == 'panel':
bad['panel'] = value
else:
raise RuntimeError('Unrecognised field: {}'.format(key))
return
Valerio Mariani
committed
def _check_point(name, panel, fs_, ss_, min_d, max_d, detector):
# Reimplementation of check_point from
# libcrystfel/src/detector.c.
xs_ = fs_ * panel['fsx'] + ss_ * panel['ssx']
ys_ = fs_ * panel['fsy'] + ss_ * panel['ssy']
Valerio Mariani
committed
rx_ = (xs_ + panel['cnx']) / panel['res']
ry_ = (ys_ + panel['cny']) / panel['res']
Valerio Mariani
committed
dist = math.sqrt(x=rx_ * rx_ + ry_ * ry_)
if dist > max_d:
detector['furthest_out_panel'] = name
Valerio Mariani
committed
detector['furthest_out_fs'] = fs_
detector['furthest_out_ss'] = ss_
max_d = dist
elif dist < min_d:
detector['furthest_in_panel'] = name
Valerio Mariani
committed
detector['furthest_in_fs'] = fs_
detector['furthest_in_ss'] = ss_
min_d = dist
return min_d, max_d
def _find_min_max_d(detector):
Valerio Mariani
committed
# Reimplementation of find_min_max_d from
# libcrystfel/src/detector.c.
max_d = 0.0
for name, panel in detector['panels'].items():
Valerio Mariani
committed
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
min_d, max_d = _check_point(
name=name,
panel=panel,
fs_=0,
ss_=0,
min_d=min_d,
max_d=max_d,
detector=detector
)
min_d, max_d = _check_point(
name=name,
panel=panel,
fs_=panel['w'],
ss_=0,
min_d=min_d,
max_d=max_d,
detector=detector
)
min_d, max_d = _check_point(
name=name,
panel=panel,
fs_=0,
ss_=panel['h'],
min_d=min_d,
max_d=max_d,
detector=detector
)
min_d, max_d = _check_point(
name=name,
panel=panel,
fs_=panel['w'],
ss_=panel['h'],
min_d=min_d,
max_d=max_d,
detector=detector
)
def load_crystfel_geometry(filename):
Valerio Mariani
committed
"""Load a CrystFEL geometry file into a dictionary.
Valerio Mariani
committed
Reimplementation of the get_detector_geometry_2 function from CrystFEL
in python. Return a dictionary with the geometry information read from
the file. Convert entries in the geometry file to keys in the
returned dictionary. For a full documentation on the CrystFEL geometry
format, see:
http://www.desy.de/~twhite/crystfel/manual-crystfel_geometry.html
The code of this function is synced with the code of the function
Valerio Mariani
committed
'get_detector_geometry_2' in CrystFEL at commit
41a8fa9819010fe8ddeb66676fee717f5226c7b8.
Valerio Mariani
committed
filename (str): filename of the geometry file.
dict: dictionary with the geometry information loaded from the file.
Valerio Mariani
committed
fhandle = open(file=filename, mode='r')
beam = {
'photon_energy': 0.0,
'photon_energy_from': None,
'photon_energy_scale': 1
}
detector = {
Valerio Mariani
committed
'panels': collections.OrderedDict(),
'bad': collections.OrderedDict(),
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
'mask_good': 0,
'mask_bad': 0,
'rigid_groups': {},
'rigid_group_collections': {}
}
default_panel = {
'cnx': None,
'cny': None,
'clen': None,
'coffset': 0.0,
'res': -1.0,
'badrow': '-',
'no_index': False,
'fsx': 1.0,
'fsy': 0.0,
'fsz': 0.0,
'ssx': 0.0,
'ssy': 1.0,
'ssz': 0.0,
'rail_x': None,
'rail_y': None,
'rail_z': None,
'clen_for_centering': None,
'adu_per_eV': None,
'adu_per_photon': None,
Valerio Mariani
committed
'max_adu': float(x='inf'),
'mask': None,
'mask_file': None,
'satmap': None,
'satmap_file': None,
'data': None,
'dim_structure': None,
'name': ''
}
default_bad_region = {
'min_x': None,
'max_x': None,
'min_y': None,
'max_y': None,
'min_fs': 0,
'max_fs': 0,
'min_ss': 0,
'max_ss': 0,
'is_fsss': 99,
'name': ''
}
default_dim = ['ss', 'fs']
Valerio Mariani
committed
fhlines = fhandle.readlines()
for line in fhlines:
if line.startswith(';'):
continue
Valerio Mariani
committed
line_without_comments = line.strip().split(sep=';')[0]
line_items = re.split(pattern='([ \t])', string=line_without_comments)
line_items = [
item for item in line_items if item not in ('', ' ', '\t')
]
Valerio Mariani
committed
if len(s=line_items) < 3:
continue
value = ''.join(line_items[2:])
if line_items[1] != '=':
continue
path = re.split('(/)', line_items[0])
path = [item for item in path if item not in '/']
Valerio Mariani
committed
if len(s=path) < 2:
_parse_toplevel(
key=line_items[0],
value=value,
detector=detector,
beam=beam,
panel=default_panel
)
continue
curr_bad = None
curr_panel = None
if path[0].startswith('bad'):
if path[0] in detector['bad']:
curr_bad = detector['bad'][path[0]]
else:
Valerio Mariani
committed
curr_bad = copy.deepcopy(default_bad_region)
detector['bad'][path[0]] = curr_bad
else:
if path[0] in detector['panels']:
curr_panel = detector['panels'][path[0]]
else:
Valerio Mariani
committed
curr_panel = copy.deepcopy(default_panel)
detector['panels'][path[0]] = curr_panel
if curr_panel is not None:
Valerio Mariani
committed
_parse_field_for_panel(
key=path[1],
value=value,
panel=curr_panel
)
Valerio Mariani
committed
_parse_field_bad(
key=path[1],
value=value,
bad=curr_bad
)
raise RuntimeError("No panel descriptions in geometry file.")
num_placeholders_in_panels = None
for panel in detector['panels'].values():
if panel['dim_structure'] is not None:
curr_num_placeholders = panel['dim_structure'].values().count('%')
else:
curr_num_placeholders = 0
if num_placeholders_in_panels is None:
num_placeholders_in_panels = curr_num_placeholders
else:
if curr_num_placeholders != num_placeholders_in_panels:
Valerio Mariani
committed
raise RuntimeError(
'All panels\' data and mask entries must have the same '
'number of placeholders.'
)
num_placeholders_in_masks = None
for panel in detector['panels'].values():
if panel['mask'] is not None:
curr_num_placeholders = panel['mask'].count('%')
else:
curr_num_placeholders = 0
if num_placeholders_in_masks is None:
num_placeholders_in_masks = curr_num_placeholders
else:
if curr_num_placeholders != num_placeholders_in_masks:
Valerio Mariani
committed
raise RuntimeError(
'All panels\' data and mask entries must have the same '
'number of placeholders.'
)
if num_placeholders_in_masks > num_placeholders_in_panels:
Valerio Mariani
committed
raise RuntimeError(
'Number of placeholders in mask cannot be larger the number '
'than for data.'
)
dim_length = None
for panel in detector['panels'].values():
if panel['dim_structure'] is None:
Valerio Mariani
committed
panel['dim_structure'] = copy.deepcopy(default_dim)
found_ss = False
found_fs = False
found_placeholder = False
for entry in panel['dim_structure']:
if entry is None:
Valerio Mariani
committed
raise RuntimeError(
'Not all dim entries are defined for all panels.'
)
elif entry == 'ss':
if found_ss is True:
Valerio Mariani
committed
raise RuntimeError(
'Only one slow scan dim coordinate is allowed.'
)
else:
found_ss = True
elif entry == 'fs':
if found_fs is True:
Valerio Mariani
committed
raise RuntimeError(
'Only one fast scan dim coordinate is allowed.'
)
else:
found_fs = True
elif entry == '%':
if found_placeholder is True:
Valerio Mariani
committed
raise RuntimeError(
'Only one placeholder dim coordinate is allowed.'
)
else:
found_placeholder = True
if dim_length is None:
dim_length = len(panel['dim_structure'])
elif dim_length != len(panel['dim_structure']):
Valerio Mariani
committed
raise RuntimeError(
'Number of dim coordinates must be the same for all panels.'
)
Valerio Mariani
committed
raise RuntimeError(
'Number of dim coordinates must be at least two.'
)
Valerio Mariani
committed
raise RuntimeError(
'Please specify the minimum fs coordinate for '
'panel {}.'.format(panel['name'])
)
Valerio Mariani
committed
raise RuntimeError(
'Please specify the maximum fs coordinate for '
'panel {}.'.format(panel['name'])
)
Valerio Mariani
committed
raise RuntimeError(
'Please specify the minimum ss coordinate for '
'panel {}.'.format(panel['name'])
)
Valerio Mariani
committed
raise RuntimeError(
'Please specify the maximum ss coordinate for '
'panel {}.'.format(panel['name'])
)
Valerio Mariani
committed
raise RuntimeError(
'Please specify the corner X coordinate for '
'panel {}.'.format(panel['name'])
)
if panel['clen'] is None and panel['clen_from'] is None:
Valerio Mariani
committed
raise RuntimeError(
'Please specify the camera length for '
'panel {}.'.format(panel['name'])
)
Valerio Mariani
committed
raise RuntimeError(
'Please specify the resolution or '
'panel {}.'.format(panel['name'])
)
if panel['adu_per_eV'] is None and panel['adu_per_photon'] is None:
Valerio Mariani
committed
raise RuntimeError(
'Please specify either adu_per_eV or adu_per_photon '
'for panel {}.'.format(panel['name'])
)
if panel['clen_for_centering'] is None and panel['rail_x'] is not None:
Valerio Mariani
committed
raise RuntimeError(
'You must specify clen_for_centering if you specify the'
'rail direction (panel {})'.format(panel['name'])
)
if panel['rail_x'] is None:
panel['rail_x'] = 0.0
panel['rail_y'] = 0.0
panel['rail_z'] = 1.0
if panel['clen_for_centering'] is None:
panel['clen_for_centering'] = 0.0
panel['w'] = panel['origin_max_fs'] - panel['origin_min_fs'] + 1
panel['h'] = panel['origin_max_ss'] - panel['origin_min_ss'] + 1
for bad_region in detector['bad'].values():
if bad_region['is_fsss'] == 99:
Valerio Mariani
committed
raise RuntimeError(
'Please specify the coordinate ranges for bad '
'region {}.'.format(bad_region['name'])
)
for name in detector['rigid_groups'][group]:
if name not in detector['panels']:
Valerio Mariani
committed
raise RuntimeError(
'Cannot add panel to rigid_group. Panel not'
'found: {}'.format(name)
)
for group_collection in detector['rigid_group_collections']:
for name in detector['rigid_group_collections'][group_collection]:
if name not in detector['rigid_groups']:
Valerio Mariani
committed
raise RuntimeError(
'Cannot add rigid_group to collection. Rigid group '
'not found: {}'.format(name)
)
for panel in detector['panels'].values():
Valerio Mariani
committed
d__ = panel['fsx'] * panel['ssy'] - panel['ssx'] * panel['fsy']
Valerio Mariani
committed
if d__ == 0.0:
raise RuntimeError('Panel {} transformation is singluar.')
Valerio Mariani
committed
panel['xfs'] = panel['ssy'] / d__
panel['yfs'] = panel['ssx'] / d__
panel['xss'] = panel['fsy'] / d__
panel['yss'] = panel['fsx'] / d__
Valerio Mariani
committed
fhandle.close()