Skip to content
Snippets Groups Projects
Commit a94b685f authored by David Hammer's avatar David Hammer
Browse files

Merge branch 'deployed/spb-jungfrau' into devel

parents 412a34de 4113bf0c
No related branches found
No related tags found
1 merge request!12Snapshot: field test deployed version as of end of run 202201
Showing
with 3985 additions and 1247 deletions
TrainMatcher, 1.2.0-2.10.2
PipeToZeroMQ, 3.2.6-2.11.0
calngDeps, 0.0.3-2.10.0
calibrationClient, 9.0.6
# calng
calng is a collection of Karabo devices to perform online processing of 2D X-ray detector data at runtime. It is the successor of the calPy package.
# CalCat secrets and deployment
Correction devices each run their own `calibration_client.CalibrationClient`, so they need to have credentials for CalCat.
They expect to be able to load these from a JSON file; by default, this will be in `$KARABO/var/data/calibration-client-secrets.json` (`var/data` is CWD of Karabo devices).
The file should look something like:
```json
{
"base_url": "https://in.xfel.eu/test_calibration",
"client_id": "[sort of secret]",
"client_secret": "[actual secret]",
"user_email": "[eh, not that secret]",
"caldb_store_path": "/gpfs/exfel/d/cal/caldb_store"
}
```
For deployment, you'll want `/calibration` instead of `/test_calibration` and the caldb store as seen from ONC will be `/common/cal/caldb_store`.
...@@ -24,15 +24,21 @@ setup(name='calng', ...@@ -24,15 +24,21 @@ setup(name='calng',
packages=find_packages('src'), packages=find_packages('src'),
entry_points={ entry_points={
'karabo.bound_device': [ 'karabo.bound_device': [
'AgipdCorrection = calng.AgipdCorrection:AgipdCorrection',
'DsscCorrection = calng.DsscCorrection:DsscCorrection', 'DsscCorrection = calng.DsscCorrection:DsscCorrection',
'JungfrauCorrection = calng.JungfrauCorrection:JungfrauCorrection',
'ModuleStacker = calng.ModuleStacker:ModuleStacker', 'ModuleStacker = calng.ModuleStacker:ModuleStacker',
'ManualAgipdGeometry = calng.ManualAgipdGeometry:ManualAgipdGeometry',
'ManualDsscGeometry = calng.ManualDsscGeometry:ManualDsscGeometry',
'ManualJungfrauGeometry = calng.ManualJungfrauGeometry:ManualJungfrauGeometry',
'ShmemToZMQ = calng.ShmemToZMQ:ShmemToZMQ', 'ShmemToZMQ = calng.ShmemToZMQ:ShmemToZMQ',
'SimpleAssembler = calng.SimpleAssembler:SimpleAssembler',
], ],
'karabo.middlelayer_device': [ 'karabo.middlelayer_device': [
'CalibrationManager = calng.CalibrationManager:CalibrationManager' 'CalibrationManager = calng.CalibrationManager:CalibrationManager'
], ],
}, },
package_data={'': ['*.cpp']}, package_data={'': ['kernels/*']},
requires=[], requires=[],
) )
This diff is collapsed.
...@@ -23,7 +23,7 @@ from karabo.middlelayer import ( ...@@ -23,7 +23,7 @@ from karabo.middlelayer import (
KaraboError, Device, DeviceClientBase, Descriptor, Hash, Configurable, KaraboError, Device, DeviceClientBase, Descriptor, Hash, Configurable,
Slot, Node, Type, Slot, Node, Type,
AccessMode, AccessLevel, Assignment, DaqPolicy, State, Unit, AccessMode, AccessLevel, Assignment, DaqPolicy, State, Unit,
UInt16, UInt32, Bool, Double, String, VectorString, VectorHash, UInt16, UInt32, Bool, Double, Schema, String, VectorString, VectorHash,
background, call, callNoWait, setNoWait, sleep, instantiate, slot, coslot, background, call, callNoWait, setNoWait, sleep, instantiate, slot, coslot,
getDevice, getTopology, getConfiguration, getConfigurationFromPast, getDevice, getTopology, getConfiguration, getConfigurationFromPast,
get_property) get_property)
...@@ -31,6 +31,7 @@ from karabo.middlelayer_api.proxy import ProxyFactory ...@@ -31,6 +31,7 @@ from karabo.middlelayer_api.proxy import ProxyFactory
from karabo import version as karaboVersion from karabo import version as karaboVersion
from ._version import version as deviceVersion from ._version import version as deviceVersion
from . import scenes
''' '''
...@@ -81,14 +82,6 @@ class ClassIdsNode(Configurable): ...@@ -81,14 +82,6 @@ class ClassIdsNode(Configurable):
accessMode=AccessMode.INITONLY, accessMode=AccessMode.INITONLY,
assignment=Assignment.MANDATORY) assignment=Assignment.MANDATORY)
previewMatcherClass = String(
displayedName='Preview matcher class',
description='Device class to use for matching the output of a preview '
'layer.',
defaultValue='ModuleStacker',
accessMode=AccessMode.INITONLY,
assignment=Assignment.MANDATORY)
assemblerClass = String( assemblerClass = String(
displayedName='Assembler class', displayedName='Assembler class',
description='Device class to use for assembling the matched output of ' description='Device class to use for assembling the matched output of '
...@@ -124,14 +117,6 @@ class DeviceIdsNode(Configurable): ...@@ -124,14 +117,6 @@ class DeviceIdsNode(Configurable):
accessMode=AccessMode.INITONLY, accessMode=AccessMode.INITONLY,
assignment=Assignment.MANDATORY) assignment=Assignment.MANDATORY)
previewMatcherSuffix = String(
displayedName='Preview matcher suffix',
description='Suffix for preview layer matching device IDs. The '
'formatting placeholder \'layer\' may be used.',
defaultValue='MATCH_{layer}',
accessMode=AccessMode.INITONLY,
assignment=Assignment.MANDATORY)
assemblerSuffix = String( assemblerSuffix = String(
displayedName='Assembler suffix', displayedName='Assembler suffix',
description='Suffix for assembler device IDs. The formatting ' description='Suffix for assembler device IDs. The formatting '
...@@ -302,6 +287,45 @@ class CalibrationManager(DeviceClientBase, Device): ...@@ -302,6 +287,45 @@ class CalibrationManager(DeviceClientBase, Device):
else []), else []),
accessMode=AccessMode.READONLY) accessMode=AccessMode.READONLY)
availableScenes = VectorString(
displayedName='Available scenes',
displayType='Scenes',
requiredAccessLevel=AccessLevel.OBSERVER,
accessMode=AccessMode.READONLY,
defaultValue=['overview', 'managed_keys'],
daqPolicy=DaqPolicy.OMIT)
@slot
def requestScene(self, params):
    """Karabo scene-provider slot: return a serialized GUI scene by name.

    Supported names:
      - 'overview' (default): device overview built from the manager's own
        schema plus the correction device schema and IDs.
      - 'browse_schema' or 'browse_schema:<prefix>': recursively generated
        view of a subschema; prefix defaults to 'managed'.
    Any other name yields a payload with success=False.
    """
    name = params.get('name', default='overview')
    if name == 'overview':
        # Assumes there are correction devices known to manager
        scene_data = scenes.manager_device_overview_scene(
            self.deviceId,
            self.getDeviceSchema(),
            self._correction_device_schema,
            self._correction_device_ids,
            self._domain_device_ids,
        )
        payload = Hash('success', True, 'name', name, 'data', scene_data)
    elif name.startswith('browse_schema'):
        if ':' in name:
            # everything after 'browse_schema:' is the subschema prefix
            prefix = name[len('browse_schema:'):]
        else:
            prefix = 'managed'
        scene_data = scenes.recursive_subschema_scene(
            self.deviceId,
            self.getDeviceSchema(),
            prefix,
        )
        payload = Hash('success', True, 'name', name, 'data', scene_data)
    else:
        # unknown scene name; report failure to the GUI
        payload = Hash('success', False, 'name', name)
    return Hash('type', 'deviceScene',
                'origin', self.deviceId,
                'payload', payload)
detectorType = String( detectorType = String(
displayedName='Detector type', displayedName='Detector type',
description='Type of the detector to manage.', description='Type of the detector to manage.',
...@@ -368,6 +392,13 @@ class CalibrationManager(DeviceClientBase, Device): ...@@ -368,6 +392,13 @@ class CalibrationManager(DeviceClientBase, Device):
self.deviceServers = value self.deviceServers = value
self._servers_changed = True self._servers_changed = True
imageDataPath = String(
displayedName='Image data path',
description='Path in DAQ hash to actual image data, used for preview',
accessMode=AccessMode.RECONFIGURABLE,
assignment=Assignment.OPTIONAL,
defaultValue='image.data')
geometryDevice = String( geometryDevice = String(
displayedName='Geometry device', displayedName='Geometry device',
description='[NYI] Device ID for a geometry device defining the ' description='[NYI] Device ID for a geometry device defining the '
...@@ -591,6 +622,9 @@ class CalibrationManager(DeviceClientBase, Device): ...@@ -591,6 +622,9 @@ class CalibrationManager(DeviceClientBase, Device):
# Obtain the device schema from a correction device server. # Obtain the device schema from a correction device server.
managed_schema, _, _ = await call(corr_server, 'slotGetClassSchema', managed_schema, _, _ = await call(corr_server, 'slotGetClassSchema',
self._correction_class_id) self._correction_class_id)
# saving this for later
self._correction_device_schema = Schema()
self._correction_device_schema.copy(managed_schema)
if managed_schema.name != self._correction_class_id: if managed_schema.name != self._correction_class_id:
self._set_fatal( self._set_fatal(
...@@ -1012,7 +1046,7 @@ class CalibrationManager(DeviceClientBase, Device): ...@@ -1012,7 +1046,7 @@ class CalibrationManager(DeviceClientBase, Device):
device_id_templates = {} device_id_templates = {}
class_args = (self.detectorType.value.lower().capitalize(),) class_args = (self.detectorType.value.lower().capitalize(),)
for role in ['correction', 'groupMatcher', 'bridge', 'previewMatcher', for role in ['correction', 'groupMatcher', 'bridge',
'assembler']: 'assembler']:
class_ids[role] = getattr( class_ids[role] = getattr(
self.classIds, f'{role}Class').value.format(*class_args) self.classIds, f'{role}Class').value.format(*class_args)
...@@ -1061,10 +1095,8 @@ class CalibrationManager(DeviceClientBase, Device): ...@@ -1061,10 +1095,8 @@ class CalibrationManager(DeviceClientBase, Device):
config = Hash() config = Hash()
# Legacy keys for calibrationBase. config['constantParameters.detectorName'] = self.detectorIdentifier.value
config['det_type'] = self.detectorType config['constantParameters.karaboDa'] = aggregator
config['det_identifier'] = self.detectorIdentifier
config['da_name'] = aggregator
config['dataInput.connectedOutputChannels'] = [input_channel] config['dataInput.connectedOutputChannels'] = [input_channel]
config['fastSources'] = [input_source] config['fastSources'] = [input_source]
...@@ -1153,78 +1185,26 @@ class CalibrationManager(DeviceClientBase, Device): ...@@ -1153,78 +1185,26 @@ class CalibrationManager(DeviceClientBase, Device):
background(_activate_bridge(bridge_device_id)) background(_activate_bridge(bridge_device_id))
# Instantiate preview layer matchers and assemblers. # Instantiate preview layer assemblers.
geometry_device_id = self.geometryDevice.value
for layer, output_pipeline, server in self.previewLayers.value: for layer, output_pipeline, server in self.previewLayers.value:
# Preview matcher. assembler_device_id = device_id_templates['assembler'].format(
matcher_device_id = device_id_templates['previewMatcher'].format(
layer=layer) layer=layer)
config = Hash() config = Hash()
config['channels'] = [ # TODO: put _image_data_path in corr dev schema, get from there
f'{device_id}:{output_pipeline}' config['pathToStack'] = self.imageDataPath.value
for device_id in correct_device_id_by_module.values()]
config['fastSources'] = [ config['fastSources'] = [
Hash('fsSelect', True, Hash('fsSelect', True,
'fsSource', 'fsSource',
f'{input_source_by_module[virtual_id]}') f'{input_source_by_module[virtual_id]}')
for (virtual_id, device_id) for (virtual_id, device_id)
in correct_device_id_by_module.items()] in correct_device_id_by_module.items()]
config['pathToStack'] = 'data.adc' config['channels'] = [
f'{device_id}:{output_pipeline}'
if not await self._instantiate_device( for device_id in correct_device_id_by_module.values()]
server, class_ids['previewMatcher'], matcher_device_id, config config['geometryInput.connectedOutputChannels'] = [
): f'{geometry_device_id}:geometryOutput']
return
# Preview assembler.
assembler_device_id = device_id_templates['assembler'].format(
layer=layer)
config = Hash()
config['input.connectedOutputChannels'] = [
f'{matcher_device_id}:output']
config['modules'] = [
Hash('source', input_source_by_module.get('Q1M1', ''),
'offX', 474, 'offY', 612, 'rot', 90),
Hash('source', input_source_by_module.get('Q1M2', ''),
'offX', 316, 'offY', 612, 'rot', 90),
Hash('source', input_source_by_module.get('Q1M3', ''),
'offX', 158, 'offY', 612, 'rot', 90),
Hash('source', input_source_by_module.get('Q1M4', ''),
'offX', 0, 'offY', 612, 'rot', 90),
Hash('source', input_source_by_module.get('Q2M1', ''),
'offX', 1136, 'offY', 612, 'rot', 90),
Hash('source', input_source_by_module.get('Q2M2', ''),
'offX', 978, 'offY', 612, 'rot', 90),
Hash('source', input_source_by_module.get('Q2M3', ''),
'offX', 820, 'offY', 612, 'rot', 90),
Hash('source', input_source_by_module.get('Q2M4', ''),
'offX', 662, 'offY', 612, 'rot', 90),
Hash('source', input_source_by_module.get('Q3M1', ''),
'offX', 712, 'offY', 0, 'rot', 270),
Hash('source', input_source_by_module.get('Q3M2', ''),
'offX', 870, 'offY', 0, 'rot', 270),
Hash('source', input_source_by_module.get('Q3M3', ''),
'offX', 1028, 'offY', 0, 'rot', 270),
Hash('source', input_source_by_module.get('Q3M4', ''),
'offX', 1186, 'offY', 0, 'rot', 270),
Hash('source', input_source_by_module.get('Q4M1', ''),
'offX', 50, 'offY', 0, 'rot', 270),
Hash('source', input_source_by_module.get('Q4M2', ''),
'offX', 208, 'offY', 0, 'rot', 270),
Hash('source', input_source_by_module.get('Q4M3', ''),
'offX', 366, 'offY', 0, 'rot', 270),
Hash('source', input_source_by_module.get('Q4M4', ''),
'offX', 524, 'offY', 0, 'rot', 270),
]
config['pathsToCombine'] = ['data.adc']
config['trainIdPath'] = 'image.trainId'
config['pulseIdPath'] = 'image.pulseId'
config['preview.enablePreview'] = True
config['preview.pathToPreview'] = 'data.adc'
config['preview.downSample'] = 2
config['badpixelPath'] = 'image.bad_pixels'
config['rotated90Grad'] = True
if not await self._instantiate_device( if not await self._instantiate_device(
server, class_ids['assembler'], assembler_device_id, config server, class_ids['assembler'], assembler_device_id, config
......
This diff is collapsed.
import enum
import cupy
import numpy as np
from karabo.bound import (
DOUBLE_ELEMENT,
KARABO_CLASSINFO,
OUTPUT_CHANNEL,
OVERWRITE_ELEMENT,
STRING_ELEMENT,
VECTOR_STRING_ELEMENT,
)
from . import base_gpu, calcat_utils, utils
from ._version import version as deviceVersion
from .base_correction import BaseCorrection, add_correction_step_schema, preview_schema
_pretend_pulse_table = np.arange(16, dtype=np.uint8)
class JungfrauConstants(enum.Enum):
    """Calibration constant types used for JUNGFRAU correction.

    Values are plain sequential identifiers; only member identity matters.
    """

    Offset10Hz = 1
    BadPixelsDark10Hz = 2
    BadPixelsFF10Hz = 3
    RelativeGain10Hz = 4
class CorrectionFlags(enum.IntFlag):
    """Bitmask selecting which correction steps the kernel applies."""

    NONE = 0
    OFFSET = 1 << 0
    REL_GAIN = 1 << 1
    BPMASK = 1 << 2
class JungfrauGpuRunner(base_gpu.BaseGpuRunner):
    """GPU kernel runner for JUNGFRAU correction.

    Holds input image / gain map / cell table buffers and the constant maps
    (offset, relative gain, bad pixels) on the GPU, and invokes the templated
    CUDA "correct" kernel over them.
    """

    _kernel_source_filename = "jungfrau_gpu.cu"
    # corrected output is laid out (cell, y, x)
    _corrected_axis_order = "cyx"

    def __init__(
        self,
        pixels_x,
        pixels_y,
        memory_cells,
        constant_memory_cells,
        input_data_dtype=cupy.uint16,
        output_data_dtype=cupy.float32,
        bad_pixel_mask_value=cupy.nan,
    ):
        # input arrives as (cells, y, x); processing keeps the same shape
        self.input_shape = (memory_cells, pixels_y, pixels_x)
        self.processed_shape = self.input_shape
        super().__init__(
            pixels_x,
            pixels_y,
            memory_cells,
            constant_memory_cells,
            input_data_dtype,
            output_data_dtype,
        )
        # TODO: avoid superclass creating cell table with wrong dtype first
        self.cell_table_gpu = cupy.empty(self.memory_cells, dtype=cupy.uint8)
        # JUNGFRAU delivers a per-pixel gain map alongside the image data
        self.input_gain_map_gpu = cupy.empty(self.input_shape, dtype=cupy.uint8)
        self.preview_buffer_getters.append(self._get_gain_map_for_preview)
        # constant maps carry one extra axis; presumably the three gain
        # stages - TODO confirm
        self.map_shape = (self.constant_memory_cells, self.pixels_y, self.pixels_x, 3)
        self.offset_map_gpu = cupy.zeros(self.map_shape, dtype=cupy.float32)
        self.rel_gain_map_gpu = cupy.ones(self.map_shape, dtype=cupy.float32)
        self.bad_pixel_map_gpu = cupy.zeros(self.map_shape, dtype=cupy.uint32)
        self.bad_pixel_mask_value = bad_pixel_mask_value
        self.update_block_size((1, 1, 64))

    def _init_kernels(self):
        """Render the CUDA kernel template and compile the 'correct' kernel."""
        kernel_source = self._kernel_template.render(
            {
                "pixels_x": self.pixels_x,
                "pixels_y": self.pixels_y,
                "data_memory_cells": self.memory_cells,
                "constant_memory_cells": self.constant_memory_cells,
                "input_data_dtype": utils.np_dtype_to_c_type(self.input_data_dtype),
                "output_data_dtype": utils.np_dtype_to_c_type(self.output_data_dtype),
                "corr_enum": utils.enum_to_c_template(CorrectionFlags),
                "burst_mode": self.burst_mode,
            }
        )
        self.source_module = cupy.RawModule(code=kernel_source)
        self.correction_kernel = self.source_module.get_function("correct")

    @property
    def burst_mode(self):
        # more than one memory cell means the detector runs in burst mode
        return self.memory_cells > 1

    def _get_raw_for_preview(self):
        # preview getter: uncorrected input buffer
        return self.input_data_gpu

    def _get_corrected_for_preview(self):
        # preview getter: corrected output buffer
        return self.processed_data_gpu

    def _get_gain_map_for_preview(self):
        # preview getter: raw per-pixel gain map
        return self.input_gain_map_gpu

    def load_data(self, image_data, input_gain_map, cell_table):
        """Experiment: loading all three in one function as they are tied"""
        self.input_data_gpu.set(image_data)
        self.input_gain_map_gpu.set(input_gain_map)
        # cell table only uploaded in burst mode; in single-cell operation
        # the existing buffer contents are not refreshed
        if self.burst_mode:
            self.cell_table_gpu.set(cell_table)

    def flush_buffers(self):
        """Reset constant maps to identity (no-op correction) values."""
        self.offset_map_gpu.fill(0)
        self.rel_gain_map_gpu.fill(1)
        self.bad_pixel_map_gpu.fill(0)

    def correct(self, flags):
        """Run the correction kernel; `flags` (CorrectionFlags) selects steps."""
        self.correction_kernel(
            self.full_grid,
            self.full_block,
            (
                self.input_data_gpu,
                self.input_gain_map_gpu,
                self.cell_table_gpu,
                cupy.uint8(flags),
                self.offset_map_gpu,
                self.rel_gain_map_gpu,
                self.bad_pixel_map_gpu,
                self.bad_pixel_mask_value,
                self.processed_data_gpu,
            )
        )
class JungfrauCalcatFriend(calcat_utils.BaseCalcatFriend):
    """Maps JUNGFRAU device parameters to CalCat constant queries."""

    _constant_enum_class = JungfrauConstants

    def __init__(self, device, *args, **kwargs):
        super().__init__(device, *args, **kwargs)
        # all four constant types are queried under the same (dark)
        # operating conditions
        self._constants_need_conditions = {
            JungfrauConstants.Offset10Hz: self.dark_condition,
            JungfrauConstants.BadPixelsDark10Hz: self.dark_condition,
            JungfrauConstants.BadPixelsFF10Hz: self.dark_condition,
            JungfrauConstants.RelativeGain10Hz: self.dark_condition,
        }

    @staticmethod
    def add_schema(
        schema,
        managed_keys,
        param_prefix="constantParameters",
        status_prefix="foundConstants",
    ):
        """Add CalCat condition parameters and constant-status nodes to the
        device schema, with JUNGFRAU-appropriate defaults and extra fields
        (integration time, sensor temperature, gain setting, gain mode).
        """
        super(JungfrauCalcatFriend, JungfrauCalcatFriend).add_schema(
            schema, managed_keys, "jungfrau-Type", param_prefix, status_prefix
        )
        # set some defaults for common parameters
        (
            OVERWRITE_ELEMENT(schema)
            .key(f"{param_prefix}.pixelsX")
            .setNewDefaultValue(1024)
            .commit(),

            OVERWRITE_ELEMENT(schema)
            .key(f"{param_prefix}.pixelsY")
            .setNewDefaultValue(512)
            .commit(),

            OVERWRITE_ELEMENT(schema)
            .key(f"{param_prefix}.memoryCells")
            .setNewDefaultValue(1)
            .commit(),

            OVERWRITE_ELEMENT(schema)
            .key(f"{param_prefix}.biasVoltage")
            .setNewDefaultValue(90)
            .commit(),
        )
        # add extra parameters
        (
            DOUBLE_ELEMENT(schema)
            .key(f"{param_prefix}.integrationTime")
            .displayedName("Integration time")
            .description("Integration time in ms")
            .assignmentOptional()
            .defaultValue(350)
            .reconfigurable()
            .commit(),

            DOUBLE_ELEMENT(schema)
            .key(f"{param_prefix}.sensorTemperature")
            .displayedName("Sensor temperature")
            .description("Sensor temperature in K")
            .assignmentOptional()
            .defaultValue(291)
            .reconfigurable()
            .commit(),

            DOUBLE_ELEMENT(schema)
            .key(f"{param_prefix}.gainSetting")
            .displayedName("Gain setting")
            .description("Feedback capacitor setting; 0 is default, 1 is HG0")
            .assignmentOptional()
            .defaultValue(0)
            .reconfigurable()
            .commit(),

            STRING_ELEMENT(schema)
            .key(f"{param_prefix}.gainMode")
            .displayedName("Gain mode")
            .description(
                "Detector may be operating in one of several gain modes. For this "
                "device to query appropriate constants, it is sufficient to know "
                "whether gain mode is dynamic or fixed."
            )
            .assignmentOptional()
            .defaultValue("dynamicgain")
            .options("dynamicgain,fixedgain")
            .commit(),
        )
        # expose the new parameters to the calibration manager
        managed_keys.add(f"{param_prefix}.integrationTime")
        managed_keys.add(f"{param_prefix}.sensorTemperature")
        managed_keys.add(f"{param_prefix}.gainSetting")
        managed_keys.add(f"{param_prefix}.gainMode")

        calcat_utils.add_status_schema_from_enum(
            schema, status_prefix, JungfrauConstants
        )

    def dark_condition(self):
        """Build the CalCat operating-condition set for dark constants from
        the current device parameter values."""
        res = calcat_utils.OperatingConditions()
        res["Memory cells"] = self._get_param("memoryCells")
        res["Sensor Bias Voltage"] = self._get_param("biasVoltage")
        res["Pixels X"] = self._get_param("pixelsX")
        res["Pixels Y"] = self._get_param("pixelsY")
        res["Integration Time"] = self._get_param("integrationTime")
        res["Sensor Temperature"] = self._get_param("sensorTemperature")
        res["Gain Setting"] = self._get_param("gainSetting")
        gain_mode = self._get_param("gainMode")
        # only included for fixed gain mode
        if gain_mode != "dynamicgain":
            # NOTE: always include if CalCat is updated for this
            res["Gain mode"] = 1
        return res
@KARABO_CLASSINFO("JungfrauCorrection", deviceVersion)
class JungfrauCorrection(BaseCorrection):
    """Online correction device for JUNGFRAU detector data.

    Applies offset, relative gain, and bad pixel masking on GPU via
    JungfrauGpuRunner; constants are retrieved through JungfrauCalcatFriend.
    """

    _correction_flag_class = CorrectionFlags
    _correction_field_names = (
        ("offset", CorrectionFlags.OFFSET),
        ("relGain", CorrectionFlags.REL_GAIN),
        ("badPixels", CorrectionFlags.BPMASK),
    )
    _kernel_runner_class = JungfrauGpuRunner
    _calcat_friend_class = JungfrauCalcatFriend
    _constant_enum_class = JungfrauConstants
    _managed_keys = BaseCorrection._managed_keys.copy()
    # paths within the incoming DAQ hash
    _image_data_path = "data.adc"
    _cell_table_path = "data.memoryCell"

    @staticmethod
    def expectedParameters(expected):
        super(JungfrauCorrection, JungfrauCorrection).expectedParameters(expected)
        # JUNGFRAU-appropriate defaults for common data format parameters
        (
            OVERWRITE_ELEMENT(expected)
            .key("dataFormat.pixelsX")
            .setNewDefaultValue(1024)
            .commit(),

            OVERWRITE_ELEMENT(expected)
            .key("dataFormat.pixelsY")
            .setNewDefaultValue(512)
            .commit(),

            OVERWRITE_ELEMENT(expected)
            .key("dataFormat.memoryCells")
            .setNewDefaultValue(1)
            .commit(),

            OVERWRITE_ELEMENT(expected)
            .key("preview.selectionMode")
            .setNewDefaultValue("frame")
            .commit(),
        )
        # extra preview output: the raw per-pixel gain map
        (
            OUTPUT_CHANNEL(expected)
            .key("preview.outputGainMap")
            .dataSchema(preview_schema)
            .commit(),
        )
        JungfrauCalcatFriend.add_schema(expected, JungfrauCorrection._managed_keys)
        add_correction_step_schema(
            expected,
            JungfrauCorrection._managed_keys,
            JungfrauCorrection._correction_field_names,
        )
        # mandatory: manager needs this in schema
        (
            VECTOR_STRING_ELEMENT(expected)
            .key("managedKeys")
            .assignmentOptional()
            .defaultValue(list(JungfrauCorrection._managed_keys))
            .commit()
        )

    @property
    def input_data_shape(self):
        """Expected (memory cells, y, x) shape of incoming image data."""
        return (
            self.unsafe_get("dataFormat.memoryCells"),
            self.unsafe_get("dataFormat.pixelsY"),
            self.unsafe_get("dataFormat.pixelsX"),
        )

    def __init__(self, config):
        super().__init__(config)
        # TODO: gain mode as constant parameter and / or device configuration
        try:
            self.bad_pixel_mask_value = np.float32(
                config.get("corrections.badPixels.maskingValue")
            )
        except ValueError:
            # unparseable user-supplied value; fall back to NaN masking
            self.bad_pixel_mask_value = np.float32("nan")

        self._kernel_runner_init_args = {
            "bad_pixel_mask_value": self.bad_pixel_mask_value,
        }

    def process_data(
        self,
        data_hash,
        metadata,
        source,
        train_id,
        image_data,
        cell_table,
        do_generate_preview,
    ):
        """Correct one train of data on GPU, write it to shared memory, and
        optionally emit raw/corrected/gain-map previews."""
        if len(cell_table.shape) == 0:
            # scalar cell table (single cell); kernel expects a 1D array
            cell_table = cell_table[np.newaxis]
        try:
            self.kernel_runner.load_data(
                image_data, data_hash.get("data.gain"), cell_table
            )
        except ValueError as e:
            self.log_status_warn(f"Failed to load data: {e}")
            return
        except Exception as e:
            self.log_status_warn(f"Unknown exception when loading data to GPU: {e}")
            # BUG FIX: previously fell through here and kept processing
            # stale / partially loaded GPU buffers; bail out like the
            # ValueError branch does
            return

        buffer_handle, buffer_array = self._shmem_buffer.next_slot()
        self.kernel_runner.correct(self._correction_flag_enabled)
        self.kernel_runner.reshape(
            output_order=self.unsafe_get("dataFormat.outputAxisOrder"),
            out=buffer_array,
        )
        if do_generate_preview:
            if self._correction_flag_enabled != self._correction_flag_preview:
                # preview may use a different correction selection; recompute
                self.kernel_runner.correct(self._correction_flag_preview)
            (
                preview_slice_index,
                preview_cell,
                preview_pulse,
            ) = utils.pick_frame_index(
                self.unsafe_get("preview.selectionMode"),
                self.unsafe_get("preview.index"),
                cell_table,
                _pretend_pulse_table,
                warn_func=self.log_status_warn,
            )
            (
                preview_raw,
                preview_corrected,
                preview_gain_map,
            ) = self.kernel_runner.compute_previews(preview_slice_index)

        # reusing input data hash for sending
        data_hash.set(self._image_data_path, buffer_handle)
        data_hash.set("calngShmemPaths", [self._image_data_path])
        self._write_output(data_hash, metadata)
        if do_generate_preview:
            self._write_combiner_previews(
                (
                    ("preview.outputRaw", preview_raw),
                    ("preview.outputCorrected", preview_corrected),
                    ("preview.outputGainMap", preview_gain_map),
                ),
                train_id,
                source,
            )

    def _load_constant_to_runner(self, constant, constant_data):
        """Upload a retrieved calibration constant into the GPU runner's maps
        and mark the corresponding correction step as available."""
        # transpose to the (cells, y, x, gain) layout the kernel maps use;
        # axis order of the incoming constant is detected by comparing the
        # leading axis against pixelsX - presumably (x, y, cells, gains) vs
        # (y, x, cells, gains), TODO confirm against CalCat conventions
        if constant_data.shape[0] == self.get("dataFormat.pixelsX"):
            constant_data = np.transpose(constant_data, (2, 1, 0, 3))
        else:
            constant_data = np.transpose(constant_data, (2, 0, 1, 3))

        if constant is JungfrauConstants.Offset10Hz:
            self.kernel_runner.offset_map_gpu.set(constant_data.astype(np.float32))
            if not self.get("corrections.offset.available"):
                self.set("corrections.offset.available", True)
        elif constant is JungfrauConstants.RelativeGain10Hz:
            self.kernel_runner.rel_gain_map_gpu.set(constant_data.astype(np.float32))
            if not self.get("corrections.relGain.available"):
                self.set("corrections.relGain.available", True)
        elif constant in (
            JungfrauConstants.BadPixelsDark10Hz, JungfrauConstants.BadPixelsFF10Hz
        ):
            # dark and flat-field bad pixel masks are OR-ed together
            self.kernel_runner.bad_pixel_map_gpu |= cupy.asarray(constant_data)
            if not self.get("corrections.badPixels.available"):
                self.set("corrections.badPixels.available", True)

        self._update_correction_flags()
        self.log_status_info(f"Done loading {constant.name} to GPU")
import extra_geom
from karabo.bound import KARABO_CLASSINFO
from ._version import version as deviceVersion
from .manual_geometry_base import ManualQuadrantsGeometryBase
@KARABO_CLASSINFO("ManualAgipdGeometry", deviceVersion)
class ManualAgipdGeometry(ManualQuadrantsGeometryBase):
    """Manually configured AGIPD-1M geometry with preset quadrant corners."""

    geometry_class = extra_geom.AGIPD_1MGeometry

    @staticmethod
    def expectedParameters(expected):
        super(ManualAgipdGeometry, ManualAgipdGeometry).expectedParameters(expected)
        # default (x, y) corner position per quadrant
        corner_defaults = (
            ("Q1", -525, 625),
            ("Q2", -550, -10),
            ("Q3", 520, -160),
            ("Q4", 542.5, 475),
        )
        for quadrant, corner_x, corner_y in corner_defaults:
            expected.setDefaultValue(f"quadrantCorners.{quadrant}.x", corner_x)
            expected.setDefaultValue(f"quadrantCorners.{quadrant}.y", corner_y)
import extra_geom
from karabo.bound import KARABO_CLASSINFO
from ._version import version as deviceVersion
from .manual_geometry_base import ManualQuadrantsGeometryBase
@KARABO_CLASSINFO("ManualDsscGeometry", deviceVersion)
class ManualDsscGeometry(ManualQuadrantsGeometryBase):
    """Manually configured DSSC-1M geometry with preset quadrant corners."""

    geometry_class = extra_geom.DSSC_1MGeometry

    @staticmethod
    def expectedParameters(expected):
        super(ManualDsscGeometry, ManualDsscGeometry).expectedParameters(expected)
        # default (x, y) corner position per quadrant
        corner_defaults = (
            ("Q1", -130, 5),
            ("Q2", -130, -125),
            ("Q3", 5, -125),
            ("Q4", 5, 5),
        )
        for quadrant, corner_x, corner_y in corner_defaults:
            expected.setDefaultValue(f"quadrantCorners.{quadrant}.x", corner_x)
            expected.setDefaultValue(f"quadrantCorners.{quadrant}.y", corner_y)
import extra_geom
from karabo.bound import KARABO_CLASSINFO, OVERWRITE_ELEMENT, Hash
from ._version import version as deviceVersion
from .manual_geometry_base import ManualModulesGeometryBase
@KARABO_CLASSINFO("ManualJungfrauGeometry", deviceVersion)
class ManualJungfrauGeometry(ManualModulesGeometryBase):
    """Manually configured JUNGFRAU geometry built from per-module
    position / orientation entries."""

    geometry_class = extra_geom.JUNGFRAUGeometry

    @staticmethod
    def expectedParameters(expected):
        # TODO: come up with some sweet defaults (this is two modules from docs 4M)
        (
            OVERWRITE_ELEMENT(expected)
            .key("modules")
            .setNewDefaultValue(
                [
                    Hash(
                        "posX", 95, "posY", 564, "orientationX", -1, "orientationY", -1
                    ),
                    Hash(
                        "posX", 95, "posY", 17, "orientationX", -1, "orientationY", -1
                    ),
                ]
            )
            # BUG FIX: the chain previously never called commit(), so the
            # overwritten default was silently discarded (every other
            # OVERWRITE_ELEMENT usage in this package ends with .commit())
            .commit()
        )
import numpy as np import numpy as np
from karabo.bound import ( from karabo.bound import (
ChannelMetaData,
FLOAT_ELEMENT, FLOAT_ELEMENT,
KARABO_CLASSINFO, KARABO_CLASSINFO,
NODE_ELEMENT, NODE_ELEMENT,
STRING_ELEMENT, STRING_ELEMENT,
ChannelMetaData,
Epochstamp, Epochstamp,
Hash, Hash,
MetricPrefix, MetricPrefix,
...@@ -31,7 +31,6 @@ class ModuleStacker(TrainMatcher.TrainMatcher): ...@@ -31,7 +31,6 @@ class ModuleStacker(TrainMatcher.TrainMatcher):
@staticmethod @staticmethod
def expectedParameters(expected): def expectedParameters(expected):
super(ModuleStacker, ModuleStacker).expectedParameters(expected)
( (
FLOAT_ELEMENT(expected) FLOAT_ELEMENT(expected)
.key("timeOfFlight") .key("timeOfFlight")
...@@ -45,6 +44,7 @@ class ModuleStacker(TrainMatcher.TrainMatcher): ...@@ -45,6 +44,7 @@ class ModuleStacker(TrainMatcher.TrainMatcher):
.metricPrefix(MetricPrefix.MILLI) .metricPrefix(MetricPrefix.MILLI)
.readOnly() .readOnly()
.commit(), .commit(),
STRING_ELEMENT(expected) STRING_ELEMENT(expected)
.key("pathToStack") .key("pathToStack")
.displayedName("Data path to stack") .displayedName("Data path to stack")
...@@ -79,8 +79,11 @@ class ModuleStacker(TrainMatcher.TrainMatcher): ...@@ -79,8 +79,11 @@ class ModuleStacker(TrainMatcher.TrainMatcher):
"These nodes are not used." "These nodes are not used."
) )
schema = Schema() schema = Schema()
NODE_ELEMENT(schema).key("start").description(desc).commit() (
NODE_ELEMENT(schema).key("stop").description(desc).commit() NODE_ELEMENT(schema).key("start").description(desc).commit(),
NODE_ELEMENT(schema).key("stop").description(desc).commit(),
)
self.path_to_stack = self.get("pathToStack") self.path_to_stack = self.get("pathToStack")
self.updateSchema(schema) self.updateSchema(schema)
...@@ -133,9 +136,6 @@ class ModuleStacker(TrainMatcher.TrainMatcher): ...@@ -133,9 +136,6 @@ class ModuleStacker(TrainMatcher.TrainMatcher):
out_hash[self.path_to_stack] = stacked_data out_hash[self.path_to_stack] = stacked_data
out_hash["sources"] = stacked_sources out_hash["sources"] = stacked_sources
out_hash["modulesPresent"] = stacked_present out_hash["modulesPresent"] = stacked_present
if not out_hash.has("image.passport"):
out_hash.set("image.passport", [])
out_hash["image.passport"].append(self.getInstanceId())
channel = self.signalSlotable.getOutputChannel("output") channel = self.signalSlotable.getOutputChannel("output")
channel.write(out_hash, ChannelMetaData(self.getInstanceId(), timestamp)) channel.write(out_hash, ChannelMetaData(self.getInstanceId(), timestamp))
channel.update() channel.update()
......
import threading
from time import time from time import time
from karabo.bound import KARABO_CLASSINFO from karabo.bound import KARABO_CLASSINFO
from PipeToZeroMQ import PipeToZeroMQ, conversion, device_schema from PipeToZeroMQ import PipeToZeroMQ, conversion, device_schema
from . import shmem_utils from . import shmem_utils
from ._version import version as deviceVersion from ._version import version as deviceVersion
...@@ -13,32 +11,7 @@ from ._version import version as deviceVersion ...@@ -13,32 +11,7 @@ from ._version import version as deviceVersion
class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ): class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ):
def initialization(self): def initialization(self):
super().initialization() super().initialization()
self._source_to_shmem_ary = {} self._shmem_handler = shmem_utils.ShmemCircularBufferReceiver()
self._source_to_shmem_mem = {}
self._buffer_lock = threading.Lock()
def _get_shmem_buffer_data(self, source, shmem_handle):
# TODO: handle failure if this was not a shmem handle
name, dtype, shape, index = shmem_utils.parse_shmem_handle(shmem_handle)
with self._buffer_lock:
# may have to open shared memory buffer
if source not in self._source_to_shmem_ary:
self.log.INFO(f"Opening buffer {name} for source {source}")
try:
mem, ary = shmem_utils.open_shmem_from_handle(shmem_handle)
except OSError:
self.log.WARN(f"Failed to open buffer {name}")
return None
self._source_to_shmem_mem[source] = mem
self._source_to_shmem_ary[source] = ary
elif self._source_to_shmem_ary[source].shape != shape:
self.log.INFO(f"Updating buffer shape for {source} to {shape}")
self._source_to_shmem_ary[source] = self._source_to_shmem_mem[
source
].ndarray(shape=shape, dtype=dtype)
# grab data from shared memory buffer
return self._source_to_shmem_ary[source][index]
def onInput(self, input_channel): def onInput(self, input_channel):
actual = self.getActualTimestamp() actual = self.getActualTimestamp()
...@@ -80,7 +53,12 @@ class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ): ...@@ -80,7 +53,12 @@ class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ):
f"Hash from {source} did not have {shmem_handle_path}" f"Hash from {source} did not have {shmem_handle_path}"
) )
continue continue
actual_data = self._get_shmem_buffer_data(source, shmem_handle) elif shmem_handle_path == "":
self.log.INFO(
f"Hash from {source} had empty {shmem_handle_path}"
)
continue
actual_data = self._shmem_handler.get(shmem_handle)
arr[shmem_handle_path] = actual_data arr[shmem_handle_path] = actual_data
data[source] = (dic, arr) data[source] = (dic, arr)
...@@ -95,8 +73,3 @@ class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ): ...@@ -95,8 +73,3 @@ class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ):
self._updateProperties(output_tic) self._updateProperties(output_tic)
# block if device is in passive state # block if device is in passive state
self.monitoring.wait() self.monitoring.wait()
def preDestruction(self):
for ary in self._source_to_shmem_ary.values():
del ary
super().preDestruction()
import functools
import pickle
import re
import threading
import time
import numpy as np
from karabo.bound import (
FLOAT_ELEMENT,
IMAGEDATA_ELEMENT,
INPUT_CHANNEL,
KARABO_CLASSINFO,
OUTPUT_CHANNEL,
OVERWRITE_ELEMENT,
STRING_ELEMENT,
UINT32_ELEMENT,
UINT64_ELEMENT,
ChannelMetaData,
Dims,
Encoding,
Epochstamp,
Hash,
ImageData,
MetricPrefix,
Schema,
Timestamp,
Trainstamp,
Unit,
)
from karabo.common.api import KARABO_SCHEMA_DISPLAY_TYPE_SCENES as DT_SCENES
from TrainMatcher import TrainMatcher
from TrainMatcher import scenes as trainmatcher_scenes
from . import scenes
from ._version import version as deviceVersion
# Schema for the "output" channel: one assembled preview image per train.
preview_schema = Schema()
IMAGEDATA_ELEMENT(preview_schema).key("image").commit()
UINT64_ELEMENT(preview_schema).key("trainId").readOnly().commit()
# Extract the module number from XTDF-style source names, e.g. ".../DET/3CH0:xtdf" -> 3
xtdf_source_re = re.compile(r".*\/DET\/(\d+)CH0:xtdf")
# Extract the trailing number from DAQ-style source names, e.g. ".../DET/...03:daqOutput" -> 3
# (presumably 1-based - _source_to_index subtracts 1 for these; confirm against DAQ naming)
daq_source_re = re.compile(r".*\/DET\/.*?(\d+):daqOutput")
# TODO: merge scene with TrainMatcher's nice overview
@KARABO_CLASSINFO("SimpleAssembler", deviceVersion)
class SimpleAssembler(TrainMatcher.TrainMatcher):
    """Train matcher that assembles matched detector module data into one image.

    Module data from the matched sources is stacked into ``self.input_buffer``
    (shape taken from the geometry's ``expected_data_shape``), positioned with
    the geometry received on the ``geometryInput`` channel, optionally
    downsampled, and sent as an ``ImageData`` hash on the ``output`` channel.
    """

    @staticmethod
    def expectedParameters(expected):
        # Karabo schema: device properties, input/output channels.
        (
            OVERWRITE_ELEMENT(expected)
            .key("availableScenes")
            .setNewDefaultValue(["overview", "trainMatcherScene"])
            .commit(),

            FLOAT_ELEMENT(expected)
            .key("processingTime")
            .unit(Unit.SECOND)
            .metricPrefix(MetricPrefix.MILLI)
            .readOnly()
            .initialValue(0)
            .warnHigh(500)
            .info("Cannot keep up with GUI limit")
            .needsAcknowledging(False)
            .commit(),

            FLOAT_ELEMENT(expected)
            .key("timeOfFlight")
            .unit(Unit.SECOND)
            .metricPrefix(MetricPrefix.MILLI)
            .readOnly()
            .initialValue(0)
            .warnHigh(1000)
            .info("Time of flight exceeding 1 s")
            .needsAcknowledging(False)
            .commit(),

            # path within each source's hash where the module image lives
            STRING_ELEMENT(expected)
            .key("pathToStack")
            .assignmentOptional()
            .defaultValue("image.data")
            .commit(),

            UINT32_ELEMENT(expected)
            .key("downsamplingFactor")
            .description(
                "If greater than 1, the assembled image will be downsampled by this "
                "factor in x and y dimensions before sending. This is only to save "
                "bandwidth in case GUI updates start lagging."
            )
            .assignmentOptional()
            .defaultValue(1)
            .options("1,2,4,8")
            .reconfigurable()
            .commit(),

            STRING_ELEMENT(expected)
            .key("downsamplingFunction")
            .description("Reduction function used during downsampling.")
            .assignmentOptional()
            .defaultValue("nanmax")
            .options("nanmax,nanmean,nanmin,nanmedian")
            .reconfigurable()
            .commit(),

            INPUT_CHANNEL(expected)
            .key("geometryInput")
            .displayedName("Geometry input")
            .commit(),

            OUTPUT_CHANNEL(expected)  # can OVERWRITE_ELEMENT even do this?
            .key("output")
            .dataSchema(preview_schema)
            .commit(),
        )

    def initialization(self):
        """Set up state, register handlers/slots, and start matching."""
        super().initialization()

        # TODO: match inside device, fill multiple independent buffers
        self._path_to_stack = self.get("pathToStack")
        # geometry / input_buffer stay None until receive_geometry fires
        self.geometry = None
        self.input_buffer = None

        self.KARABO_ON_DATA("geometryInput", self.receive_geometry)
        self.KARABO_SLOT(self.requestScene)

        # background thread that polls geometry devices until one answers
        self.ask_for_geometry()

        self.start()

    def requestScene(self, params):
        """Serve the custom 'overview' scene; delegate others to TrainMatcher."""
        # TODO: unify with TrainMatcher overview
        scene_name = params.get("name", default="")
        if scene_name == "overview":
            payload = Hash("name", scene_name, "success", True)
            # scene links to the first configured geometry device
            payload["data"] = scenes.simple_assembler_overview(
                device_id=self.getInstanceId(),
                geometry_device_id=self.get("geometryInput.connectedOutputChannels")[
                    0
                ].split(":")[0],
            )
            self.reply(
                Hash(
                    "type",
                    "deviceScene",
                    "origin",
                    self.getInstanceId(),
                    "payload",
                    payload,
                )
            )
        elif scene_name == "trainMatcherScene":
            # TrainMatcher's own scene is registered under the name "scene"
            params["name"] = "scene"
            return super().requestScene(params)

    def receive_geometry(self, data, metadata):
        """Handler for geometryInput: unpickle geometry, (re)allocate stack buffer."""
        self.log.INFO("Received a new geometry")
        # NOTE(review): pickle.loads on channel data - trusted control network assumed
        self.geometry = pickle.loads(data.get("pickledGeometry"))
        # TODO: allow multiple memory cells (extra geom notion of extra dimensions)
        self.input_buffer = np.zeros(self.geometry.expected_data_shape)

    def ask_for_geometry(self):
        """Start a daemon thread that pesters connected geometry devices.

        Retries up to 10 times with randomized sleeps (avoids all assemblers
        hitting the geometry device at once); gives up with a log message.
        """
        def runner():
            self.log.INFO("Will ask around for a geometry")
            max_tries = 10
            for i in range(max_tries):
                time.sleep(np.random.random() * 10)
                if self.geometry is None:
                    missing_connections = set(
                        self.get("geometryInput.missingConnections")
                    )
                    # note: connectedOutputChannels not necessarily connected...
                    geometry_device_list = [
                        channel
                        for channel in self.get("geometryInput.connectedOutputChannels")
                        if channel not in missing_connections
                    ]
                    if not geometry_device_list:
                        self.log.INFO("No geometry device connected")
                        continue
                    geometry_device = geometry_device_list[0].split(":")[0]
                    self.log.INFO(f"Asking {geometry_device} for a geometry")
                    self.signalSlotable.call(geometry_device, "pleaseSendYourGeometry")
                    # give receive_geometry a moment to fire before re-checking
                    time.sleep(1)
                if self.geometry is not None:
                    return
            self.log.INFO(f"Failed to get geometry in {max_tries} tries, need help")

        threading.Thread(target=runner, daemon=True).start()

    def _send(self, train_id, sources):
        """Assemble matched per-module data for one train and emit the preview.

        sources maps source name -> (data hash, metadata); modules missing from
        the match are zero-filled in the stacking buffer.
        """
        # TODO: adapt to appropriate hook for new TrainMatcher (no _send)
        if self.geometry is None:
            self.log.WARN("Have not received a geometry yet")
            return

        timestamp = Timestamp(Epochstamp(), Trainstamp(train_id))
        module_indices_unfilled = set(range(self.input_buffer.shape[0]))
        for source, (data, metadata) in sources.items():
            # TODO: handle failure to "parse" source, get data out
            module_index = self._source_to_index(source)
            self.input_buffer[module_index] = np.squeeze(data.get(self._path_to_stack))
            module_indices_unfilled.discard(module_index)

        for unfilled_module in module_indices_unfilled:
            self.input_buffer[unfilled_module].fill(0)
            # TODO: configurable treatment of missing modules

        # TODO: reusable output buffer to save on allocation
        assembled, _ = self.geometry.position_modules_fast(self.input_buffer)

        downsampling_factor = self.get("downsamplingFactor")
        if downsampling_factor > 1:
            assembled = downsample_2d(
                assembled,
                downsampling_factor,
                reduction_fun=getattr(np, self.get("downsamplingFunction"))
            )

        # TODO: optionally include control data
        out_hash = Hash(
            "image",
            ImageData(
                # TODO: get around this being mirrored...
                (assembled[::-1, ::-1]).astype(np.int32),
                Dims(*assembled.shape),
                Encoding.GRAY,
            ),
            "trainId",
            train_id,
        )

        channel = self.signalSlotable.getOutputChannel("output")
        channel.write(out_hash, ChannelMetaData(self.getInstanceId(), timestamp))
        channel.update()
        self.rate_out.update()

    @functools.lru_cache()
    def _source_to_index(self, source):
        """Map a source name to its stacking index via the module regexes.

        Falls back to index 0 with a warning when neither pattern matches.
        """
        # note: cache means warning only shows up once (also not performance-critical)
        # TODO: allow user to inspect, modify the mapping
        match = xtdf_source_re.match(source)
        if match is not None:
            return int(match.group(1))
        match = daq_source_re.match(source)
        if match is not None:
            # DAQ source numbers are treated as 1-based here
            return int(match.group(1)) - 1
        self.log.WARN(f"Couldn't figure out index for source {source}")
        return 0
def downsample_2d(arr, factor, reduction_fun=np.nanmax):
    """Downsample the first two dimensions of arr by the given factor.

    Generalization of downsampling from FemDataAssembler. factor should be a
    power of two (the device schema offers 1, 2, 4, 8); the first two
    dimensions of arr are expected to be multiples of factor (odd leftovers
    are silently dropped by the strided slicing). reduction_fun is applied
    pairwise along each axis per halving step (e.g. np.nanmax, np.nanmean).

    Useful if you're sitting at home and ssh connection is slow to get
    full-resolution previews.

    Returns arr unchanged for factor <= 1.
    """
    # Each halving step shrinks both axes by 2, so we need log2(factor) steps.
    # (The previous factor // 2 steps over-downsampled: factor=8 gave /16.)
    halvings = max(int(factor), 1).bit_length() - 1
    for _ in range(halvings):
        arr = reduction_fun(
            (
                arr[:-1:2],
                arr[1::2],
            ), axis=0
        )
        arr = reduction_fun(
            (
                arr[:, :-1:2],
                arr[:, 1::2],
            ), axis=0
        )
    return arr
This diff is collapsed.
import pathlib
import cupy
import jinja2
import numpy as np
from . import utils
class BaseGpuRunner:
    """Class to handle GPU buffers and execution of CUDA kernels on image data

    All GPU buffers are kept within this class and it is intentionally very stateful.
    This generally means that you will want to load data into it and then do something.
    Typical usage in correct order:

    1. instantiate
    2. load constants
    3. load_data
    4. load_cell_table
    5. correct
    6a. reshape (only here does data transfer back to host)
    6b. compute_preview (optional)

    repeat from 2. or 3.

    In case no constants are available / correction is not desired, can skip 3 and 4 and
    pass CorrectionFlags.NONE to correct(...). Generally, user must handle which
    correction steps are appropriate given the constants loaded so far.
    """

    # These must be set by subclass
    _kernel_source_filename = None  # file under kernels/ rendered as a jinja2 template
    _corrected_axis_order = None  # axis-order string of processed_data_gpu

    def __init__(
        self,
        pixels_x,
        pixels_y,
        memory_cells,
        constant_memory_cells,
        input_data_dtype=np.uint16,
        output_data_dtype=np.float32,
    ):
        # NOTE(review): subclass must also provide self.input_shape and
        # self.processed_shape before this __init__ runs - they are read below
        # when allocating buffers; confirm against subclasses.
        _src_dir = pathlib.Path(__file__).absolute().parent
        # subclass must define _kernel_source_filename
        with (_src_dir / "kernels" / self._kernel_source_filename).open("r") as fd:
            self._kernel_template = jinja2.Template(fd.read())
        self.pixels_x = pixels_x
        self.pixels_y = pixels_y
        self.memory_cells = memory_cells
        if constant_memory_cells == 0:
            # if not set, guess same as input; may save one recompilation
            self.constant_memory_cells = memory_cells
        else:
            self.constant_memory_cells = constant_memory_cells
        # preview will only be single memory cell
        self.preview_shape = (self.pixels_x, self.pixels_y)
        self.input_data_dtype = input_data_dtype
        self.output_data_dtype = output_data_dtype

        self._init_kernels()

        # reuse buffers for input / output
        self.cell_table_gpu = cupy.empty(self.memory_cells, dtype=np.uint16)
        self.input_data_gpu = cupy.empty(self.input_shape, dtype=input_data_dtype)
        self.processed_data_gpu = cupy.empty(
            self.processed_shape, dtype=output_data_dtype
        )
        self.reshaped_data_gpu = None  # currently not reusing buffer

        # default preview layers: raw and corrected (subclass can extend)
        self.preview_buffer_getters = [
            self._get_raw_for_preview,
            self._get_corrected_for_preview,
        ]

    # to get data from respective buffers to cell, x, y shape for preview computation
    def _get_raw_for_preview(self):
        """Should return view of self.input_data_gpu with shape (cell, x/y, x/y)"""
        raise NotImplementedError()

    def _get_corrected_for_preview(self):
        """Should return view of self.processed_data_gpu with shape (cell, x/y, x/y)"""
        raise NotImplementedError()

    def flush_buffers(self):
        """Optional reset GPU buffers (implement in subclasses which need this)"""
        pass

    def correct(self, flags):
        """Correct (already loaded) image data according to flags

        Subclass must define this method. It should assume that image data, cell table,
        and other data (including constants) has already been loaded. It should
        probably run some GPU kernel and output should go into self.processed_data_gpu.

        Keep in mind that user only gets output from compute_preview or reshape
        (either of these should come after correct).

        The submodules providing subclasses should have some IntFlag enums defining
        which flags are available to pass along to the kernel. A zero flag should allow
        the kernel to do no actual correction - but still copy the data between buffers
        and cast it to desired output type.
        """
        raise NotImplementedError()

    def reshape(self, output_order, out=None):
        """Move axes to desired output order and copy to host memory

        The out parameter is passed directly to the get function of GPU array: if
        None, then a new ndarray (in host memory) is returned. If not None, then data
        will be loaded into the provided array, which must match shape / dtype.
        """
        # TODO: avoid copy
        if output_order == self._corrected_axis_order:
            # already in the right order; transfer directly
            self.reshaped_data_gpu = self.processed_data_gpu
        else:
            self.reshaped_data_gpu = cupy.transpose(
                self.processed_data_gpu,
                utils.transpose_order(self._corrected_axis_order, output_order),
            )

        return self.reshaped_data_gpu.get(out=out)

    def load_data(self, raw_data):
        # squeeze: incoming data may carry singleton dims (e.g. from DAQ)
        # - TODO confirm expected raw layout
        self.input_data_gpu.set(np.squeeze(raw_data))

    def load_cell_table(self, cell_table):
        # cell table buffer is uint16 of length memory_cells (allocated in __init__)
        self.cell_table_gpu.set(cell_table)

    def compute_previews(self, preview_index):
        """Generate single slice or reduction preview of raw and corrected data

        Special values of preview_index are -1 for max, -2 for mean, -3 for sum, and
        -4 for stdev (across cells).

        Note that preview_index is taken from data without checking cell table.
        Caller has to figure out which index along memory cell dimension they
        actually want to preview in case it needs to be a specific pulse.

        Will reuse data from corrected output buffer. Therefore, correct(...) must have
        been called with the appropriate flags before compute_preview(...).
        """
        if preview_index < -4:
            raise ValueError(f"No statistic with code {preview_index} defined")
        elif preview_index >= self.memory_cells:
            raise ValueError(f"Memory cell index {preview_index} out of range")

        # TODO: enum around reduction type
        # one preview per registered getter (raw, corrected, ...)
        return tuple(
            self._compute_a_preview(image_data=getter(), preview_index=preview_index)
            for getter in self.preview_buffer_getters
        )

    def _compute_a_preview(self, image_data, preview_index):
        """image_data must have cells on first axis; X and Y order is not important
        here for now (and can differ between AGIPD and DSSC)"""
        if preview_index >= 0:
            # TODO: reuse pinned buffers for this
            return image_data[preview_index].astype(np.float32).get()
        elif preview_index == -1:
            # TODO: confirm that max is pixel and not integrated intensity
            # separate from next case because dtype not applicable here
            return cupy.nanmax(image_data, axis=0).astype(cupy.float32).get()
        elif preview_index in (-2, -3, -4):
            stat_fun = {
                -2: cupy.nanmean,
                -3: cupy.nansum,
                -4: cupy.nanstd,
            }[preview_index]
            return stat_fun(image_data, axis=0, dtype=cupy.float32).get()

    def update_block_size(self, full_block):
        """Set execution grid such that it covers processed_shape with full_blocks

        Execution is scheduled with 3d "blocks" of CUDA threads. Tuning can affect
        performance. Correction kernels are "monolithic" for simplicity (i.e. each
        logical thread handles one entry in output data), so in each dimension we
        parallelize, grid * block >= length to cover all entries.

        Note that individual kernels must themselves check whether they go out of
        bounds; grid dimensions get rounded up in case ndarray size is not multiple of
        block size.
        """
        assert len(full_block) == 3
        self.full_block = tuple(full_block)
        # round up so grid * block covers every entry of processed_shape
        self.full_grid = tuple(
            utils.ceil_div(a_length, block_length)
            for (a_length, block_length) in zip(self.processed_shape, full_block)
        )
This diff is collapsed.
This diff is collapsed.
import re
import pycuda.driver
import pycuda.gpuarray
# Handle strings look like "GPUPTR:<hex>DEVID:<id>SHAPE:<d0,d1,...>" (adjacent
# raw strings below concatenate into one pattern).
_gpuptr_re = re.compile(
    r"GPUPTR:(?P<gpu_pointer>\w+)" r"DEVID:(?P<device_id>.+)" r"SHAPE:(?P<shape>.+)"
)


def get_shape_from_ipc_handle(handle_string):
    """Return the array shape encoded in an IPC handle string as a tuple of ints.

    Raises ValueError if handle_string does not match the expected format
    (previously this surfaced as an opaque AttributeError on the None match).
    """
    match = _gpuptr_re.match(handle_string)
    if match is None:
        raise ValueError(f"Could not parse IPC handle string: {handle_string!r}")
    return tuple(int(num) for num in match.group("shape").split(","))
class IPCGPUArray:
    """Context manager providing a GPUArray opened from string encoding IPC handle

    Arguments:
    handle_string: String encoding a "GPU pointer" (IPC address) plus some more
    stuff. This is "parsed" using _gpuptr_re.
    dtype: self-explanatory (but make sure it is correct)
    array shape is parsed from the handle_string
    gpu_pointer_re: unused; kept only for backward compatibility of the signature

    Raises ValueError if handle_string does not match the expected format.
    """

    def __init__(self, handle_string, dtype, gpu_pointer_re=None):
        match = _gpuptr_re.match(handle_string)
        if match is None:
            # explicit error instead of assert (asserts are stripped under -O)
            raise ValueError(f"Could not parse IPC handle string: {handle_string!r}")
        self.dtype = dtype
        self.handle_address = bytearray.fromhex(match.group("gpu_pointer"))
        self.shape = tuple(int(num) for num in match.group("shape").split(","))
        # assuming contiguous C-order strides probably
        # TODO: smarter
        self.open_handle = None
        self.gpu_array = None

    def __enter__(self):
        # open the IPC memory handle and wrap it in a GPUArray view (no copy)
        self.open_handle = pycuda.driver.IPCMemoryHandle(self.handle_address)
        self.gpu_array = pycuda.gpuarray.GPUArray(
            self.shape, dtype=self.dtype, gpudata=self.open_handle
        )
        return self.gpu_array

    def __exit__(self, t, v, tb):
        self.open_handle.close()
class GPUContextContext:
    """Make a CUDA context current for the duration of a ``with`` block.

    On entry the wrapped context is pushed onto the context stack and returned;
    on exit it is popped again, restoring whatever context was current before.
    The wrapped object only needs ``push()`` and ``pop()`` methods.
    """

    def __init__(self, gpu_context):
        # context object to activate (e.g. a pycuda context)
        self.gpu_context = gpu_context

    def __enter__(self):
        ctx = self.gpu_context
        ctx.push()
        return ctx

    def __exit__(self, exc_type, exc_value, traceback):
        # pop unconditionally, also on exceptions, to restore the previous context
        self.gpu_context.pop()
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment