Skip to content
Snippets Groups Projects
crystfel_utils.py 22.4 KiB
Newer Older
#    This file is part of cfelpyutils.
#
#    cfelpyutils is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    cfelpyutils is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with cfelpyutils.  If not, see <http://www.gnu.org/licenses/>.
"""
Utilities for interoperability with data formats used in the CrystFEL
software package.
Exports:

    Functions:

        load_crystfel_geometry: a python reimplementation of the
            get_detector_geometry_2 function from CrystFEL.
Valerio Mariani's avatar
Valerio Mariani committed
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
Valerio Mariani's avatar
Valerio Mariani committed
import re
def _assplode_algebraic(value):
    # Reimplementation of assplode_algegraic from
    # libcrystfel/src/detector.c.
    items = [
        for item in re.split('([+-])', string=value.strip())
Valerio Mariani's avatar
Valerio Mariani committed
    if items and items[0] not in ('+', '-'):
        ''.join((items[x], items[x + 1]))
        for x in range(0, len(items), 2)
def _dir_conv(direction_x, direction_y, direction_z, value):
    # Reimplementation of dir_conv from
    # libcrystfel/src/detector.c.
    direction = [
        direction_x,
        direction_y,
        direction_z
    items = _assplode_algebraic(value)
    if not items:
        raise RuntimeError("Invalid direction: {}.".format(value))
    for item in items:
        axis = item[-1]
        if axis != 'x' and axis != 'y' and axis != 'z':
                "Invalid Symbol: {} (must be x, y or z).".format(axis)
        if item[:-1] == '+':
            value = '1.0'
        elif item[:-1] == '-':
            value = '-1.0'
            value = item[:-1]
        if axis == 'x':
            direction[0] = float(value)
        elif axis == 'y':
            direction[1] = float(value)
        elif axis == 'z':
            direction[2] = float(value)
    return direction
def _set_dim_structure_entry(key, value, panel):
    # Reimplementation of set_dim_structure_entry from
    # libcrystfel/src/events.c.
    if panel['dim_structure'] is not None:
        dim = panel['dim_structure']
    else:
        dim = []
    dim_index = int(key[3])
    if dim_index > len(dim) - 1:
        for _ in range(len(dim), dim_index + 1):
            dim.append(None)

    if value == 'ss' or value == 'fs' or value == '%':
        dim[dim_index] = value
    elif value.isdigit():
        dim[dim_index] = int(value)
    else:
        raise RuntimeError("Invalid dim entry: {}.".format(value))


def _parse_field_for_panel(key, value, panel):
    # Reimplementation of parse_field_for_panel from
    # libcrystfel/src/detector.c.
    if key == 'min_fs':
        panel['origin_min_fs'] = int(value)
        panel['min_fs'] = int(value)
    elif key == 'max_fs':
        panel['origin_max_fs'] = int(value)
        panel['max_fs'] = int(value)
    elif key == 'min_ss':
        panel['origin_min_ss'] = int(value)
        panel['min_ss'] = int(value)
    elif key == 'max_ss':
        panel['origin_max_ss'] = int(value)
        panel['max_ss'] = int(value)
    elif key == 'corner_x':
        panel['cnx'] = float(value)
    elif key == 'corner_y':
        panel['cny'] = float(value)
    elif key == 'rail_direction':
        try:
            panel['rail_x'], panel['rail_y'], panel['rail_z'] = _dir_conv(
                direction_x=panel['rail_x'],
                direction_y=panel['rail_y'],
                direction_z=panel['rail_z'],
                value=value
            )
        except RuntimeError as exc:
            raise RuntimeError("Invalid rail direction. ", exc)

    elif key == 'clen_for_centering':
        panel['clen_for_centering'] = float(value)
    elif key == 'adu_per_eV':
        panel['adu_per_eV'] = float(value)
    elif key == 'adu_per_photon':
        panel['adu_per_photon'] = float(value)
    elif key == 'rigid_group':
        panel['rigid_group'] = value
    elif key == 'clen':
        try:
            panel['clen'] = float(value)
            panel['clen_from'] = None
        except ValueError:
            panel['clen'] = -1
            panel['clen_from'] = value

    elif key == 'data':
        if not value.startswith("/"):
            raise RuntimeError("Invalid data location: {}".format(value))
        panel['data'] = value

    elif key == 'mask':
        if not value.startswith("/"):
            raise RuntimeError("Invalid data location: {}".format(value))
        panel['mask'] = value

    elif key == 'mask_file':
        panel['mask_file'] = value
    elif key == 'saturation_map':
        panel['saturation_map'] = value
    elif key == 'saturation_map_file':
        panel['saturation_map_file'] = value
    elif key == 'coffset':
        panel['coffset'] = float(value)
    elif key == 'res':
        panel['res'] = float(value)
    elif key == 'max_adu':
        panel['max_adu'] = value
    elif key == 'badrow_direction':
        if value == 'x':
            panel['badrow'] = 'f'
        elif value == 'y':
            panel['badrow'] = 's'
        elif value == 'f':
            panel['badrow'] = 'f'
        elif value == 's':
            panel['badrow'] = 's'
        elif value == '-':
            panel['badrow'] = '-'
            print("badrow_direction must be x, t, f, s, or \'-\'")
            print("Assuming \'-\'.")
            panel['badrow'] = '-'
    elif key == 'no_index':
        panel['no_index'] = bool(value)
    elif key == 'fs':
        try:
            panel['fsx'], panel['fsy'], panel['fsz'] = _dir_conv(
                direction_x=panel['fsx'],
                direction_y=panel['fsy'],
                direction_z=panel['fsz'],
                value=value
            )
        except RuntimeError as exc:
            raise RuntimeError("Invalid fast scan direction.", exc)
    elif key == 'ss':
        try:
            panel['ssx'], panel['ssy'], panel['ssz'] = _dir_conv(
                direction_x=panel['ssx'],
                direction_y=panel['ssy'],
                direction_z=panel['ssz'],
                value=value
            )
        except RuntimeError as exc:
            raise RuntimeError("Invalid slow scan direction.", exc)
    elif key.startswith("dim"):
        _set_dim_structure_entry(
            key=key,
            value=value,
            panel=panel
        )
        raise RuntimeError("Unrecognised field: {}".format(key))
def _parse_toplevel(key, value, detector, beam, panel):
    # Reimplementation of parse_toplevel from
    # libcrystfel/src/detector.c.
    if key == 'mask_bad':
        try:
            detector['mask_bad'] = int(value)
        except ValueError:
            detector['mask_bad'] = int(x=value, base=16)
    elif key == 'mask_good':
        try:
            detector['mask_good'] = int(value)
        except ValueError:
            detector['mask_good'] = int(x=value, base=16)
    elif key == 'coffset':
        panel['coffset'] = float(value)
    elif key == 'photon_energy':
        if value.startswith('/'):
            beam['photon_energy'] = 0.0
            beam['photon_energy_from'] = value
            beam['photon_energy'] = float(value)
            beam['photon_energy_from'] = None
    elif key == 'photon_energy_scale':
        beam['photon_energy_scale'] = float(value)
    elif key == 'peak_info_location':
        detector['peak_info_location'] = value
            key.startswith("rigid_group") and not
            key.startswith("rigid_group_collection")
        detector['rigid_groups'][key[12:]] = value.split(',')
    elif key.startswith("rigid_group_collection"):
        detector['rigid_group_collections'][key[23:]] = value.split(',')
    else:
        _parse_field_for_panel(
            key=key,
            value=value,
            panel=panel
        )
def _check_bad_fsss(bad_region, is_fsss):
    # Reimplementation of check_bad_fsss from
    # libcrystfel/src/detector.c.
    if bad_region['is_fsss'] == 99:
        bad_region['is_fsss'] = is_fsss
        return
    if is_fsss != bad_region['is_fsss']:
        raise RuntimeError("You can't mix x/y and fs/ss in a bad region")

    return


def _parse_field_bad(key, value, bad):
    # Reimplementation of parse_field_bad from
    # libcrystfel/src/detector.c.
    if key == 'min_x':
        bad['min_x'] = float(value)
        _check_bad_fsss(bad_region=bad, is_fsss=False)
    elif key == 'max_x':
        bad['max_x'] = float(value)
        _check_bad_fsss(bad_region=bad, is_fsss=False)
    elif key == 'min_y':
        bad['min_y'] = float(value)
        _check_bad_fsss(bad_region=bad, is_fsss=False)
    elif key == 'max_y':
        bad['max_y'] = float(value)
        _check_bad_fsss(bad_region=bad, is_fsss=False)
    elif key == 'min_fs':
        bad['min_fs'] = int(value)
        _check_bad_fsss(bad_region=bad, is_fsss=True)
    elif key == 'max_fs':
        bad['max_fs'] = int(value)
        _check_bad_fsss(bad_region=bad, is_fsss=True)
    elif key == 'min_ss':
        bad['min_ss'] = int(value)
        _check_bad_fsss(bad_region=bad, is_fsss=True)
    elif key == 'max_ss':
        bad['max_ss'] = int(value)
        _check_bad_fsss(bad_region=bad, is_fsss=True)
    elif key == 'panel':
        bad['panel'] = value
    else:
        raise RuntimeError("Unrecognised field: {}".format(key))
    # Reimplementation of check_point from
    # libcrystfel/src/detector.c.
    xs_ = fs_ * panel['fsx'] + ss_ * panel['ssx']
    ys_ = fs_ * panel['fsy'] + ss_ * panel['ssy']
    rx_ = (xs_ + panel['cnx']) / panel['res']
    ry_ = (ys_ + panel['cny']) / panel['res']
    dist = math.sqrt(rx_ * rx_ + ry_ * ry_)
    if dist > max_d:
        detector['furthest_out_panel'] = name
        detector['furthest_out_fs'] = fs_
        detector['furthest_out_ss'] = ss_
        max_d = dist
    elif dist < min_d:
        detector['furthest_in_panel'] = name
        detector['furthest_in_fs'] = fs_
        detector['furthest_in_ss'] = ss_
        min_d = dist

    return min_d, max_d


def _find_min_max_d(detector):
    # Reimplementation of find_min_max_d from
    # libcrystfel/src/detector.c.
    min_d = float('inf')
    max_d = 0.0
    for name, panel in detector['panels'].items():
        min_d, max_d = _check_point(
            name=name,
            panel=panel,
            min_d=min_d,
            max_d=max_d,
            detector=detector
        )

        min_d, max_d = _check_point(
            name=name,
            panel=panel,
        min_d, max_d = _check_point(
            name=name,
            panel=panel,
            min_d=min_d,
            max_d=max_d,
            detector=detector
        )

        min_d, max_d = _check_point(
            name=name,
            panel=panel,
def load_crystfel_geometry(filename):
    """
    Load a CrystFEL geometry file into a dictionary.
Valerio Mariani's avatar
Valerio Mariani committed

    Reimplementation of the get_detector_geometry_2 function from
    CrystFEL in python. Return a dictionary with the geometry
    information read from the file. Convert entries in the geometry
    file to keys in the returned dictionary. For a full documentation
    on the CrystFEL geometry format, see:
Valerio Mariani's avatar
Valerio Mariani committed

    http://www.desy.de/~twhite/crystfel/manual-crystfel_geometry.html
Valerio Mariani's avatar
Valerio Mariani committed

    The code of this function is synced with the code of the function
    'get_detector_geometry_2' in CrystFEL at commit
    41a8fa9819010fe8ddeb66676fee717f5226c7b8.

Valerio Mariani's avatar
Valerio Mariani committed

        filename (str): filename of the geometry file.
Valerio Mariani's avatar
Valerio Mariani committed

Valerio Mariani's avatar
Valerio Mariani committed

        Dict: dictionary with the geometry information loaded from the
    beam = {
        'photon_energy': 0.0,
        'photon_energy_from': None,
        'photon_energy_scale': 1
    }

    detector = {
        'panels': collections.OrderedDict(),
        'bad': collections.OrderedDict(),
        'mask_good': 0,
        'mask_bad': 0,
        'rigid_groups': {},
        'rigid_group_collections': {}
    }

    default_panel = {
        'cnx': None,
        'cny': None,
        'clen': None,
        'coffset': 0.0,
        'res': -1.0,
        'badrow': '-',
        'no_index': False,
        'fsx': 1.0,
        'fsy': 0.0,
        'fsz': 0.0,
        'ssx': 0.0,
        'ssy': 1.0,
        'ssz': 0.0,
        'rail_x': None,
        'rail_y': None,
        'rail_z': None,
        'clen_for_centering': None,
        'adu_per_eV': None,
        'adu_per_photon': None,
        'mask': None,
        'mask_file': None,
        'satmap': None,
        'satmap_file': None,
        'data': None,
        'dim_structure': None,
        'name': ''
    }

    default_bad_region = {
        'min_x': None,
        'max_x': None,
        'min_y': None,
        'max_y': None,
        'min_fs': 0,
        'max_fs': 0,
        'min_ss': 0,
        'max_ss': 0,
        'is_fsss': 99,
        'name': ''
    }

    for line in fhlines:
        if line.startswith(";"):
        line_without_comments = line.strip().split(';')[0]
        line_items = re.split(pattern='([ \t])', string=line_without_comments)
        line_items = [
            item
            for item in line_items
            if item not in ('', ' ', '\t')
            continue

        value = ''.join(line_items[2:])
        if line_items[1] != '=':
            continue

        path = re.split('(/)', line_items[0])
        path = [item for item in path if item not in '/']
            _parse_toplevel(
                key=line_items[0],
                value=value,
                detector=detector,
                beam=beam,
                panel=default_panel
            )
            continue

        curr_bad = None
        curr_panel = None
        if path[0].startswith("bad"):
            if path[0] in detector['bad']:
                curr_bad = detector['bad'][path[0]]
            else:
                curr_bad = copy.deepcopy(default_bad_region)
                detector['bad'][path[0]] = curr_bad

        else:
            if path[0] in detector['panels']:
                curr_panel = detector['panels'][path[0]]
            else:
                curr_panel = copy.deepcopy(default_panel)
                detector['panels'][path[0]] = curr_panel

        if curr_panel is not None:
            _parse_field_for_panel(
                key=path[1],
                value=value,
                panel=curr_panel
            )
            _parse_field_bad(
                key=path[1],
                value=value,
                bad=curr_bad
            )
Valerio Mariani's avatar
Valerio Mariani committed
    if not detector['panels']:
        raise RuntimeError("No panel descriptions in geometry file.")

    num_placeholders_in_panels = None
    for panel in detector['panels'].values():
        if panel['dim_structure'] is not None:
            curr_num_placeholders = panel['dim_structure'].values().count('%')
        else:
            curr_num_placeholders = 0

        if num_placeholders_in_panels is None:
            num_placeholders_in_panels = curr_num_placeholders
        else:
            if curr_num_placeholders != num_placeholders_in_panels:
                    "All panels\' data and mask entries must have the same "
                    "number of placeholders."

    num_placeholders_in_masks = None
    for panel in detector['panels'].values():
        if panel['mask'] is not None:
            curr_num_placeholders = panel['mask'].count('%')
        else:
            curr_num_placeholders = 0

        if num_placeholders_in_masks is None:
            num_placeholders_in_masks = curr_num_placeholders
        else:
            if curr_num_placeholders != num_placeholders_in_masks:
                    "All panels\' data and mask entries must have the same "
                    "number of placeholders."

    if num_placeholders_in_masks > num_placeholders_in_panels:
            "Number of placeholders in mask cannot be larger the number "
            "than for data."

    dim_length = None
    for panel in detector['panels'].values():
        if panel['dim_structure'] is None:
            panel['dim_structure'] = copy.deepcopy(default_dim)

        found_ss = False
        found_fs = False
        found_placeholder = False
        for entry in panel['dim_structure']:
            if entry is None:
                    "Not all dim entries are defined for all panels."
            elif entry == 'ss':
                if found_ss is True:
                        "Only one slow scan dim coordinate is allowed."
                else:
                    found_ss = True
            elif entry == 'fs':
                if found_fs is True:
                        "Only one fast scan dim coordinate is allowed."
                else:
                    found_fs = True
            elif entry == '%':
                if found_placeholder is True:
                        "Only one placeholder dim coordinate is allowed."
                else:
                    found_placeholder = True

        if dim_length is None:
            dim_length = len(panel['dim_structure'])
        elif dim_length != len(panel['dim_structure']):
                "Number of dim coordinates must be the same for all panels."
        if dim_length == 1:
                "Number of dim coordinates must be at least two."
Valerio Mariani's avatar
Valerio Mariani committed
    for panel in detector['panels'].values():
        if panel['origin_min_fs'] < 0:
                "Please specify the minimum fs coordinate for "
                "panel {}.".format(panel['name'])
Valerio Mariani's avatar
Valerio Mariani committed
        if panel['origin_max_fs'] < 0:
                "Please specify the maximum fs coordinate for "
                "panel {}.".format(panel['name'])
Valerio Mariani's avatar
Valerio Mariani committed
        if panel['origin_min_ss'] < 0:
                "Please specify the minimum ss coordinate for "
                "panel {}.".format(panel['name'])
Valerio Mariani's avatar
Valerio Mariani committed
        if panel['origin_max_ss'] < 0:
                "Please specify the maximum ss coordinate for "
                "panel {}.".format(panel['name'])

        if panel['cnx'] is None:
                "Please specify the corner X coordinate for "
                "panel {}.".format(panel['name'])

        if panel['clen'] is None and panel['clen_from'] is None:
                "Please specify the camera length for "
                "panel {}.".format(panel['name'])

        if panel['res'] < 0:
                "Please specify the resolution or "
                "panel {}.".format(panel['name'])

        if panel['adu_per_eV'] is None and panel['adu_per_photon'] is None:
                "Please specify either adu_per_eV or adu_per_photon "
                "for panel {}.".format(panel['name'])

        if panel['clen_for_centering'] is None and panel['rail_x'] is not None:
                "You must specify clen_for_centering if you specify the "
                "rail direction (panel {})".format(panel['name'])

        if panel['rail_x'] is None:
            panel['rail_x'] = 0.0
            panel['rail_y'] = 0.0
            panel['rail_z'] = 1.0

        if panel['clen_for_centering'] is None:
            panel['clen_for_centering'] = 0.0

        panel['w'] = panel['origin_max_fs'] - panel['origin_min_fs'] + 1
        panel['h'] = panel['origin_max_ss'] - panel['origin_min_ss'] + 1

    for bad_region in detector['bad'].values():
        if bad_region['is_fsss'] == 99:
                "Please specify the coordinate ranges for bad "
                "region {}.".format(bad_region['name'])
Valerio Mariani's avatar
Valerio Mariani committed
    for group in detector['rigid_groups']:
        for name in detector['rigid_groups'][group]:
            if name not in detector['panels']:
                    "Cannot add panel to rigid_group. Panel not "
                    "found: {}".format(name)
Valerio Mariani's avatar
Valerio Mariani committed
    for group_collection in detector['rigid_group_collections']:
        for name in detector['rigid_group_collections'][group_collection]:
            if name not in detector['rigid_groups']:
                    "Cannot add rigid_group to collection. Rigid group "
                    "not found: {}".format(name)

    for panel in detector['panels'].values():
        d__ = panel['fsx'] * panel['ssy'] - panel['ssx'] * panel['fsy']
        if d__ == 0.0:
            raise RuntimeError("Panel {} transformation is singluar.")
        panel['xfs'] = panel['ssy'] / d__
        panel['yfs'] = panel['ssx'] / d__
        panel['xss'] = panel['fsy'] / d__
        panel['yss'] = panel['fsx'] / d__
    _find_min_max_d(detector)
    # The code of this function is synced with the code of the function
    # 'get_detector_geometry_2' in CrystFEL at commit
    # 41a8fa9819010fe8ddeb66676fee717f5226c7b8

    return detector