From fa93b871ad0d554ea391b446b18506f5c483badd Mon Sep 17 00:00:00 2001 From: David Hammer <dhammer@mailbox.org> Date: Thu, 19 Aug 2021 11:05:54 +0200 Subject: [PATCH] Directly import Karabo symbols --- src/Correction/dssc_correction.py | 165 ++++++++++++++++-------------- 1 file changed, 91 insertions(+), 74 deletions(-) diff --git a/src/Correction/dssc_correction.py b/src/Correction/dssc_correction.py index a9582b03..27565f7e 100644 --- a/src/Correction/dssc_correction.py +++ b/src/Correction/dssc_correction.py @@ -6,19 +6,42 @@ import timeit import calibrationBase import gpu_utils import hashToSchema -import karabo.bound as bound import numpy as np import pycuda.compiler import pycuda.driver import pycuda.gpuarray import pycuda.tools import utils +from karabo.bound import ( + BOOL_ELEMENT, + FLOAT_ELEMENT, + INPUT_CHANNEL, + INT32_ELEMENT, + KARABO_CLASSINFO, + NDARRAY_ELEMENT, + NODE_ELEMENT, + OUTPUT_CHANNEL, + SLOT_ELEMENT, + STRING_ELEMENT, + UINT32_ELEMENT, + UINT64_ELEMENT, + VECTOR_STRING_ELEMENT, + VECTOR_UINT32_ELEMENT, + ChannelMetaData, + Epochstamp, + Hash, + MetricPrefix, + Schema, + Timestamp, + Trainstamp, + Unit, +) from karabo.common.states import State from .cuda_pipeline import PyCudaPipeline -@bound.KARABO_CLASSINFO("DsscCorrection", "0.0.0") +@KARABO_CLASSINFO("DsscCorrection", "0.0.0") class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): _dict_cache_slots = { "applyCorrection", @@ -41,15 +64,15 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): "Offset", "Dark", expected, optional=True, mandatoryForIteration=True ) - bound.SLOT_ELEMENT(expected).key( - "askConnectedReadersToSendMySources" - ).displayedName("Request sources from connected RunToPipe").description( + SLOT_ELEMENT(expected).key("askConnectedReadersToSendMySources").displayedName( + "Request sources from connected RunToPipe" + ).description( "Only relevant for development environment. When running without a " "CAL_MANAGER, we need to tell RunToPipe instances which sources " "to send us." ).commit() - bound.BOOL_ELEMENT(expected).key("doAnything").displayedName( + BOOL_ELEMENT(expected).key("doAnything").displayedName( "Enable input processing" ).description( "Toggle handling of input (at all). If False, the input handler " @@ -59,7 +82,7 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): True ).reconfigurable().commit() - bound.BOOL_ELEMENT(expected).key("applyCorrection").displayedName( + BOOL_ELEMENT(expected).key("applyCorrection").displayedName( "Enable correction(s)" ).description( "Toggle whether or not correction(s) are applied to image data. " @@ -70,11 +93,11 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): True ).reconfigurable().commit() - bound.INPUT_CHANNEL(expected).key("dataInput").commit() + INPUT_CHANNEL(expected).key("dataInput").commit() # note: output schema not set, will be updated to match data later - bound.OUTPUT_CHANNEL(expected).key("dataOutput").commit() + OUTPUT_CHANNEL(expected).key("dataOutput").commit() - bound.VECTOR_STRING_ELEMENT(expected).key("fastSources").displayedName( + VECTOR_STRING_ELEMENT(expected).key("fastSources").displayedName( "Fast data sources" ).description( "Sources to fast data as provided in channel metadata. " @@ -83,7 +106,7 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): "Currently ignores the path.in.hash part (for hardcoded image.data)" ).assignmentMandatory().commit() - bound.STRING_ELEMENT(expected).key("pulseFilter").displayedName( + STRING_ELEMENT(expected).key("pulseFilter").displayedName( "Pulse filter" ).description( "Filter pulses: will be evaluated as array of indices to keep from data. " @@ -95,17 +118,17 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): "" ).reconfigurable().commit() - bound.NODE_ELEMENT(expected).key("dataFormat").displayedName( + NODE_ELEMENT(expected).key("dataFormat").displayedName( "Data format (in/out)" ).commit() - bound.STRING_ELEMENT(expected).key("dataFormat.inputImageDtype").displayedName( + STRING_ELEMENT(expected).key("dataFormat.inputImageDtype").displayedName( "Input image data dtype" ).description("The (numpy) dtype to expect for incoming image data.").options( "uint16,float32" ).assignmentOptional().defaultValue( "uint16" ).commit() - bound.STRING_ELEMENT(expected).key("dataFormat.outputImageDtype").displayedName( + STRING_ELEMENT(expected).key("dataFormat.outputImageDtype").displayedName( "Output image data dtype" ).description( "The (numpy) dtype to use for outgoing image data. " @@ -117,42 +140,42 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): "float32" ).commit() # important: shape of data as going into correction - bound.UINT32_ELEMENT(expected).key("dataFormat.pixelsX").displayedName( + UINT32_ELEMENT(expected).key("dataFormat.pixelsX").displayedName( "Pixels x" ).description( "Number of pixels of image data along X axis" ).assignmentMandatory().commit() - bound.UINT32_ELEMENT(expected).key("dataFormat.pixelsY").displayedName( + UINT32_ELEMENT(expected).key("dataFormat.pixelsY").displayedName( "Pixels y" ).description( "Number of pixels of image data along Y axis" ).assignmentMandatory().commit() - bound.UINT32_ELEMENT(expected).key("dataFormat.memoryCells").displayedName( + UINT32_ELEMENT(expected).key("dataFormat.memoryCells").displayedName( "Memory cells" ).description( "Full number of memory cells in incoming data" ).assignmentMandatory().commit() - bound.UINT32_ELEMENT(expected).key( - "dataFormat.memoryCellsCorrection" - ).displayedName("(Debug) Memory cells in correction map").description( + UINT32_ELEMENT(expected).key("dataFormat.memoryCellsCorrection").displayedName( + "(Debug) Memory cells in correction map" + ).description( "Full number of memory cells in currently loaded correction map. " "May exceed memory cell number in input if veto is on. " "This value just displayed for debugging." ).readOnly().initialValue( 0 ).commit() - bound.VECTOR_UINT32_ELEMENT(expected).key( - "dataFormat.inputDataShape" - ).displayedName("Input data shape").description( + VECTOR_UINT32_ELEMENT(expected).key("dataFormat.inputDataShape").displayedName( + "Input data shape" + ).description( "Image data shape in incoming data (from reader / DAQ). " "Value computed from pixelsX, pixelsY, and memoryCells - " "this slot is just showing you what is currently expected." ).readOnly().initialValue( [] ).commit() - bound.VECTOR_UINT32_ELEMENT(expected).key( - "dataFormat.outputDataShape" - ).displayedName("Output data shape").description( + VECTOR_UINT32_ELEMENT(expected).key("dataFormat.outputDataShape").displayedName( + "Output data shape" + ).description( "Image data shape for data output from this device. " "Value computed from pixelsX, pixelsY, and the size of the pulse filter - " "this slot is just showing what is currently expected." @@ -160,7 +183,7 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): [] ).commit() - bound.UINT32_ELEMENT(expected).key("outputShmemBufferLength").displayedName( + UINT32_ELEMENT(expected).key("outputShmemBufferLength").displayedName( "Output buffer length" ).description( "Corrected trains are written to shared memory locations. These are " @@ -171,20 +194,20 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): ).commit() # preview schema (WIP) - bound.NODE_ELEMENT(expected).key("preview").displayedName("Preview").commit() - preview_schema = bound.Schema() - bound.NODE_ELEMENT(preview_schema).key("data").commit() - bound.NDARRAY_ELEMENT(preview_schema).key("data.adc").dtype("FLOAT").commit() - bound.OUTPUT_CHANNEL(expected).key("preview.outputRaw").dataSchema( + NODE_ELEMENT(expected).key("preview").displayedName("Preview").commit() + preview_schema = Schema() + NODE_ELEMENT(preview_schema).key("data").commit() + NDARRAY_ELEMENT(preview_schema).key("data.adc").dtype("FLOAT").commit() + OUTPUT_CHANNEL(expected).key("preview.outputRaw").dataSchema( preview_schema ).commit() - bound.OUTPUT_CHANNEL(expected).key("preview.outputCorrected").dataSchema( + OUTPUT_CHANNEL(expected).key("preview.outputCorrected").dataSchema( preview_schema ).commit() - bound.BOOL_ELEMENT(expected).key("preview.enable").displayedName( + BOOL_ELEMENT(expected).key("preview.enable").displayedName( "Enable preview data generation" ).assignmentOptional().defaultValue(True).reconfigurable().commit() - bound.INT32_ELEMENT(expected).key("preview.pulse").displayedName( + INT32_ELEMENT(expected).key("preview.pulse").displayedName( "Pulse (or stat) for preview" ).description( "If this value is ≥ 0, the corresponding index from data will be " @@ -199,7 +222,7 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): ).assignmentOptional().defaultValue( 0 ).reconfigurable().commit() - bound.UINT32_ELEMENT(expected).key("preview.trainIdModulo").displayedName( + UINT32_ELEMENT(expected).key("preview.trainIdModulo").displayedName( "Train modulo for throttling" ).description( "Preview will only be sent for trains whose ID modulo this number " @@ -211,27 +234,27 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): ).reconfigurable().commit() # timer-related settings - bound.NODE_ELEMENT(expected).key("performance").displayedName( + NODE_ELEMENT(expected).key("performance").displayedName( "Performance measures" ).commit() - bound.FLOAT_ELEMENT(expected).key( - "performance.rateUpdateInterval" - ).displayedName("Rate update interval").description( + FLOAT_ELEMENT(expected).key("performance.rateUpdateInterval").displayedName( + "Rate update interval" + ).description( "Maximum interval (seconds) between updates of the rate. " "Mostly relevant if not rateUpdateOnEachInput or if input is slow." ).assignmentOptional().defaultValue( 1 ).reconfigurable().commit() - bound.FLOAT_ELEMENT(expected).key("performance.rateBufferSpan").displayedName( + FLOAT_ELEMENT(expected).key("performance.rateBufferSpan").displayedName( "Rate measurement buffer span" ).description( "Event buffer timespan (in seconds) for measuring rate" ).assignmentOptional().defaultValue( 20 ).reconfigurable().commit() - bound.BOOL_ELEMENT(expected).key( - "performance.rateUpdateOnEachInput" - ).displayedName("Update rate on each input").description( + BOOL_ELEMENT(expected).key("performance.rateUpdateOnEachInput").displayedName( + "Update rate on each input" + ).description( "Whether or not to update the device rate for each input " "(otherwise only based on rateUpdateInterval). " "Note that processed trains are always registered - this just " @@ -240,47 +263,43 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): False ).reconfigurable().commit() - bound.FLOAT_ELEMENT(expected).key("processingStateTimeout").description( + FLOAT_ELEMENT(expected).key("processingStateTimeout").description( "Timeout after which the device goes from PROCESSING back to ACTIVE " "if no new input is processed" ).assignmentOptional().defaultValue(10).reconfigurable().commit() # just measurements and counters to display - bound.UINT64_ELEMENT(expected).key("trainId").displayedName( - "Train ID" - ).description( + UINT64_ELEMENT(expected).key("trainId").displayedName("Train ID").description( "ID of latest train processed by this device." - ).readOnly().initialValue( - 0 - ).commit() - bound.FLOAT_ELEMENT(expected).key( - "performance.lastProcessingDuration" - ).displayedName("Processing time").description( + ).readOnly().initialValue(0).commit() + FLOAT_ELEMENT(expected).key("performance.lastProcessingDuration").displayedName( + "Processing time" + ).description( "Amount of time spent in processing latest train. " "Time includes generating preview and sending data." ).unit( - bound.Unit.SECOND + Unit.SECOND ).metricPrefix( - bound.MetricPrefix.MILLI + MetricPrefix.MILLI ).readOnly().initialValue( 0 ).commit() - bound.FLOAT_ELEMENT(expected).key("performance.rate").displayedName( + FLOAT_ELEMENT(expected).key("performance.rate").displayedName( "Rate" ).description( "Actual rate with which this device gets / processes / sends trains" ).unit( - bound.Unit.HERTZ + Unit.HERTZ ).readOnly().initialValue( 0 ).commit() - bound.FLOAT_ELEMENT(expected).key("performance.theoreticalRate").displayedName( + FLOAT_ELEMENT(expected).key("performance.theoreticalRate").displayedName( "Processing rate (hypothetical)" ).description( "Rate with which this device could hypothetically process trains. " "Based on lastProcessingDuration." ).unit( - bound.Unit.HERTZ + Unit.HERTZ ).readOnly().initialValue( float("NaN") ).warnLow( @@ -293,19 +312,19 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): # stuff from typical calPy that we don't use right now # Included to avoid errors due to unexpected configuration from init device - bound.STRING_ELEMENT(expected).key("sourceInfix").displayedName( + STRING_ELEMENT(expected).key("sourceInfix").displayedName( "[Disabled]" ).assignmentOptional().defaultValue("").commit() - bound.UINT32_ELEMENT(expected).key("maxGPUHandlesInFlight").displayedName( + UINT32_ELEMENT(expected).key("maxGPUHandlesInFlight").displayedName( "[Disabled]" ).assignmentOptional().defaultValue(35).commit() - bound.VECTOR_UINT32_ELEMENT(expected).key("gainMapping").displayedName( + VECTOR_UINT32_ELEMENT(expected).key("gainMapping").displayedName( "[Disabled]" ).assignmentOptional().defaultValue([]).commit() - bound.BOOL_ELEMENT(expected).key("dontProcess").displayedName( + BOOL_ELEMENT(expected).key("dontProcess").displayedName( "[Disabled]" ).assignmentOptional().defaultValue(False).reconfigurable().commit() @@ -343,7 +362,7 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): self._reset_timer = None self.updateState(State.ON) - self._buffered_status_update = bound.Hash( + self._buffered_status_update = Hash( "trainId", 0, "performance.rate", @@ -561,9 +580,9 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): self.signalEndOfStream("dataOutput") def write_output(self, data, old_metadata): - metadata = bound.ChannelMetaData( + metadata = ChannelMetaData( old_metadata.get("source"), - bound.Timestamp.fromHashAttributes(old_metadata.getAttributes("timestamp")), + Timestamp.fromHashAttributes(old_metadata.getAttributes("timestamp")), ) if "image.passport" not in data: @@ -581,14 +600,14 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): def write_combiner_preview(self, data_raw, data_corrected, train_id, source): # TODO: take into account updated pulse table after pulse filter - preview_hash = bound.Hash() + preview_hash = Hash() preview_hash.set("image.passport", [self.getInstanceId()]) preview_hash.set("image.trainId", train_id) preview_hash.set("image.pulseId", self.get("preview.pulse")) # note: have to construct because setting .tid after init is broken - timestamp = bound.Timestamp(bound.Epochstamp(), bound.Trainstamp(train_id)) - metadata = bound.ChannelMetaData(source, timestamp) + timestamp = Timestamp(Epochstamp(), Trainstamp(train_id)) + metadata = ChannelMetaData(source, timestamp) for channel_name, data in ( ("preview.outputRaw", data_raw), ("preview.outputCorrected", data_corrected), @@ -690,7 +709,7 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): for channel in self.get("dataInput.connectedOutputChannels") ] - send_me_the_thing_hash = bound.Hash() + send_me_the_thing_hash = Hash() for source in self.get("fastSources"): send_me_the_thing_hash[f"sources.{source}"] = True @@ -711,9 +730,7 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): self.log.INFO("Updating the output schema based on actual outgoing data") my_schema = self.getFullSchema() data_schema = hashToSchema.HashToSchema(data).schema - bound.OUTPUT_CHANNEL(my_schema).key("dataOutput").dataSchema( - data_schema - ).commit() + OUTPUT_CHANNEL(my_schema).key("dataOutput").dataSchema(data_schema).commit() my_config = copy.copy(self.getCurrentConfiguration()) self.updateSchema(my_schema) self.log.INFO("Re-applying backed up config") -- GitLab