diff --git a/setup.py b/setup.py index 85f6b2ada7ee2cb26c065409707b0bc22cf94e4e..94b98d0da6a06754b9221906afb1fd947e521031 100644 --- a/setup.py +++ b/setup.py @@ -24,6 +24,7 @@ setup(name='calng', packages=find_packages('src'), entry_points={ 'karabo.bound_device': [ + 'AgipdCorrection = calng.AgipdCorrection:AgipdCorrection', 'DsscCorrection = calng.DsscCorrection:DsscCorrection', 'ModuleStacker = calng.ModuleStacker:ModuleStacker', 'ShmemToZMQ = calng.ShmemToZMQ:ShmemToZMQ', diff --git a/src/calng/AgipdCorrection.py b/src/calng/AgipdCorrection.py new file mode 100644 index 0000000000000000000000000000000000000000..2c1c5537e2b51d175e7da5f85dbc8bfb6d7f6d95 --- /dev/null +++ b/src/calng/AgipdCorrection.py @@ -0,0 +1,303 @@ +import timeit + +import calibrationBase +import numpy as np +from karabo.bound import KARABO_CLASSINFO +from karabo.common.states import State + +from . import shmem_utils, utils +from ._version import version as deviceVersion +from .base_correction import BaseCorrection +from .agipd_gpu import AgipdGpuRunner + + +@KARABO_CLASSINFO("AgipdCorrection", deviceVersion) +class AgipdCorrection(BaseCorrection): + @staticmethod + def expectedParameters(expected): + AgipdCorrection.addConstant( + "ThresholdsDark", + "Dark", + expected, + optional=False, + mandatoryForIteration=True, + ) + AgipdCorrection.addConstant( + "Offset", "Dark", expected, optional=False, mandatoryForIteration=True + ) + AgipdCorrection.addConstant( + "SlopesPC", "Dark", expected, optional=True, mandatoryForIteration=True + ) + AgipdCorrection.addConstant( + "SlopesFF", + "Illuminated", + expected, + optional=True, + mandatoryForIteration=True, + ) + AgipdCorrection.addConstant( + "BadPixelsDark", "Dark", expected, optional=True, mandatoryForIteration=True + ) + AgipdCorrection.addConstant( + "BadPixelsPC", "Dark", expected, optional=True, mandatoryForIteration=True + ) + AgipdCorrection.addConstant( + "BadPixelsFF", + "Illuminated", + expected, + optional=True, + mandatoryForIteration=True, + ) + super(AgipdCorrection, AgipdCorrection).expectedParameters(expected) + + def __init__(self, config): + super().__init__(config) + output_axis_order = config.get("dataFormat.outputAxisOrder") + if output_axis_order == "pixels-fast": + self._output_transpose = (0, 2, 1) + elif output_axis_order == "memorycells-fast": + self._output_transpose = (2, 1, 0) + else: + self._output_transpose = None + self._offset_map = None + self._update_pulse_filter(config.get("pulseFilter")) + self._update_shapes( + config.get("dataFormat.pixelsX"), + config.get("dataFormat.pixelsY"), + config.get("dataFormat.memoryCells"), + self.pulse_filter, + self._output_transpose, + ) + + self.updateState(State.ON) + + def process_input(self, data, metadata): + """Registered for dataInput, handles all processing and sending""" + + if not self.get("doAnything"): + if self.get("state") is State.PROCESSING: + self.updateState(State.ACTIVE) + return + + source = metadata.get("source") + + if source not in self.sources: + self.log.INFO(f"Ignoring unknown source {source}") + return + + # TODO: what are these empty things for? + if not data.has("image"): + self.log.INFO("Ignoring hash without image node") + return + + time_start = timeit.default_timer() + + train_id = metadata.getAttribute("timestamp", "tid") + cell_table = np.squeeze(data.get("image.cellId")) + assert isinstance(cell_table, np.ndarray), "image.cellId should be ndarray" + if len(cell_table.shape) == 0: + msg = "cellId had 0 dimensions. DAQ may not be sending data." 
+ self.set("status", msg) + self.log.WARN(msg) + return + # original shape: memory_cell, data/raw_gain, x, y + # TODO: consider making paths configurable + image_data = data.get("image.data") + self.log.INFO(f"Image data had shape: {image_data.shape}") + return + if image_data.shape[0] != self.get("dataFormat.memoryCells"): + self.set( + "status", f"Updating input shapes based on received {image_data.shape}" + ) + # TODO: truncate if > 800 + self.set("dataFormat.memoryCells", image_data.shape[0]) + with self._buffer_lock: + self._update_pulse_filter(self.get("pulseFilter")) + self._update_shapes( + self.get("dataFormat.pixelsX"), + self.get("dataFormat.pixelsY"), + self.get("dataFormat.memoryCells"), + self.pulse_filter, + self._output_transpose, + ) + # TODO: check shape (DAQ fake data and RunToPipe don't agree) + # TODO: consider just updating shapes based on whatever comes in + + correction_cell_num = self.get("dataFormat.memoryCellsCorrection") + do_generate_preview = train_id % self.get( + "preview.trainIdModulo" + ) == 0 and self.get("preview.enable") + can_apply_correction = correction_cell_num > 0 + do_apply_correction = self.get("applyCorrection") + + if not self.get("state") is State.PROCESSING: + self.updateState(State.PROCESSING) + self.set("status", "Processing data") + if self._state_reset_timer is None: + self._state_reset_timer = utils.DelayableTimer( + timeout=self.get("processingStateTimeout"), + callback=self._reset_state_from_processing, + ) + else: + self._state_reset_timer.set_timeout(self.get("processingStateTimeout")) + + with self._buffer_lock: + cell_table = cell_table[self.pulse_filter] + pulse_table = np.squeeze(data.get("image.pulseId"))[self.pulse_filter] + + cell_table_max = np.max(cell_table) + if do_apply_correction: + if not can_apply_correction: + msg = "No constant loaded, correction will not be applied." + self.log.WARN(msg) + self.set("status", msg) + do_apply_correction = False + elif cell_table_max >= correction_cell_num: + msg = ( + f"Max cell ID ({cell_table_max}) exceeds range for loaded " + f"constant (has {correction_cell_num} cells). Some frames " + "will not be corrected." + ) + self.log.WARN(msg) + self.set("status", msg) + + self.gpu_runner.load_data(image_data) + buffer_handle, buffer_array = self._shmem_buffer.next_slot() + if do_apply_correction: + self.gpu_runner.load_cell_table(cell_table) + self.gpu_runner.correct() + else: + self.gpu_runner.only_cast() + self.gpu_runner.reshape(out=buffer_array) + if do_generate_preview: + preview_slice_index = self.get("preview.pulse") + if preview_slice_index >= 0: + # look at pulse_table to find which index this pulse ID is in + pulse_id_found = np.where(pulse_table == preview_slice_index)[0] + if len(pulse_id_found) == 0: + pulse_found_instead = pulse_table[0] + msg = ( + f"Pulse {preview_slice_index} not found in " + f"image.pulseId, arbitrary pulse " + f"{pulse_found_instead} will be shown." + ) + preview_slice_index = 0 + self.log.WARN(msg) + self.set("status", msg) + else: + preview_slice_index = pulse_id_found[0] + if not do_apply_correction: + if can_apply_correction: + # in this case, cell table has not been loaded, but needs to be now + self.gpu_runner.load_cell_table(cell_table) + else: + # in this case, there will be no corrected preview + self.log.WARN( + "Corrected preview will not actually be corrected." 
+ ) + preview_raw, preview_corrected = self.gpu_runner.compute_preview( + preview_slice_index, + have_corrected=do_apply_correction, + can_correct=can_apply_correction, + ) + + data.set("image.data", buffer_handle) + data.set("image.cellId", cell_table[:, np.newaxis]) + data.set("image.pulseId", pulse_table[:, np.newaxis]) + data.set("calngShmemPaths", ["image.data"]) + self._write_output(data, metadata) + if do_generate_preview: + self._write_combiner_preview( + preview_raw, preview_corrected, train_id, source + ) + + # update rate etc. + self._buffered_status_update.set("trainId", train_id) + self._rate_tracker.update() + time_spent = timeit.default_timer() - time_start + self._buffered_status_update.set( + "performance.lastProcessingDuration", time_spent * 1000 + ) + if self.get("performance.rateUpdateOnEachInput"): + self._update_actual_rate() + + def constantLoaded(self): + """Hook from CalibrationReceiverBaseDevice called after each getConstant + + Here, used to load the received constants (or correction maps derived + fromt them) onto GPU. + + TODO: call after receiving *all* constants instead of calling once per + new constant (will cause some overhead for bigger devices) + + """ + + self.log.WARN("Not ready to handle constants yet") + ... + + def _update_pulse_filter(self, filter_string): + """Called whenever the pulse filter changes, typically followed by + _update_shapes""" + + if filter_string.strip() == "": + new_filter = np.arange(self.get("dataFormat.memoryCells"), dtype=np.uint16) + else: + new_filter = np.array(eval(filter_string), dtype=np.uint16) + assert np.max(new_filter) < self.get("dataFormat.memoryCells") + self.pulse_filter = new_filter + + def _update_shapes( + self, pixels_x, pixels_y, memory_cells, pulse_filter, output_transpose + ): + """(Re)initialize (GPU) buffers according to expected data shapes""" + + input_data_shape = (memory_cells, 1, pixels_y, pixels_x) + # reflect the axis reordering in the expected output shape + output_data_shape = utils.shape_after_transpose( + input_data_shape, output_transpose + ) + self.set("dataFormat.inputDataShape", list(input_data_shape)) + self.set("dataFormat.outputDataShape", list(output_data_shape)) + + if self._shmem_buffer is None: + shmem_buffer_name = self.getInstanceId() + ":dataOutput" + memory_budget = self.get("outputShmemBufferSize") * 2 ** 30 + self.log.INFO(f"Opening new shmem buffer: {shmem_buffer_name}") + self._shmem_buffer = shmem_utils.ShmemCircularBuffer( + memory_budget, + output_data_shape, + self.output_data_dtype, + shmem_buffer_name, + ) + else: + self._shmem_buffer.change_shape(output_data_shape) + + self.gpu_runner = AgipdGpuRunner( + pixels_x, + pixels_y, + memory_cells, + output_transpose=output_transpose, + input_data_dtype=self.input_data_dtype, + output_data_dtype=self.output_data_dtype, + ) + + self._update_maps_on_gpu() + + def _update_maps_on_gpu(self): + """Updates the correction maps stored on GPU based on constants known + + This only does something useful if constants have been retrieved from + CalCat. Should be called automatically upon retrieval and after + changing the data shape. 
+ + """ + + self.set("status", "Updating constants on GPU using known constants") + self.updateState(State.CHANGING) + if self._offset_map is not None: + self.gpu_runner.load_constants(self._offset_map) + msg = "Done transferring known constant(s) to GPU" + self.log.INFO(msg) + self.set("status", msg) + + self.updateState(State.ON) diff --git a/src/calng/DsscCorrection.py b/src/calng/DsscCorrection.py index f86153cbf96fcf7ee8691853e14893cc78e8c0de..fff86e9dd708c50c8523e1a469fd6fd21288685b 100644 --- a/src/calng/DsscCorrection.py +++ b/src/calng/DsscCorrection.py @@ -1,376 +1,27 @@ -import threading import timeit import calibrationBase -import hashToSchema import numpy as np -from karabo.bound import ( - BOOL_ELEMENT, - FLOAT_ELEMENT, - INPUT_CHANNEL, - INT32_ELEMENT, - KARABO_CLASSINFO, - NDARRAY_ELEMENT, - NODE_ELEMENT, - OUTPUT_CHANNEL, - STRING_ELEMENT, - UINT32_ELEMENT, - UINT64_ELEMENT, - VECTOR_STRING_ELEMENT, - VECTOR_UINT32_ELEMENT, - ChannelMetaData, - Epochstamp, - Hash, - MetricPrefix, - Schema, - Timestamp, - Trainstamp, - Unit, -) +from karabo.bound import KARABO_CLASSINFO from karabo.common.states import State -from . import shmem_utils -from . import utils - +from . import shmem_utils, utils from ._version import version as deviceVersion +from .base_correction import BaseCorrection from .dssc_gpu import DsscGpuRunner @KARABO_CLASSINFO("DsscCorrection", deviceVersion) -class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): - _dict_cache_slots = { - "applyCorrection", - "doAnything", - "dataFormat.memoryCells", - "dataFormat.memoryCellsCorrection", - "dataFormat.pixelsX", - "dataFormat.pixelsY", - "preview.enable", - "preview.pulse", - "preview.trainIdModulo", - "processingStateTimeout", - "performance.rateUpdateOnEachInput", - "state", - } - +class DsscCorrection(BaseCorrection): @staticmethod def expectedParameters(expected): DsscCorrection.addConstant( "Offset", "Dark", expected, optional=True, mandatoryForIteration=True ) - - ( - BOOL_ELEMENT(expected) - .key("doAnything") - .displayedName("Enable input processing") - .description( - "Toggle handling of input (at all). If False, the input handler of " - "this device will be skipped. Useful to decrease logspam if device is " - "misconfigured." - ) - .assignmentOptional() - .defaultValue(True) - .reconfigurable() - .commit(), - BOOL_ELEMENT(expected) - .key("applyCorrection") - .displayedName("Enable correction(s)") - .description( - "Toggle whether not correction(s) are applied to image data. If " - "false, this device still reshapes data to output shape, applies the " - "pulse filter, and casts to output dtype. Useful if constants are " - "missing / bad, or if data is sent to application doing its own " - "correction." - ) - .assignmentOptional() - .defaultValue(True) - .reconfigurable() - .commit(), - INPUT_CHANNEL(expected).key("dataInput").commit(), - # note: output schema not set, will be updated to match data later - OUTPUT_CHANNEL(expected).key("dataOutput").commit(), - VECTOR_STRING_ELEMENT(expected) - .key("fastSources") - .displayedName("Fast data sources") - .description( - "Sources to get data from. Only incoming hashes from these sources " - "will be processed." - ) - .assignmentMandatory() - .commit(), - STRING_ELEMENT(expected) - .key("pulseFilter") - .displayedName("[disabled] Pulse filter") - .description( - "Filter pulses: will be evaluated as array of indices to keep from " - "data. Can be anything which can be turned into numpy uint16 array. " - "Numpy is available as np. 
Take care not to include duplicates. If " - "empty, will not filter at all." - ) - .readOnly() - .initialValue("") - .commit(), - UINT32_ELEMENT(expected) - .key("outputShmemBufferSize") - .displayedName("Output buffer size limit (GB)") - .description( - "Corrected trains are written to shared memory locations. These are " - "pre-allocated and re-used. This parameter determines how big (number " - "of GB) the circular buffer will be." - ) - .assignmentOptional() - .defaultValue(10) - .commit(), - ) - - ( - NODE_ELEMENT(expected) - .key("dataFormat") - .displayedName("Data format (in/out)") - .commit(), - STRING_ELEMENT(expected) - .key("dataFormat.inputImageDtype") - .displayedName("Input image data dtype") - .description("The (numpy) dtype to expect for incoming image data.") - .options("uint16,float32") - .assignmentOptional() - .defaultValue("uint16") - .commit(), - STRING_ELEMENT(expected) - .key("dataFormat.outputImageDtype") - .displayedName("Output image data dtype") - .description( - "The (numpy) dtype to use for outgoing image data. Input is " - "cast to float32, corrections are applied, and only then will " - "the result be cast back to outputImageDtype (all on GPU)." - ) - .options("float16,float32,uint16") - .assignmentOptional() - .defaultValue("float32") - .commit(), - # important: shape of data as going into correction - UINT32_ELEMENT(expected) - .key("dataFormat.pixelsX") - .displayedName("Pixels x") - .description("Number of pixels of image data along X axis") - .assignmentMandatory() - .commit(), - UINT32_ELEMENT(expected) - .key("dataFormat.pixelsY") - .displayedName("Pixels y") - .description("Number of pixels of image data along Y axis") - .assignmentMandatory() - .commit(), - UINT32_ELEMENT(expected) - .key("dataFormat.memoryCells") - .displayedName("Memory cells") - .description("Full number of memory cells in incoming data") - .assignmentMandatory() - .commit(), - STRING_ELEMENT(expected) - .key("dataFormat.outputAxisOrder") - .displayedName("Output axis order") - .description( - "Axes of main data output can be reordered after correction. Choose " - "between 'pixels-fast' (memory_cell, x, y), 'memorycells-fast' " - "(x, y, memory_cell), and 'no-reshape' (memory_cell, y, x)" - ) - .options("pixels-fast,memorycells-fast,no-reshape") - .assignmentOptional() - .defaultValue("pixels-fast") - .commit(), - UINT32_ELEMENT(expected) - .key("dataFormat.memoryCellsCorrection") - .displayedName("(Debug) Memory cells in correction map") - .description( - "Full number of memory cells in currently loaded correction map. " - "May exceed memory cell number in input if veto is on. " - "This value just displayed for debugging." - ) - .readOnly() - .initialValue(0) - .commit(), - VECTOR_UINT32_ELEMENT(expected) - .key("dataFormat.inputDataShape") - .displayedName("Input data shape") - .description( - "Image data shape in incoming data (from reader / DAQ). This value is " - "computed from pixelsX, pixelsY, and memoryCells - this field just " - "shows you what is currently expected." - ) - .readOnly() - .initialValue([]) - .commit(), - VECTOR_UINT32_ELEMENT(expected) - .key("dataFormat.outputDataShape") - .displayedName("Output data shape") - .description( - "Image data shape for data output from this device. This value is " - "computed from pixelsX, pixelsY, and the size of the pulse filter - " - "this field just shows what is currently expected." 
- ) - .readOnly() - .initialValue([]) - .commit(), - ) - - preview_schema = Schema() - ( - NODE_ELEMENT(expected).key("preview").displayedName("Preview").commit(), - NODE_ELEMENT(preview_schema).key("data").commit(), - NDARRAY_ELEMENT(preview_schema).key("data.adc").dtype("FLOAT").commit(), - OUTPUT_CHANNEL(expected) - .key("preview.outputRaw") - .dataSchema(preview_schema) - .commit(), - OUTPUT_CHANNEL(expected) - .key("preview.outputCorrected") - .dataSchema(preview_schema) - .commit(), - BOOL_ELEMENT(expected) - .key("preview.enable") - .displayedName("Enable preview data generation") - .assignmentOptional() - .defaultValue(True) - .reconfigurable() - .commit(), - INT32_ELEMENT(expected) - .key("preview.pulse") - .displayedName("Pulse (or stat) for preview") - .description( - "If this value is ≥ 0, the corresponding index from data will be " - "sliced for the preview. If this value is ≤ 0, preview will be one of " - "the following stats:" - "-1: max, " - "-2: mean, " - "-3: sum, " - "-4: stdev. " - "Max means selecting the pulse with the maximum integrated value. The " - "others are computed across all filtered pulses in the train." - ) - .assignmentOptional() - .defaultValue(0) - .reconfigurable() - .commit(), - UINT32_ELEMENT(expected) - .key("preview.trainIdModulo") - .displayedName("Train modulo for throttling") - .description( - "Preview will only be generated for trains whose ID modulo this " - "number is zero. Higher values means fewer preview updates. Should be " - "adjusted based on input rate. Keep in mind that the GUI has limited " - "refresh rate anyway and that network is precious." - ) - .assignmentOptional() - .defaultValue(6) - .reconfigurable() - .commit(), - ) - - ( - NODE_ELEMENT(expected) - .key("performance") - .displayedName("Performance measures") - .commit(), - FLOAT_ELEMENT(expected) - .key("performance.rateUpdateInterval") - .displayedName("Rate update interval") - .description( - "Maximum interval (seconds) between updates of the rate. Mostly " - "relevant if not rateUpdateOnEachInput or if input is slow." - ) - .assignmentOptional() - .defaultValue(1) - .reconfigurable() - .commit(), - FLOAT_ELEMENT(expected) - .key("performance.rateBufferSpan") - .displayedName("Rate measurement buffer span") - .description("Event buffer timespan (in seconds) for measuring rate") - .assignmentOptional() - .defaultValue(20) - .reconfigurable() - .commit(), - BOOL_ELEMENT(expected) - .key("performance.rateUpdateOnEachInput") - .displayedName("Update rate on each input") - .description( - "Whether or not to update the device rate for each input (otherwise " - "only based on rateUpdateInterval). Note that processed trains are " - "always registered - this just impacts when the rate is computed " - "based on this." - ) - .assignmentOptional() - .defaultValue(False) - .reconfigurable() - .commit(), - FLOAT_ELEMENT(expected) - .key("processingStateTimeout") - .description( - "Timeout after which the device goes from PROCESSING back to ACTIVE " - "if no new input is processed" - ) - .assignmentOptional() - .defaultValue(10) - .reconfigurable() - .commit(), - # just measurements and counters to display - UINT64_ELEMENT(expected) - .key("trainId") - .displayedName("Train ID") - .description("ID of latest train processed by this device.") - .readOnly() - .initialValue(0) - .commit(), - FLOAT_ELEMENT(expected) - .key("performance.lastProcessingDuration") - .displayedName("Processing time") - .description( - "Amount of time spent in processing latest train. 
Time includes " - "generating preview and sending data." - ) - .unit(Unit.SECOND) - .metricPrefix(MetricPrefix.MILLI) - .readOnly() - .initialValue(0) - .commit(), - FLOAT_ELEMENT(expected) - .key("performance.rate") - .displayedName("Rate") - .description( - "Actual rate with which this device gets / processes / sends trains" - ) - .unit(Unit.HERTZ) - .readOnly() - .initialValue(0) - .commit(), - FLOAT_ELEMENT(expected) - .key("performance.theoreticalRate") - .displayedName("Processing rate (hypothetical)") - .description( - "Rate with which this device could hypothetically process trains. " - "Based on lastProcessingDuration." - ) - .unit(Unit.HERTZ) - .readOnly() - .initialValue(float("NaN")) - .warnLow(10) - .info("Processing not fast enough for full speed") - .needsAcknowledging(False) - .commit(), - ) + super(DsscCorrection, DsscCorrection).expectedParameters(expected) def __init__(self, config): - self._dict_cache = {k: config.get(k) for k in self._dict_cache_slots} super().__init__(config) - - self.KARABO_ON_DATA("dataInput", self.process_input) - self.KARABO_ON_EOS("dataInput", self.handle_eos) - - self.sources = set(config.get("fastSources")) - - self.input_data_dtype = getattr(np, config.get("dataFormat.inputImageDtype")) - self.output_data_dtype = getattr(np, config.get("dataFormat.outputImageDtype")) output_axis_order = config.get("dataFormat.outputAxisOrder") if output_axis_order == "pixels-fast": self._output_transpose = (0, 2, 1) @@ -380,7 +31,6 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): self._output_transpose = None self._offset_map = None self._update_pulse_filter(config.get("pulseFilter")) - self._shmem_buffer = None self._update_shapes( config.get("dataFormat.pixelsX"), config.get("dataFormat.pixelsY"), @@ -388,44 +38,11 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): self.pulse_filter, self._output_transpose, ) - self._has_set_output_schema = False - self._rate_tracker = calibrationBase.utils.UpdateRate( - interval=config.get("performance.rateBufferSpan") - ) - self._state_reset_timer = None - - self._buffered_status_update = Hash( - "trainId", - 0, - "performance.rate", - 0, - "performance.theoreticalRate", - float("NaN"), - "performance.lastProcessingDuration", - 0, - ) - self._rate_update_timer = utils.RepeatingTimer( - interval=config.get("performance.rateUpdateInterval"), - callback=self._update_actual_rate, - ) - self._buffer_lock = threading.Lock() self.updateState(State.ON) - def get(self, key): - if key in self._dict_cache_slots: - return self._dict_cache.get(key) - else: - return super().get(key) - - def set(self, *args): - if len(args) == 2: - key, value = args - if key in self._dict_cache_slots: - self._dict_cache[key] = value - super().set(*args) - def preReconfigure(self, config): + super().preReconfigure(config) if config.has("pulseFilter"): with self._buffer_lock: # apply new pulse filter @@ -439,22 +56,6 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): self.pulse_filter, ) - if config.has("performance.rateUpdateInterval"): - self._rate_update_timer.stop() - self._rate_update_timer = utils.RepeatingTimer( - interval=config.get("performance.rateUpdateInterval"), - callback=self._update_actual_rate, - ) - - if config.has("performance.rateBufferSpan"): - self._rate_tracker = calibrationBase.utils.UpdateRate( - interval=config.get("performance.rateBufferSpan") - ) - - for path in config.getPaths(): - if path in self._dict_cache_slots: - self._dict_cache[path] = config.get(path) - 
def process_input(self, data, metadata): """Registered for dataInput, handles all processing and sending @@ -592,9 +193,9 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): data.set("image.cellId", cell_table[:, np.newaxis]) data.set("image.pulseId", pulse_table[:, np.newaxis]) data.set("calngShmemPaths", ["image.data"]) - self.write_output(data, metadata) + self._write_output(data, metadata) if do_generate_preview: - self.write_combiner_preview( + self._write_combiner_preview( preview_raw, preview_corrected, train_id, source ) @@ -608,65 +209,6 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): if self.get("performance.rateUpdateOnEachInput"): self._update_actual_rate() - def handle_eos(self, channel): - self._has_set_output_schema = False - self.updateState(State.ON) - self.signalEndOfStream("dataOutput") - - def write_output(self, data, old_metadata): - metadata = ChannelMetaData( - old_metadata.get("source"), - Timestamp.fromHashAttributes(old_metadata.getAttributes("timestamp")), - ) - - if "image.passport" not in data: - data["image.passport"] = [] - data["image.passport"].append(self.getInstanceId()) - - if not self._has_set_output_schema: - self.updateState(State.CHANGING) - self._update_output_schema(data) - self.updateState(State.PROCESSING) - - channel = self.signalSlotable.getOutputChannel("dataOutput") - channel.write(data, metadata, False) - channel.update() - - def write_combiner_preview(self, data_raw, data_corrected, train_id, source): - # TODO: take into account updated pulse table after pulse filter - preview_hash = Hash() - preview_hash.set("image.passport", [self.getInstanceId()]) - preview_hash.set("image.trainId", train_id) - preview_hash.set("image.pulseId", self.get("preview.pulse")) - - # note: have to construct because setting .tid after init is broken - timestamp = Timestamp(Epochstamp(), Trainstamp(train_id)) - metadata = ChannelMetaData(source, timestamp) - for channel_name, data in ( - ("preview.outputRaw", data_raw), - ("preview.outputCorrected", data_corrected), - ): - preview_hash.set("data.adc", data[..., np.newaxis]) - channel = self.signalSlotable.getOutputChannel(channel_name) - channel.write(preview_hash, metadata, False) - channel.update() - - def getConstant(self, name): - """Hacky override of getConstant to actually return None on failure - - Full function is from CalibrationReceiverBaseDevice - - """ - - const = super().getConstant(name) - if const is not None and len(const.shape) == 1: - self.log.WARN( - f"Constant {name} should probably be None, but is array" - f" of size {const.size}, shape {const.shape}" - ) - const = None - return const - def constantLoaded(self): """Hook from CalibrationReceiverBaseDevice called after each getConstant @@ -718,37 +260,6 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): self._update_maps_on_gpu() - def registerManager(self, instance_id): - """A hook from stream.py for Manager devices to register themselves - - instance_id should be the instance id of the manager device. The - registration is currently not really used I think. - - """ - - self.managerInstance = instance_id - self.log.INFO(f"Registered calibration manager {instance_id}") - - def _update_output_schema(self, data): - """Updates the schema of dataOutput based on parameter data (a Hash) - - This should only be called once: when handling output for the first - time, we update the schema to match the modified data we'd send. 
- - """ - - self.log.INFO("Updating output schema") - my_schema_update = Schema() - data_schema = hashToSchema.HashToSchema(data).schema - ( - OUTPUT_CHANNEL(my_schema_update) - .key("dataOutput") - .dataSchema(data_schema) - .commit() - ) - self.updateSchema(my_schema_update) - self._has_set_output_schema = True - def _update_pulse_filter(self, filter_string): """Called whenever the pulse filter changes, typically followed by _update_shapes""" @@ -815,24 +326,3 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): self.set("status", msg) self.updateState(State.ON) - - def _reset_state_from_processing(self): - if self.get("state") is State.PROCESSING: - self.updateState(State.ON) - self._state_reset_timer = None - - def _update_actual_rate(self): - if not self.get("state") is State.PROCESSING: - self._rate_update_timer.delay() - return - self._buffered_status_update.set("performance.rate", self._rate_tracker.rate()) - last_processing = self._buffered_status_update.get( - "performance.lastProcessingDuration" - ) - if last_processing > 0: - theoretical_rate = 1000 / last_processing - self._buffered_status_update.set( - "performance.theoreticalRate", theoretical_rate - ) - self.set(self._buffered_status_update) - self._rate_update_timer.delay() diff --git a/src/calng/agipd_gpu.py b/src/calng/agipd_gpu.py new file mode 100644 index 0000000000000000000000000000000000000000..110939ec8509c5ddc322081423bc83b4a7537780 --- /dev/null +++ b/src/calng/agipd_gpu.py @@ -0,0 +1,143 @@ +import enum + +import cupy +import numpy as np + +from . import base_gpu, utils + + +class CorrectionFlags(enum.IntFlag): + THRESHOLD = 1 + OFFSET = 2 + BLSHIFT = 4 + REL_GAIN_PC = 8 + REL_GAIN_XRAY = 16 + BPMASK = 32 + + +class AgipdGpuRunner(base_gpu.BaseGpuRunner): + _kernel_source_filename = "agipd_gpu_kernels.cpp" + + def __init__( + self, + pixels_x, + pixels_y, + memory_cells, + output_transpose=(1, 2, 0), # default: memorycells-fast + constant_memory_cells=None, + input_data_dtype=np.uint16, + output_data_dtype=np.float32, + ): + self.input_shape = (memory_cells, 2, pixels_x, pixels_y) + self.processed_shape = (memory_cells, pixels_x, pixels_y) + super().__init__( + pixels_x, + pixels_y, + memory_cells, + output_transpose, + constant_memory_cells, + input_data_dtype, + output_data_dtype, + ) + self.gain_map_gpu = cupy.empty(self.processed_shape, dtype=np.uint8) + + self.map_shape = (self.pixels_x, self.pixels_y, self.constant_memory_cells) + self.gm_map_shape = self.map_shape + (3,) # for gain-mapped constants + self.threshold_map_shape = self.map_shape + (2,) + # constants + self.gain_thresholds_gpu = cupy.empty( + self.threshold_map_shape, dtype=np.float32 + ) + self.offset_map_gpu = cupy.zeros(self.gm_map_shape, dtype=np.float32) + self.rel_gain_pc_map_gpu = cupy.ones(self.gm_map_shape, dtype=np.float32) + # optional extra offset for medium gain + self.md_additional_offset_gpu = cupy.zeros(1, dtype=np.float32) + self.rel_gain_xray_map_gpu = cupy.ones(self.map_shape, dtype=np.float32) + self.badpixel_map_gpu = cupy.zeros(self.map_shape, dtype=np.uint32) + + self.update_block_size((1, 1, 64)) + + def load_thresholds(self, threshold_map): + # shape: y, x, memory cell, threshold 0 / threshold 1 / 3 gain values + # TODO: do we need the gain values for anything? 
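+        # keep only the two thresholds (the trailing three gain values are
+        # dropped) and swap the y/x axes so the result matches
+        # threshold_map_shape (pixels_x, pixels_y, cell, threshold)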
+ to_set = np.transpose(threshold_map[..., :2], (1, 0, 2, 3)).astype(np.float32) + self.gain_thresholds_gpu.set( + to_set + ) + + def load_offset_map(self, offset_map): + # shape: y, x, memory cell, gain stage + self.offset_map_gpu.set( + np.transpose(offset_map, (1, 0, 2, 3)).astype(np.float32) + ) + + def load_rel_gain_pc_map(self, slopes_pc_map): + # pc has funny shape (11, 352, 128, 512) from file + # this is (fi, memory cell, y, x) + pc_high_m = slopes_pc_map[0] + pc_high_I = slopes_pc_map[1] + pc_med_m = slopes_pc_map[3] + pc_med_I = slopes_pc_map[4] + frac_high_med = pc_high_m / pc_med_m + # TODO: verify formula + md_additional_offset = pc_high_I - pc_med_I * frac_high_med + self.rel_gain_pc_map_gpu[..., 0] = 1 # rel xray gain can come after + self.rel_gain_pc_map_gpu[..., 1] = self.rel_gain_pc_map[..., 0] * frac_high_med + self.rel_gain_pc_map_gpu[..., 2] = self.rel_gain_pc_map[..., 1] * 4.48 + # TODO: enable overriding this based on user input + self.md_additional_offset_gpu.set(md_additional_offset) + + def load_rel_gain_ff_map(self, slopes_ff_map): + # constant shape: y, x, memory cell + if slopes_ff_map.shape[2] == 2: + # old format, is per pixel only + raise NotImplementedError("Old slopes FF map format") + self.rel_gain_xray_map_gpu.set( + np.transpose(slopes_ff_map, (1, 0, 2)).astype(np.float32) + ) + + def correct(self, flags): + """Apply corrections to data (must load constant, data, and cell_table first) + + Applies corrections to input data and casts to desired output dtype. + Parameter cell_table allows out of order or non-contiguous memory cells + in input data. Both input ndarrays are assumed to be on GPU already, + preferably wrapped in GPU arrays (cupy array). + + Will return string encoded handle to shared memory output buffer and + (view of) said buffer as an ndarray. Keep in mind that the output + buffers will get overwritten eventually (circular buffer). 
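+
+        The flags argument is a CorrectionFlags bitmask selecting which of the
+        corrections known to the kernel (gain thresholding, offset, relative
+        gain, bad pixel masking, ...) should be applied, for example
+        flags = CorrectionFlags.THRESHOLD | CorrectionFlags.OFFSET.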
+ """ + self.correction_kernel( + self.full_grid, + self.full_block, + ( + self.input_data_gpu, + self.cell_table_gpu, + np.uint8(flags), + self.gain_thresholds_gpu, + self.offset_map_gpu, + self.rel_gain_pc_map_gpu, + self.md_additional_offset_gpu, + self.rel_gain_xray_map_gpu, + self.badpixel_map_gpu, + self.gain_map_gpu, + self.processed_data_gpu, + ), + ) + + def _init_kernels(self): + kernel_source = self._kernel_template.render( + { + "pixels_x": self.pixels_x, + "pixels_y": self.pixels_y, + "data_memory_cells": self.memory_cells, + "constant_memory_cells": self.constant_memory_cells, + "input_data_dtype": utils.np_dtype_to_c_type(self.input_data_dtype), + "output_data_dtype": utils.np_dtype_to_c_type(self.output_data_dtype), + } + ) + print(kernel_source) + self.source_module = cupy.RawModule(code=kernel_source) + self.correction_kernel = self.source_module.get_function("correct") + self.casting_kernel = self.source_module.get_function("only_cast") diff --git a/src/calng/agipd_gpu_kernels.cpp b/src/calng/agipd_gpu_kernels.cpp new file mode 100644 index 0000000000000000000000000000000000000000..1212e00b14b89118a92751bdd4727afc12ed72da --- /dev/null +++ b/src/calng/agipd_gpu_kernels.cpp @@ -0,0 +1,147 @@ +#include <cuda_fp16.h> +#include <math_constants.h> + +const unsigned char CORR_THRESHOLD = 1; +const unsigned char CORR_OFFSET = 2; +const unsigned char CORR_BLSHIFT = 4; +const unsigned char CORR_REL_GAIN_PC = 8; +const unsigned char CORR_REL_GAIN_XRAY = 16; +const unsigned char CORR_BPMASK = 32; + +extern "C" { + /* + Perform correction: offset + Take cell_table into account when getting correction values + Converting to float for doing the correction + Converting to output dtype at the end + */ + __global__ void correct(const {{input_data_dtype}}* data, + const unsigned short* cell_table, + const unsigned char corr_flags, + const float* threshold_map, + const float* offset_map, + const float* rel_gain_pc_map, + const float md_additional_offset, + const float* rel_gain_xray_map, + const unsigned int* bad_pixel_map, + unsigned char* gain_map, + {{output_data_dtype}}* output) { + const size_t X = {{pixels_x}}; + const size_t Y = {{pixels_y}}; + const size_t input_cells = {{data_memory_cells}}; + const size_t map_cells = {{constant_memory_cells}}; + + const size_t cell = blockIdx.x * blockDim.x + threadIdx.x; + const size_t x = blockIdx.y * blockDim.y + threadIdx.y; + const size_t y = blockIdx.z * blockDim.z + threadIdx.z; + + if (cell >= input_cells || y >= Y || x >= X) { + return; + } + + // data shape: memory cell, data/raw_gain (dim size 2), x, y + const size_t data_stride_y = 1; + const size_t data_stride_x = Y * data_stride_y; + const size_t data_stride_raw_gain = X * data_stride_x; + const size_t data_stride_cell = 2 * data_stride_raw_gain; + const size_t data_index = cell * data_stride_cell + + 0 * data_stride_raw_gain + + y * data_stride_y + + x * data_stride_x; + const size_t raw_gain_index = cell * data_stride_cell + + 1 * data_stride_raw_gain + + y * data_stride_y + + x * data_stride_x; + float corrected = (float)data[data_index]; + const float raw_gain_val = (float)data[raw_gain_index]; + + const size_t output_stride_y = 1; + const size_t output_stride_x = output_stride_y * Y; + const size_t output_stride_cell = output_stride_x * X; + const size_t output_index = cell * output_stride_cell + x * output_stride_x + y * output_stride_y; + + // threshold constant shape: x, y, cell, threshold (dim size 2) + const size_t threshold_map_stride_threshold = 1; + const size_t 
threshold_map_stride_cell = 2 * threshold_map_stride_threshold; + const size_t threshold_map_stride_y = map_cells * threshold_map_stride_cell; + const size_t threshold_map_stride_x = Y * threshold_map_stride_y; + + // gain mapped constant shape: x, y, memory cell, gain_level (dim size 3) + const size_t gm_map_stride_gain = 1; + const size_t gm_map_stride_cell = 3 * gm_map_stride_gain; + const size_t gm_map_stride_y = map_cells * gm_map_stride_cell; + const size_t gm_map_stride_x = Y * gm_map_stride_y; + // TODO: handle multiple maps, multiple strides, multiple limits + + const size_t map_cell = cell_table[cell]; + + if (map_cell < map_cells) { + unsigned char gain = 0; + if (corr_flags & CORR_THRESHOLD) { + const float threshold_0 = threshold_map[0 * threshold_map_stride_threshold + + map_cell * threshold_map_stride_cell + + y * threshold_map_stride_y + + x * threshold_map_stride_x]; + const float threshold_1 = threshold_map[1 * threshold_map_stride_threshold + + map_cell * threshold_map_stride_cell + + y * threshold_map_stride_y + + x * threshold_map_stride_x]; + // could consider making this const using ternaries / tiny function + if (raw_gain_val <= threshold_0) { + gain = 0; + } else if (raw_gain_val <= threshold_1) { + gain = 1; + } else { + gain = 2; + } + } + + const size_t gm_map_index = gain * gm_map_stride_gain + + map_cell * gm_map_stride_cell + + y * gm_map_stride_y + + x * gm_map_stride_x; + + if ((corr_flags & CORR_BPMASK) && bad_pixel_map[gm_map_index]) { + corrected = CUDART_NAN_F; + } else { + if (corr_flags & CORR_OFFSET) { + corrected -= offset_map[gm_map_index]; + } + // TODO: baseline shift + if (corr_flags & CORR_REL_GAIN_PC) { + corrected *= rel_gain_pc_map[gm_map_index]; + if (gain == 1) { + corrected += md_additional_offset; + } + } + if (corr_flags & CORR_REL_GAIN_XRAY) { + // TODO + //corrected *= rel_gain_xray_map[map_index]; + } + } + + gain_map[output_index] = gain; + {% if output_data_dtype == "half" %} + output[output_index] = __float2half(corrected); + {% else %} + output[output_index] = ({{output_data_dtype}})corrected; + {% endif %} + } else { + // TODO: decide what to do when we cannot threshold + gain_map[data_index] = 255; + {% if output_data_dtype == "half" %} + output[data_index] = __float2half(corrected); + {% else %} + output[data_index] = ({{output_data_dtype}})corrected; + {% endif %} + } + } + + /* + Same as correction, except don't do any correction + */ + __global__ void only_cast(const {{input_data_dtype}}* data, + unsigned char* gain_map, + {{output_data_dtype}}* output) { + } +} diff --git a/src/calng/base_correction.py b/src/calng/base_correction.py new file mode 100644 index 0000000000000000000000000000000000000000..dfdc83e65e7a7e77ddecd04256d7056d5a75a47b --- /dev/null +++ b/src/calng/base_correction.py @@ -0,0 +1,515 @@ +import threading + +import calibrationBase +import hashToSchema +import numpy as np +from karabo.bound import ( + BOOL_ELEMENT, + FLOAT_ELEMENT, + INPUT_CHANNEL, + INT32_ELEMENT, + NDARRAY_ELEMENT, + NODE_ELEMENT, + OUTPUT_CHANNEL, + STRING_ELEMENT, + UINT32_ELEMENT, + UINT64_ELEMENT, + VECTOR_STRING_ELEMENT, + VECTOR_UINT32_ELEMENT, + ChannelMetaData, + Epochstamp, + Hash, + MetricPrefix, + Schema, + Timestamp, + Trainstamp, + Unit, +) +from karabo.common.states import State + +from . 
import shmem_utils, utils + + +class BaseCorrection(calibrationBase.CalibrationReceiverBaseDevice): + _dict_cache_slots = { + "applyCorrection", + "doAnything", + "dataFormat.memoryCells", + "dataFormat.memoryCellsCorrection", + "dataFormat.pixelsX", + "dataFormat.pixelsY", + "preview.enable", + "preview.pulse", + "preview.trainIdModulo", + "processingStateTimeout", + "performance.rateUpdateOnEachInput", + "state", + } + + @staticmethod + def expectedParameters(expected): + ( + BOOL_ELEMENT(expected) + .key("doAnything") + .displayedName("Enable input processing") + .description( + "Toggle handling of input (at all). If False, the input handler of " + "this device will be skipped. Useful to decrease logspam if device is " + "misconfigured." + ) + .assignmentOptional() + .defaultValue(True) + .reconfigurable() + .commit(), + BOOL_ELEMENT(expected) + .key("applyCorrection") + .displayedName("Enable correction(s)") + .description( + "Toggle whether not correction(s) are applied to image data. If " + "false, this device still reshapes data to output shape, applies the " + "pulse filter, and casts to output dtype. Useful if constants are " + "missing / bad, or if data is sent to application doing its own " + "correction." + ) + .assignmentOptional() + .defaultValue(True) + .reconfigurable() + .commit(), + INPUT_CHANNEL(expected).key("dataInput").commit(), + # note: output schema not set, will be updated to match data later + OUTPUT_CHANNEL(expected).key("dataOutput").commit(), + VECTOR_STRING_ELEMENT(expected) + .key("fastSources") + .displayedName("Fast data sources") + .description( + "Sources to get data from. Only incoming hashes from these sources " + "will be processed." + ) + .assignmentMandatory() + .commit(), + STRING_ELEMENT(expected) + .key("pulseFilter") + .displayedName("[disabled] Pulse filter") + .description( + "Filter pulses: will be evaluated as array of indices to keep from " + "data. Can be anything which can be turned into numpy uint16 array. " + "Numpy is available as np. Take care not to include duplicates. If " + "empty, will not filter at all." + ) + .readOnly() + .initialValue("") + .commit(), + UINT32_ELEMENT(expected) + .key("outputShmemBufferSize") + .displayedName("Output buffer size limit (GB)") + .description( + "Corrected trains are written to shared memory locations. These are " + "pre-allocated and re-used. This parameter determines how big (number " + "of GB) the circular buffer will be." + ) + .assignmentOptional() + .defaultValue(10) + .commit(), + ) + + ( + NODE_ELEMENT(expected) + .key("dataFormat") + .displayedName("Data format (in/out)") + .commit(), + STRING_ELEMENT(expected) + .key("dataFormat.inputImageDtype") + .displayedName("Input image data dtype") + .description("The (numpy) dtype to expect for incoming image data.") + .options("uint16,float32") + .assignmentOptional() + .defaultValue("uint16") + .commit(), + STRING_ELEMENT(expected) + .key("dataFormat.outputImageDtype") + .displayedName("Output image data dtype") + .description( + "The (numpy) dtype to use for outgoing image data. Input is " + "cast to float32, corrections are applied, and only then will " + "the result be cast back to outputImageDtype (all on GPU)." 
+ ) + .options("float16,float32,uint16") + .assignmentOptional() + .defaultValue("float32") + .commit(), + # important: shape of data as going into correction + UINT32_ELEMENT(expected) + .key("dataFormat.pixelsX") + .displayedName("Pixels x") + .description("Number of pixels of image data along X axis") + .assignmentMandatory() + .commit(), + UINT32_ELEMENT(expected) + .key("dataFormat.pixelsY") + .displayedName("Pixels y") + .description("Number of pixels of image data along Y axis") + .assignmentMandatory() + .commit(), + UINT32_ELEMENT(expected) + .key("dataFormat.memoryCells") + .displayedName("Memory cells") + .description("Full number of memory cells in incoming data") + .assignmentMandatory() + .commit(), + STRING_ELEMENT(expected) + .key("dataFormat.outputAxisOrder") + .displayedName("Output axis order") + .description( + "Axes of main data output can be reordered after correction. Choose " + "between 'pixels-fast' (memory_cell, x, y), 'memorycells-fast' " + "(x, y, memory_cell), and 'no-reshape' (memory_cell, y, x)" + ) + .options("pixels-fast,memorycells-fast,no-reshape") + .assignmentOptional() + .defaultValue("pixels-fast") + .commit(), + UINT32_ELEMENT(expected) + .key("dataFormat.memoryCellsCorrection") + .displayedName("(Debug) Memory cells in correction map") + .description( + "Full number of memory cells in currently loaded correction map. " + "May exceed memory cell number in input if veto is on. " + "This value just displayed for debugging." + ) + .readOnly() + .initialValue(0) + .commit(), + VECTOR_UINT32_ELEMENT(expected) + .key("dataFormat.inputDataShape") + .displayedName("Input data shape") + .description( + "Image data shape in incoming data (from reader / DAQ). This value is " + "computed from pixelsX, pixelsY, and memoryCells - this field just " + "shows you what is currently expected." + ) + .readOnly() + .initialValue([]) + .commit(), + VECTOR_UINT32_ELEMENT(expected) + .key("dataFormat.outputDataShape") + .displayedName("Output data shape") + .description( + "Image data shape for data output from this device. This value is " + "computed from pixelsX, pixelsY, and the size of the pulse filter - " + "this field just shows what is currently expected." + ) + .readOnly() + .initialValue([]) + .commit(), + ) + + preview_schema = Schema() + ( + NODE_ELEMENT(expected).key("preview").displayedName("Preview").commit(), + NODE_ELEMENT(preview_schema).key("data").commit(), + NDARRAY_ELEMENT(preview_schema).key("data.adc").dtype("FLOAT").commit(), + OUTPUT_CHANNEL(expected) + .key("preview.outputRaw") + .dataSchema(preview_schema) + .commit(), + OUTPUT_CHANNEL(expected) + .key("preview.outputCorrected") + .dataSchema(preview_schema) + .commit(), + BOOL_ELEMENT(expected) + .key("preview.enable") + .displayedName("Enable preview data generation") + .assignmentOptional() + .defaultValue(True) + .reconfigurable() + .commit(), + INT32_ELEMENT(expected) + .key("preview.pulse") + .displayedName("Pulse (or stat) for preview") + .description( + "If this value is ≥ 0, the corresponding index from data will be " + "sliced for the preview. If this value is ≤ 0, preview will be one of " + "the following stats:" + "-1: max, " + "-2: mean, " + "-3: sum, " + "-4: stdev. " + "Max means selecting the pulse with the maximum integrated value. The " + "others are computed across all filtered pulses in the train." 
+ ) + .assignmentOptional() + .defaultValue(0) + .reconfigurable() + .commit(), + UINT32_ELEMENT(expected) + .key("preview.trainIdModulo") + .displayedName("Train modulo for throttling") + .description( + "Preview will only be generated for trains whose ID modulo this " + "number is zero. Higher values means fewer preview updates. Should be " + "adjusted based on input rate. Keep in mind that the GUI has limited " + "refresh rate anyway and that network is precious." + ) + .assignmentOptional() + .defaultValue(6) + .reconfigurable() + .commit(), + ) + + ( + NODE_ELEMENT(expected) + .key("performance") + .displayedName("Performance measures") + .commit(), + FLOAT_ELEMENT(expected) + .key("performance.rateUpdateInterval") + .displayedName("Rate update interval") + .description( + "Maximum interval (seconds) between updates of the rate. Mostly " + "relevant if not rateUpdateOnEachInput or if input is slow." + ) + .assignmentOptional() + .defaultValue(1) + .reconfigurable() + .commit(), + FLOAT_ELEMENT(expected) + .key("performance.rateBufferSpan") + .displayedName("Rate measurement buffer span") + .description("Event buffer timespan (in seconds) for measuring rate") + .assignmentOptional() + .defaultValue(20) + .reconfigurable() + .commit(), + BOOL_ELEMENT(expected) + .key("performance.rateUpdateOnEachInput") + .displayedName("Update rate on each input") + .description( + "Whether or not to update the device rate for each input (otherwise " + "only based on rateUpdateInterval). Note that processed trains are " + "always registered - this just impacts when the rate is computed " + "based on this." + ) + .assignmentOptional() + .defaultValue(False) + .reconfigurable() + .commit(), + FLOAT_ELEMENT(expected) + .key("processingStateTimeout") + .description( + "Timeout after which the device goes from PROCESSING back to ACTIVE " + "if no new input is processed" + ) + .assignmentOptional() + .defaultValue(10) + .reconfigurable() + .commit(), + # just measurements and counters to display + UINT64_ELEMENT(expected) + .key("trainId") + .displayedName("Train ID") + .description("ID of latest train processed by this device.") + .readOnly() + .initialValue(0) + .commit(), + FLOAT_ELEMENT(expected) + .key("performance.lastProcessingDuration") + .displayedName("Processing time") + .description( + "Amount of time spent in processing latest train. Time includes " + "generating preview and sending data." + ) + .unit(Unit.SECOND) + .metricPrefix(MetricPrefix.MILLI) + .readOnly() + .initialValue(0) + .commit(), + FLOAT_ELEMENT(expected) + .key("performance.rate") + .displayedName("Rate") + .description( + "Actual rate with which this device gets / processes / sends trains" + ) + .unit(Unit.HERTZ) + .readOnly() + .initialValue(0) + .commit(), + FLOAT_ELEMENT(expected) + .key("performance.theoreticalRate") + .displayedName("Processing rate (hypothetical)") + .description( + "Rate with which this device could hypothetically process trains. " + "Based on lastProcessingDuration." 
+ ) + .unit(Unit.HERTZ) + .readOnly() + .initialValue(float("NaN")) + .warnLow(10) + .info("Processing not fast enough for full speed") + .needsAcknowledging(False) + .commit(), + ) + + def __init__(self, config): + self._dict_cache = {k: config.get(k) for k in self._dict_cache_slots} + super().__init__(config) + + self.KARABO_ON_DATA("dataInput", self.process_input) + self.KARABO_ON_EOS("dataInput", self.handle_eos) + + self.sources = set(config.get("fastSources")) + + self.input_data_dtype = np.dtype(config.get("dataFormat.inputImageDtype")) + self.output_data_dtype = np.dtype(config.get("dataFormat.outputImageDtype")) + + self._shmem_buffer = None + self._has_set_output_schema = False + self._rate_tracker = calibrationBase.utils.UpdateRate( + interval=config.get("performance.rateBufferSpan") + ) + self._state_reset_timer = None + + self._buffered_status_update = Hash( + "trainId", + 0, + "performance.rate", + 0, + "performance.theoreticalRate", + float("NaN"), + "performance.lastProcessingDuration", + 0, + ) + self._rate_update_timer = utils.RepeatingTimer( + interval=config.get("performance.rateUpdateInterval"), + callback=self._update_actual_rate, + ) + self._buffer_lock = threading.Lock() + + def preReconfigure(self, config): + if config.has("performance.rateUpdateInterval"): + self._rate_update_timer.stop() + self._rate_update_timer = utils.RepeatingTimer( + interval=config.get("performance.rateUpdateInterval"), + callback=self._update_actual_rate, + ) + + if config.has("performance.rateBufferSpan"): + self._rate_tracker = calibrationBase.utils.UpdateRate( + interval=config.get("performance.rateBufferSpan") + ) + + for path in config.getPaths(): + if path in self._dict_cache_slots: + self._dict_cache[path] = config.get(path) + + def get(self, key): + if key in self._dict_cache_slots: + return self._dict_cache.get(key) + else: + return super().get(key) + + def set(self, *args): + if len(args) == 2: + key, value = args + if key in self._dict_cache_slots: + self._dict_cache[key] = value + super().set(*args) + + def _write_output(self, data, old_metadata): + metadata = ChannelMetaData( + old_metadata.get("source"), + Timestamp.fromHashAttributes(old_metadata.getAttributes("timestamp")), + ) + + if "image.passport" not in data: + data["image.passport"] = [] + data["image.passport"].append(self.getInstanceId()) + + if not self._has_set_output_schema: + self.updateState(State.CHANGING) + self._update_output_schema(data) + self.updateState(State.PROCESSING) + + channel = self.signalSlotable.getOutputChannel("dataOutput") + channel.write(data, metadata, False) + channel.update() + + def _write_combiner_preview(self, data_raw, data_corrected, train_id, source): + # TODO: take into account updated pulse table after pulse filter + preview_hash = Hash() + preview_hash.set("image.passport", [self.getInstanceId()]) + preview_hash.set("image.trainId", train_id) + preview_hash.set("image.pulseId", self.get("preview.pulse")) + + # note: have to construct because setting .tid after init is broken + timestamp = Timestamp(Epochstamp(), Trainstamp(train_id)) + metadata = ChannelMetaData(source, timestamp) + for channel_name, data in ( + ("preview.outputRaw", data_raw), + ("preview.outputCorrected", data_corrected), + ): + preview_hash.set("data.adc", data[..., np.newaxis]) + channel = self.signalSlotable.getOutputChannel(channel_name) + channel.write(preview_hash, metadata, False) + channel.update() + + def _update_output_schema(self, data): + """Updates the schema of dataOutput based on parameter data 
(a Hash) + + This should only be called once: when handling output for the first + time, we update the schema to match the modified data we'd send. + + """ + + self.log.INFO("Updating output schema") + my_schema_update = Schema() + data_schema = hashToSchema.HashToSchema(data).schema + ( + OUTPUT_CHANNEL(my_schema_update) + .key("dataOutput") + .dataSchema(data_schema) + .commit() + ) + self.updateSchema(my_schema_update) + self._has_set_output_schema = True + + def _reset_state_from_processing(self): + if self.get("state") is State.PROCESSING: + self.updateState(State.ON) + self._state_reset_timer = None + + def _update_actual_rate(self): + if not self.get("state") is State.PROCESSING: + self._rate_update_timer.delay() + return + self._buffered_status_update.set("performance.rate", self._rate_tracker.rate()) + last_processing = self._buffered_status_update.get( + "performance.lastProcessingDuration" + ) + if last_processing > 0: + theoretical_rate = 1000 / last_processing + self._buffered_status_update.set( + "performance.theoreticalRate", theoretical_rate + ) + self.set(self._buffered_status_update) + self._rate_update_timer.delay() + + def handle_eos(self, channel): + self._has_set_output_schema = False + self.updateState(State.ON) + self.signalEndOfStream("dataOutput") + + def getConstant(self, name): + """Hacky override of getConstant to actually return None on failure + + Full function is from CalibrationReceiverBaseDevice + + """ + + const = super().getConstant(name) + if const is not None and len(const.shape) == 1: + self.log.WARN( + f"Constant {name} should probably be None, but is array" + f" of size {const.size}, shape {const.shape}" + ) + const = None + return const diff --git a/src/calng/base_gpu.py b/src/calng/base_gpu.py new file mode 100644 index 0000000000000000000000000000000000000000..0c1d2cd763abcc9ef980f5f52e58e473c7686fa6 --- /dev/null +++ b/src/calng/base_gpu.py @@ -0,0 +1,187 @@ +import pathlib + +import cupy +import cupyx +import jinja2 +import numpy as np + +from . import utils + + +class BaseGpuRunner: + """Class to handle instantiation and execution of CUDA kernels on trains + + All GPU buffers are kept within this class. This generally means that you will + want to load data into it and then do something. Typical usage in correct order: + + 1. instantiate + 2. load_constants + 3. load_data + 4. load_cell_table + 5. correct + 6a. reshape (only here does data transfer back to host) + 6b. compute_preview (optional) + + repeat from 2. or 3. + + In case no constants are available / correction is not desired, can skip 3 and 4 + and use only_cast in step 5 instead of correct (taking care to call + compute_preview with parameters set accordingly). 
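+
+    A minimal sketch of that loop, using the DSSC subclass for illustration
+    (trains, image_data, cell_table and offset_map are placeholders and the
+    shapes are arbitrary):
+
+        runner = DsscGpuRunner(pixels_x=512, pixels_y=128, memory_cells=400)
+        runner.load_constants(offset_map)
+        for image_data, cell_table in trains:
+            runner.load_data(image_data)
+            runner.load_cell_table(cell_table)
+            runner.correct()
+            corrected = runner.reshape()
+            raw_preview, corrected_preview = runner.compute_preview(0)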
+ """ + + def __init__( + self, + pixels_x, + pixels_y, + memory_cells, + output_transpose=(2, 1, 0), # default: memorycells-fast + constant_memory_cells=None, + input_data_dtype=np.uint16, + output_data_dtype=np.float32, + ): + _src_dir = pathlib.Path(__file__).absolute().parent + # subclass must define _kernel_source_filename + with (_src_dir / self._kernel_source_filename).open("r") as fd: + self._kernel_template = jinja2.Template(fd.read()) + + self.pixels_x = pixels_x + self.pixels_y = pixels_y + self.memory_cells = memory_cells + self.output_transpose = output_transpose + if constant_memory_cells is None: + self.constant_memory_cells = memory_cells + else: + self.constant_memory_cells = constant_memory_cells + self.output_shape = utils.shape_after_transpose( + self.processed_shape, self.output_transpose + ) + # preview will only be single memory cell + self.preview_shape = (self.pixels_x, self.pixels_y) + self.input_data_dtype = input_data_dtype + self.output_data_dtype = output_data_dtype + + self._init_kernels() + + # reuse output arrays + self.cell_table_gpu = cupy.empty(self.memory_cells, dtype=np.uint16) + self.input_data_gpu = cupy.empty(self.input_shape, dtype=input_data_dtype) + self.processed_data_gpu = cupy.empty(self.input_shape, dtype=output_data_dtype) + self.reshaped_data_gpu = cupy.empty(self.output_shape, dtype=output_data_dtype) + self.preview_raw = cupyx.empty_pinned(self.preview_shape, dtype=np.float32) + self.preview_corrected = cupyx.empty_pinned( + self.preview_shape, dtype=np.float32 + ) + + def only_cast(self): + """Like correct without the correction + + This currently means just casting to output dtype. + """ + self.casting_kernel( + self.full_grid, + self.full_block, + ( + self.input_data_gpu, + self.processed_data_gpu, + ), + ) + + def reshape(self, out=None): + """Move axes to desired output order + + The out parameter is passed directly to the get function of GPU array: if + None, then a new ndarray (in host memory) is returned. If not None, then data + will be loaded into the provided array, which must match shape / dtype. + """ + # TODO: avoid copy + if self.output_transpose is None: + self.reshaped_data_gpu = cupy.ascontiguousarray( + cupy.squeeze(self.processed_data_gpu) + ) + else: + self.reshaped_data_gpu = cupy.ascontiguousarray( + cupy.transpose( + cupy.squeeze(self.processed_data_gpu), self.output_transpose + ) + ) + return self.reshaped_data_gpu.get(out=out) + + def load_data(self, raw_data): + print(raw_data.shape) + print(self.input_data_gpu.shape) + self.input_data_gpu.set(np.squeeze(raw_data)) + + def load_cell_table(self, cell_table): + self.cell_table_gpu.set(cell_table) + + def compute_preview(self, preview_index, have_corrected=True, can_correct=True): + """Generate single slice or reduction preview of raw and corrected data + + Special values of preview_index are -1 for max, -2 for mean, -3 for + sum, and -4 for stdev (across cells). + + Note that preview_index is taken from data without checking cell table. + Caller has to figure out which index along memory cell dimension they + actually want to preview. + + Can reuse data from corrected output buffer with have_corrected parameter. + Note that preview requires relevant data to be loaded (raw data for raw + preview, correction map and cell table in addition for corrected preview). 
+ """ + + if preview_index < -4: + raise ValueError(f"No statistic with code {preview_index} defined") + elif preview_index >= self.memory_cells: + raise ValueError(f"Memory cell index {preview_index} out of range") + + if (not have_corrected) and can_correct: + self.correct() + # if not have_corrected and not can_correct, assume only_cast already done + + # TODO: enum around reduction type + for (image_data, output_buffer) in ( + (self.input_data_gpu, self.preview_raw), + (self.processed_data_gpu, self.preview_corrected), + ): + if preview_index >= 0: + # TODO: change axis order when moving reshape to after correction + image_data[preview_index].astype(np.float32).transpose().get( + out=output_buffer + ) + elif preview_index == -1: + # TODO: select argmax independently for raw and corrected? + # TODO: send frame sums somewhere to compute global max frame + max_index = cupy.argmax( + cupy.sum(image_data, axis=(1, 2), dtype=cupy.float32) + ) + image_data[max_index].astype(np.float32).transpose().get( + out=output_buffer + ) + elif preview_index in (-2, -3, -4): + stat_fun = {-2: cupy.mean, -3: cupy.sum, -4: cupy.std}[preview_index] + stat_fun(image_data, axis=0, dtype=cupy.float32).transpose().get( + out=output_buffer + ) + return self.preview_raw, self.preview_corrected + + def update_block_size(self, full_block, target_shape=None): + """Compute grid such that thread block grid covers target shape + + Execution is scheduled with 3d "blocks" of CUDA threads, tuning can affect + performance. Correction kernels are "monolithic" for simplicity (i.e. each + logical thread handles one entry in output data), so in each dimension we + parallelize, grid * block >= length. + + Note that individual kernels must themselves check whether they go out of + bounds; grid dimensions get rounded up in case ndarray size is not multiple of + block size. + + """ + if target_shape is None: + target_shape = self.processed_shape + assert len(full_block) == 3 + self.full_block = tuple(full_block) + self.full_grid = tuple( + utils.ceil_div(a_length, block_length) + for (a_length, block_length) in zip(target_shape, full_block) + ) diff --git a/src/calng/dssc_gpu.py b/src/calng/dssc_gpu.py index 61b00655fe77026472d203224d502edee406433c..38980e78235dd07ea8d652105e57403ba55362ab 100644 --- a/src/calng/dssc_gpu.py +++ b/src/calng/dssc_gpu.py @@ -1,37 +1,11 @@ -import pathlib - import cupy -import cupyx -import jinja2 import numpy as np -from . import utils - - -class DsscGpuRunner: - """Class to handle instantiation and execution of CUDA kernels on trains - - All GPU buffers are kept within this class. This generally means that you will - want to load data into it and then do something. Typical usage in correct order: - - 1. instantiate - 2. load_constants - 3. load_data - 4. load_cell_table - 5. correct - 6a. reshape (only here does data transfer back to host) - 6b. compute_preview (optional) +from . import base_gpu, utils - repeat from 2. or 3. - In case no constants are available / correction is not desired, can skip 3 and 4 - and use only_cast in step 5 instead of correct (taking care to call - compute_preview with parameters set accordingly). 
- """ - - _src_dir = pathlib.Path(__file__).absolute().parent - with (_src_dir / "gpu-dssc-correct.cpp").open("r") as fd: - _kernel_template = jinja2.Template(fd.read()) +class DsscGpuRunner(base_gpu.BaseGpuRunner): + _kernel_source_filename = "dssc_gpu_kernels.cpp" def __init__( self, @@ -43,39 +17,24 @@ class DsscGpuRunner: input_data_dtype=np.uint16, output_data_dtype=np.float32, ): - self.pixels_x = pixels_x - self.pixels_y = pixels_y - self.memory_cells = memory_cells - self.output_transpose = output_transpose - if constant_memory_cells is None: - self.constant_memory_cells = memory_cells - else: - self.constant_memory_cells = constant_memory_cells - self.input_shape = (self.memory_cells, self.pixels_y, self.pixels_x) - self.output_shape = utils.shape_after_transpose( - self.input_shape, self.output_transpose + self.input_shape = (memory_cells, pixels_y, pixels_x) + self.processed_shape = self.input_shape + super().__init__( + pixels_x, + pixels_y, + memory_cells, + output_transpose, + constant_memory_cells, + input_data_dtype, + output_data_dtype, ) self.map_shape = (self.pixels_x, self.pixels_y, self.constant_memory_cells) - # preview will only be single memory cell - self.preview_shape = (self.pixels_x, self.pixels_y) - self.input_data_dtype = input_data_dtype - self.output_data_dtype = output_data_dtype + self.offset_map_gpu = cupy.empty(self.map_shape, dtype=np.float32) self._init_kernels() self.offset_map_gpu = cupy.empty(self.map_shape, dtype=np.float32) - # reuse output arrays - self.cell_table_gpu = cupy.empty(self.memory_cells, dtype=np.uint16) - self.input_data_gpu = cupy.empty(self.input_shape, dtype=input_data_dtype) - self.processed_data_gpu = cupy.empty(self.input_shape, dtype=output_data_dtype) - self.reshaped_data_gpu = cupy.empty(self.output_shape, dtype=output_data_dtype) - self.preview_raw = cupyx.empty_pinned(self.preview_shape, dtype=np.float32) - self.preview_corrected = cupyx.empty_pinned( - self.preview_shape, dtype=np.float32 - ) - self.output_buffer_next_index = 0 - self.update_block_size((1, 1, 64)) def load_constants(self, offset_map): @@ -87,29 +46,6 @@ class DsscGpuRunner: self._init_kernels() self.offset_map_gpu.set(offset_map) - def load_data(self, raw_data): - self.input_data_gpu.set(np.squeeze(raw_data)) - - def load_cell_table(self, cell_table): - self.cell_table_gpu.set(cell_table) - - def update_block_size(self, full_block): - """Execution is scheduled with 3d "blocks" of CUDA threads, tuning can - affect performance - - Grid size is automatically computed based on block size. Note that - individual kernels must themselves check whether they go out of bounds; - grid dimensions get rounded up in case ndarray size is not multiple of - block size. - - """ - assert len(full_block) == 3 - self.full_block = tuple(full_block) - self.full_grid = tuple( - utils.ceil_div(a_length, block_length) - for (a_length, block_length) in zip(self.input_shape, full_block) - ) - def correct(self): """Apply corrections to data (must load constant, data, and cell_table first) @@ -133,90 +69,6 @@ class DsscGpuRunner: ), ) - def only_cast(self): - """Like correct without the correction - - This currently means just casting to output dtype. - """ - self.casting_kernel( - self.full_grid, - self.full_block, - ( - self.input_data_gpu, - self.processed_data_gpu, - ), - ) - - def reshape(self, out=None): - """Move axes to desired output order - - The out parameter is passed directly to the get function of GPU array: if - None, then a new ndarray (in host memory) is returned. 
If not None, then data - will be loaded into the provided array, which must match shape / dtype. - """ - # TODO: avoid copy - if self.output_transpose is None: - self.reshaped_data_gpu = cupy.ascontiguousarray( - cupy.squeeze(self.processed_data_gpu) - ) - else: - self.reshaped_data_gpu = cupy.ascontiguousarray( - cupy.transpose( - cupy.squeeze(self.processed_data_gpu), self.output_transpose - ) - ) - return self.reshaped_data_gpu.get(out=out) - - def compute_preview(self, preview_index, have_corrected=True, can_correct=True): - """Generate single slice or reduction preview of raw and corrected data - - Special values of preview_index are -1 for max, -2 for mean, -3 for - sum, and -4 for stdev (across cells). - - Note that preview_index is taken from data without checking cell table. - Caller has to figure out which index along memory cell dimension they - actually want to preview. - - Can reuse data from corrected output buffer with have_corrected parameter. - Note that preview requires relevant data to be loaded (raw data for raw - preview, correction map and cell table in addition for corrected preview). - """ - - if preview_index < -4: - raise ValueError(f"No statistic with code {preview_index} defined") - elif preview_index >= self.memory_cells: - raise ValueError(f"Memory cell index {preview_index} out of range") - - if (not have_corrected) and can_correct: - self.correct() - # if not have_corrected and not can_correct, assume only_cast already done - - # TODO: enum around reduction type - for (image_data, output_buffer) in ( - (self.input_data_gpu, self.preview_raw), - (self.processed_data_gpu, self.preview_corrected), - ): - if preview_index >= 0: - # TODO: change axis order when moving reshape to after correction - image_data[preview_index].astype(np.float32).transpose().get( - out=output_buffer - ) - elif preview_index == -1: - # TODO: select argmax independently for raw and corrected? 
-            # TODO: send frame sums somewhere to compute global max frame
-            max_index = cupy.argmax(
-                cupy.sum(image_data, axis=(1, 2), dtype=cupy.float32)
-            )
-            image_data[max_index].astype(np.float32).transpose().get(
-                out=output_buffer
-            )
-        elif preview_index in (-2, -3, -4):
-            stat_fun = {-2: cupy.mean, -3: cupy.sum, -4: cupy.std}[preview_index]
-            stat_fun(image_data, axis=0, dtype=cupy.float32).transpose().get(
-                out=output_buffer
-            )
-        return self.preview_raw, self.preview_corrected
-
     def _init_kernels(self):
         kernel_source = self._kernel_template.render(
             {
diff --git a/src/calng/gpu-dssc-correct.cpp b/src/calng/dssc_gpu_kernels.cpp
similarity index 100%
rename from src/calng/gpu-dssc-correct.cpp
rename to src/calng/dssc_gpu_kernels.cpp
diff --git a/src/tests/test_agipd_kernels.py b/src/tests/test_agipd_kernels.py
new file mode 100644
index 0000000000000000000000000000000000000000..91015e6a0508bbdedbc4b35e487310c91dfeb871
--- /dev/null
+++ b/src/tests/test_agipd_kernels.py
@@ -0,0 +1,53 @@
+import numpy as np
+import pytest
+
+from calng import agipd_gpu
+
+input_dtype = np.uint16
+output_dtype = np.float16
+corr_dtype = np.float32
+pixels_x = 512
+pixels_y = 128
+memory_cells = 352
+offset_map = (
+    np.random.random(size=(pixels_x, pixels_y, memory_cells)).astype(corr_dtype) * 20
+)
+cell_table = np.arange(memory_cells, dtype=np.uint16)
+np.random.shuffle(cell_table)
+raw_data = np.random.randint(
+    low=0, high=2000, size=(memory_cells, 2, pixels_x, pixels_y), dtype=input_dtype
+)
+
+thresholds = np.random.random(size=(pixels_y, pixels_x, memory_cells)) * 1000
+thresholds = np.stack((thresholds, thresholds*2), axis=3).astype(np.float32)
+gm_const_zeros = np.zeros((pixels_x, pixels_y, memory_cells, 3), dtype=np.float32)
+gm_const_ones = np.ones((pixels_x, pixels_y, memory_cells, 3), dtype=np.float32)
+
+kernel_runner = agipd_gpu.AgipdGpuRunner(
+    pixels_x,
+    pixels_y,
+    memory_cells,
+    constant_memory_cells=memory_cells,
+    input_data_dtype=input_dtype,
+    output_data_dtype=output_dtype,
+)
+
+def thresholding_cpu(data, cell_table, thresholds):
+    # get to memory_cell, x, y
+    raw_gain = data[:, 1, ...].astype(np.float32)
+    # get to threshold, memory_cell, x, y
+    thresholds = np.transpose(thresholds, (3, 2, 1, 0))[:, cell_table]
+    res = np.zeros((memory_cells, pixels_x, pixels_y), dtype=np.uint8)
+    res[raw_gain > thresholds[0]] = 1
+    res[raw_gain > thresholds[1]] = 2
+    return res
+
+
+def test_only_thresholding():
+    kernel_runner.load_cell_table(cell_table)
+    kernel_runner.load_data(raw_data)
+    kernel_runner.load_thresholds(thresholds)
+    kernel_runner.correct(agipd_gpu.CorrectionFlags.THRESHOLD)
+    gpu_res = kernel_runner.gain_map_gpu.get()
+    cpu_res = thresholding_cpu(raw_data, cell_table, thresholds)
+    assert np.allclose(gpu_res, cpu_res)
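
Note on driving the new BaseGpuRunner subclasses, following the lifecycle described in the class docstring. The sketch below is illustrative only: the geometry, memory cell count, dtypes and zero-filled arrays are placeholders, and it assumes a CUDA-capable host with cupy installed; all functions used are the ones added or kept in this change.

    import numpy as np

    from calng import dssc_gpu

    pixels_x, pixels_y, memory_cells = 512, 128, 400  # illustrative geometry

    runner = dssc_gpu.DsscGpuRunner(pixels_x, pixels_y, memory_cells)

    # step 2: constants (for DSSC, an offset map shaped (pixels_x, pixels_y, cells))
    offset_map = np.zeros((pixels_x, pixels_y, memory_cells), dtype=np.float32)
    runner.load_constants(offset_map)

    # steps 3 and 4: per-train image data and cell table
    raw = np.zeros((memory_cells, pixels_y, pixels_x), dtype=np.uint16)
    cell_table = np.arange(memory_cells, dtype=np.uint16)
    runner.load_data(raw)
    runner.load_cell_table(cell_table)

    # steps 5 and 6: correct, then transfer results back to the host
    runner.correct()
    corrected = runner.reshape()  # host ndarray, axes per output_transpose
    raw_preview, corrected_preview = runner.compute_preview(0)

When no constants have been loaded, the same flow applies with only_cast() in place of correct() and compute_preview(..., have_corrected=False, can_correct=False).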
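For adding another detector, the contract implied by base_gpu.py and the reworked DsscGpuRunner is roughly the following. This is a purely structural sketch: the class name, kernel filename and stubbed methods are hypothetical and would need real implementations (rendered jinja2 kernels, a correct() launch) before it could run.

    import cupy
    import numpy as np

    from calng import base_gpu


    class MyDetectorGpuRunner(base_gpu.BaseGpuRunner):
        # looked up relative to the calng source directory by BaseGpuRunner.__init__
        _kernel_source_filename = "my_detector_gpu_kernels.cpp"  # hypothetical

        def __init__(self, pixels_x, pixels_y, memory_cells, **kwargs):
            # BaseGpuRunner.__init__ reads input_shape / processed_shape, so set them first
            self.input_shape = (memory_cells, pixels_y, pixels_x)
            self.processed_shape = self.input_shape
            super().__init__(pixels_x, pixels_y, memory_cells, **kwargs)
            self.map_shape = (self.pixels_x, self.pixels_y, self.constant_memory_cells)
            self.offset_map_gpu = cupy.empty(self.map_shape, dtype=np.float32)
            self.update_block_size((1, 1, 64))

        def _init_kernels(self):
            # render self._kernel_template and compile the detector-specific kernels
            ...

        def correct(self):
            # launch the correction kernel over full_grid / full_block
            ...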
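The preview reductions in compute_preview can be restated in NumPy, which may be useful as a CPU reference in tests such as test_agipd_kernels.py. The helper name preview_reference is made up here; the logic is only a sketch of the semantics of the GPU code above.

    import numpy as np

    def preview_reference(image_data, preview_index):
        # image_data: (memory_cells, pixels_y, pixels_x); result: (pixels_x, pixels_y)
        if preview_index >= 0:
            frame = image_data[preview_index]
        elif preview_index == -1:
            # frame with the largest per-frame sum, as in the GPU argmax branch
            frame = image_data[np.argmax(image_data.sum(axis=(1, 2)))]
        elif preview_index in (-2, -3, -4):
            stat_fun = {-2: np.mean, -3: np.sum, -4: np.std}[preview_index]
            frame = stat_fun(image_data, axis=0)
        else:
            raise ValueError(f"No statistic with code {preview_index} defined")
        return frame.astype(np.float32).transpose()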
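The grid computed by update_block_size follows the usual ceiling-division pattern, so that grid * block >= length in every dimension. A small worked example, assuming utils.ceil_div is ordinary integer ceiling division and using an illustrative DSSC-like shape with the default DSSC block of (1, 1, 64):

    def ceil_div(num, denom):
        # assumed equivalent of utils.ceil_div for positive integers
        return -(num // -denom)

    target_shape = (400, 128, 512)  # (memory_cells, pixels_y, pixels_x)
    full_block = (1, 1, 64)
    full_grid = tuple(ceil_div(n, b) for n, b in zip(target_shape, full_block))
    assert full_grid == (400, 128, 8)  # 8 * 64 = 512 covers the pixel axis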
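The gain thresholding checked by test_only_thresholding amounts to a two-threshold digitisation of the raw gain signal per pixel and memory cell. A minimal worked example of the same rule used in thresholding_cpu, with made-up threshold values:

    import numpy as np

    t0, t1 = 100.0, 200.0  # illustrative per-pixel thresholds, t0 < t1
    raw_gain = np.array([50.0, 150.0, 250.0])
    gain_stage = np.zeros(raw_gain.shape, dtype=np.uint8)
    gain_stage[raw_gain > t0] = 1  # gain stage 1
    gain_stage[raw_gain > t1] = 2  # gain stage 2
    assert gain_stage.tolist() == [0, 1, 2]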