diff --git a/DEPENDS b/DEPENDS
index 26a5109cef1ecbaca648556f4e7968a521c6cdc4..ecfd72308abf45cf6b79eb94ac17c6e19a6306aa 100644
--- a/DEPENDS
+++ b/DEPENDS
@@ -1,4 +1,4 @@
-TrainMatcher, 2.4.7
+TrainMatcher, 2.4.8
 calibrationClient, 11.3.0
-calibration/geometryDevices, 0.0.7
-calibration/calngUtils, 0.0.7
+calibration/geometryDevices, 0.0.8
+calibration/calngUtils, 0.0.8
diff --git a/Makefile b/Makefile
index 163b20cb42680bbe9dcee6782fdd8b088276ca6b..a394f27b590bb3b0a414bb3a06c7216d13a7cd91 100644
--- a/Makefile
+++ b/Makefile
@@ -1,15 +1,20 @@
 PYPI = pip install --index-url "https://devpi.exfldadev01.desy.de/root/pypi"
 PROXIED = pip install --proxy "http://exflproxy01.desy.de:3128"
 
-.PHONY: all cupy jinja2 h5py xarray extra-geom calng
+.PHONY: all cupy h5py xarray extra-geom calng
 
 all: calng
 
-cupy:
-	$(PYPI) cupy-cuda12x==11.6.0
+# this is the numpy version in Karabo, though something there seems to be cython-compiled against 1.x
+numpy:
+	$(PYPI) --upgrade "numpy==2.0.1"
+
+# just to get something newer than the 0.22 shipping with Karabo
+cython:
+	$(PYPI) --upgrade cython
 
-jinja2:
-	$(PYPI) Jinja2==3.1.2
+cupy:
+	$(PYPI) cupy-cuda12x
 
 # version: follow minimum used by EXtra-data
 h5py:
@@ -21,5 +26,5 @@ xarray:
 extra-geom:
 	$(PYPI) extra_geom==1.12.0
 
-calng: cupy jinja2 h5py extra-geom xarray
+calng: numpy cupy h5py extra-geom xarray cython
 	pip install --upgrade .
diff --git a/docs/detectors.md b/docs/detectors.md
index b2fa6d7ce0b75e6e9a3172ab25f42e429cf2f583..1cde311094bf0b231ba92d247e9a998427bb44e5 100644
--- a/docs/detectors.md
+++ b/docs/detectors.md
@@ -66,6 +66,33 @@ Preview outputs available:
 | relative gain correction | RelativeGain       |
 | flatfield correction     | FFMap              |
 | bad pixel masking        | BadPixels{Dark,FF} |
+| parallel gain            | Offset,Noise       |
+
+#### Parallel gain
+First use of this operation mode is planned for a beamtime in CW 47 of 2024.
+In parallel gain mode, the detector records and sends data for each of the three gain stages simultaneously, so the input data contains three times as many "frames" as it otherwise would.
+The correction devices can:
+
+- ignore this entirely if the correction step is switched off, passing along all three copies of the frame set (first all high, then all medium, then all low gain),
+- select the frame data from a single gain stage: high, medium, or low (see `corrections.parallelGain.handling`),
+- or use thresholding to select, for each frame and each pixel individually, which gain stage to use.
+
+Thresholding requires offset and noise constants to be loaded.
+The parameters `corrections.parallelGain.sigmaHighMedium` ($\sigma_H$) and `corrections.parallelGain.sigmaMediumLow` ($\sigma_M$) are used to compute the gain switching thresholds.
+For a given pixel in a given memory cell, let $N$ denote the noise (standard deviation from dark calibration data) and $A$ the raw ADU value, with subscripts $H$, $M$, and $L$ denoting the high, medium, and low gain stages.
+
+- Initially, the high gain value $A_H$ is considered
+- If $A_H > 4096 - \sigma_H N_H$, switch to the medium gain value $A_M$
+- If then $A_M > 4096 - \sigma_M N_M$, switch to the low gain value $A_L$ (see the sketch below)
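+
+To make the selection rule concrete, here is a minimal NumPy sketch of the per-pixel decision; it is illustrative only (made-up function and variable names), not the actual correction kernel:
+
+```python
+import numpy as np
+
+def select_parallel_gain(a_h, a_m, a_l, n_h, n_m, sigma_hm, sigma_ml):
+    # a_* are raw ADU values per gain stage, n_* the noise maps from dark
+    # calibration, sigma_hm / sigma_ml the configured sigma parameters
+    use_medium = a_h > 4096 - sigma_hm * n_h
+    use_low = use_medium & (a_m > 4096 - sigma_ml * n_m)
+    raw = np.where(use_low, a_l, np.where(use_medium, a_m, a_h))
+    gain_stage = use_medium.astype(np.uint8) + use_low.astype(np.uint8)
+    return raw, gain_stage
+```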
+
+
+Appropriate amplification based on the selected gain stage is done by the regular correction steps executed after selection / thresholding (`corrections.gainAmp`).
+
+The parameter `constantParameters.parallelGain` is used to query calibration constants for parallel gain mode *and* it determines whether the parallel gain correction steps are available.
+This means that if `constantParameters.parallelGain` is turned off, the parallel gain correction step detailed above is not run.
+
+As with other correction steps, parallel gain "correction" (selection or thresholding) can be turned on or off for full data output (`corrections.parallelGain.enable`) and preview (`corrections.parallelGain.preview`) independently.
+Keep this in mind to avoid confusion about the number of frames: the raw preview may have three times as many frames to choose from as the corrected preview, while the full data output may agree with either of the two, depending on these settings.
+For example, if a train contains 510 raw frames (170 per gain stage), the raw preview offers 510 frames while the thresholded output has 170.
 
 ### Previews
 Preview outputs available:
diff --git a/docs/javascripts/katex.js b/docs/javascripts/katex.js
new file mode 100644
index 0000000000000000000000000000000000000000..f7fd7047fcdf345bf13dd46f485c4be3b7f37506
--- /dev/null
+++ b/docs/javascripts/katex.js
@@ -0,0 +1,10 @@
+document$.subscribe(({ body }) => {
+  renderMathInElement(body, {
+    delimiters: [
+      { left: "$$",  right: "$$",  display: true },
+      { left: "$",   right: "$",   display: false },
+      { left: "\\(", right: "\\)", display: false },
+      { left: "\\[", right: "\\]", display: true }
+    ],
+  })
+})
diff --git a/mkdocs.yml b/mkdocs.yml
index 30d89b7e808be8b03d1398c2cb89631926860b86..a9204daf9bf329b404ddcc02a84947419fdfc031 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -3,3 +3,13 @@ theme:
 site_name: calng user docs
 markdown_extensions:
   - def_list
+  - pymdownx.arithmatex:
+      generic: true
+
+extra_javascript:
+  - javascripts/katex.js
+  - https://unpkg.com/katex@0/dist/katex.min.js
+  - https://unpkg.com/katex@0/dist/contrib/auto-render.min.js
+
+extra_css:
+  - https://unpkg.com/katex@0/dist/katex.min.css
diff --git a/setup.py b/setup.py
index 0f50a792a95a3f929b5767bf8245701d7e6a6b82..be29428680f8ad923dae4f69302d0fd948b78b9f 100644
--- a/setup.py
+++ b/setup.py
@@ -5,6 +5,7 @@ from setuptools import setup, find_packages, Extension
 
 from Cython.Build import cythonize
 from karabo.packaging.versioning import device_scm_version
+import numpy as np
 
 
 ROOT_FOLDER = dirname(realpath(__file__))
@@ -37,6 +38,7 @@ setup(
             "ShmemTrainMatcher = calng.ShmemTrainMatcher:ShmemTrainMatcher",
             "SimpleFrameSelectionArbiter = calng.FrameSelectionArbiter:SimpleFrameSelectionArbiter",  # noqa
             "AdvancedFrameSelectionArbiter = calng.FrameSelectionArbiter:AdvancedFrameSelectionArbiter",  # noqa
+            "AzimuthalCombiner = calng.AzimuthalCombiner:AzimuthalCombiner",
             "DetectorAssembler = calng.DetectorAssembler:DetectorAssembler",
             "Gotthard2Assembler = calng.Gotthard2Assembler:Gotthard2Assembler",
             "LpdminiSplitter = calng.LpdminiSplitter:LpdminiSplitter",
@@ -50,12 +52,15 @@ setup(
             "RoiTool = calng.RoiTool:RoiTool",
         ],
         "calng.correction_addon": [
+            "Autocorrelation = calng.correction_addons.autocorrelation:Autocorrelation",  # noqa
+            "AzimuthalIntegrator = calng.correction_addons.azimuthal:AzimuthalIntegrator",  # noqa
             "IntegratedIntensity = calng.correction_addons.integrated_intensity:IntegratedIntensity",  # noqa
+            "RoiIntegration = calng.correction_addons.integrate_roi:RoiIntegration",  # noqa
             "LitPixelCounter = calng.correction_addons.litpixel_counter:LitPixelCounter",  # noqa
             "Peakfinder9 = calng.correction_addons.peakfinder9:Peakfinder9",  # noqa
             "RandomFrames = calng.correction_addons.random_frames:RandomFrames",
             "SaturationMonitor = calng.correction_addons.saturation_monitor:SaturationMonitor",# noqa
-            "Autocorrelation = calng.correction_addons.autocorrelation:Autocorrelation",  # noqa
+            "UserMask = calng.correction_addons.masking:UserMask",  # noqa
         ],
         "calng.arbiter_kernel": [
             "Assign = calng.arbiter_kernels.base_kernel:Assign",
@@ -78,12 +83,17 @@ setup(
                 extra_compile_args=["-O3", "-march=native", "-fopenmp"],
                 extra_link_args=["-fopenmp"],
             ),
+            Extension(
+                "calng.kernels.azimuthal_cython",
+                ["src/calng/kernels/azimuthal.pyx"],
+                extra_compile_args=["-O3", "-march=native", "-fopenmp"],
+                extra_link_args=["-fopenmp"],
+            ),
             Extension(
                 "calng.kernels.common_cython",
                 ["src/calng/kernels/common_cpu.pyx"],
                 extra_compile_args=["-O3", "-march=native", "-fopenmp"],
                 extra_link_args=["-fopenmp"],
-                language="c++",
             ),
             Extension(
                 "calng.kernels.dssc_cython",
@@ -111,4 +121,5 @@ setup(
         ],
         compiler_directives={"language_level": 3},
     ),
+    include_dirs=[np.get_include()],
 )
diff --git a/src/calng/AzimuthalCombiner.py b/src/calng/AzimuthalCombiner.py
new file mode 100644
index 0000000000000000000000000000000000000000..7f2b71b649133e34cea2ca389e736b15217079a4
--- /dev/null
+++ b/src/calng/AzimuthalCombiner.py
@@ -0,0 +1,216 @@
+import numpy as np
+from calng.stacking_utils import StackingFriend
+from calngUtils import device as device_utils
+from .utils import kwhash
+from karabo.bound import (
+    IMAGEDATA_ELEMENT,
+    KARABO_CLASSINFO,
+    NDARRAY_ELEMENT,
+    INT32_ELEMENT,
+    NODE_ELEMENT,
+    OUTPUT_CHANNEL,
+    OVERWRITE_ELEMENT,
+    Hash,
+    ImageData,
+    Schema,
+)
+from TrainMatcher import TrainMatcher
+
+from . import scenes
+from ._version import version as deviceVersion
+
+
+@device_utils.with_unsafe_get
+@KARABO_CLASSINFO("AzimuthalCombiner", deviceVersion)
+class AzimuthalCombiner(TrainMatcher.TrainMatcher):
+    @staticmethod
+    def expectedParameters(expected):
+        (
+            OVERWRITE_ELEMENT(expected)
+            .key("availableScenes")
+            .setNewDefaultValue(["overview", "trainMatcherScene"])
+            .commit(),
+
+            OVERWRITE_ELEMENT(expected)
+            .key("sortSources")
+            .setNowReadOnly()
+            .setNewDefaultValue(False)
+            .commit(),
+        )
+        StackingFriend.add_schema(expected)
+        (
+            OVERWRITE_ELEMENT(expected)
+            .key("merge")
+            .setNewDefaultValue(
+                [
+                    kwhash(
+                        select=True,
+                        sourcePattern=r".*/DET/.*:(xtdf|daqOutput)",
+                        keyPattern=key,
+                        replacement="STACKED",
+                        groupType="sources",
+                        mergeMethod="stack",
+                        axis=0,
+                        missingValue="nan",
+                    )
+                    for key in ("azimuthal.integrated1d", "azimuthal.integrated1dWeight", "azimuthal.bins")
+                ]
+            )
+            .commit(),
+        )
+        output_schema = Schema()
+        preview_streak_schema = Schema()
+        preview_hist_schema = Schema()
+        (
+            NDARRAY_ELEMENT(output_schema)
+            .key("integrated1d")
+            .commit(),
+
+            NDARRAY_ELEMENT(output_schema)
+            .key("bins")
+            .commit(),
+
+            IMAGEDATA_ELEMENT(preview_streak_schema)
+            .key("image")
+            .commit(),
+
+            NDARRAY_ELEMENT(preview_hist_schema)
+            .key("x")
+            .commit(),
+
+            NDARRAY_ELEMENT(preview_hist_schema)
+            .key("y")
+            .commit(),
+        )
+        (
+            OUTPUT_CHANNEL(expected)
+            .key("output")
+            .dataSchema(output_schema)
+            .commit(),
+
+            # emulating the schema of devices using preview friend
+            NODE_ELEMENT(expected)
+            .key("preview")
+            .commit(),
+
+            NODE_ELEMENT(expected)
+            .key("preview.streak")
+            .commit(),
+
+            OUTPUT_CHANNEL(expected)
+            .key("preview.streak.output")
+            .dataSchema(preview_streak_schema)
+            .commit(),
+
+            NODE_ELEMENT(expected)
+            .key("preview.hist")
+            .commit(),
+
+            INT32_ELEMENT(expected)
+            .key("preview.hist.index")
+            .displayedName("Preview index")
+            .description(
+                "In case of multi-frame data, which frame should we preview? Either "
+                "select frame by using value >= 0 or reduction with special values: "
+                "-1: max, -2: mean, -3: sum, -4: stdev. If you specify a frame out "
+                "of range, I will silently clamp index to some legal value."
+            )
+            .assignmentOptional()
+            .defaultValue(0)
+            .minInc(-4)
+            .reconfigurable()
+            .commit(),
+
+            OUTPUT_CHANNEL(expected)
+            .key("preview.hist.output")
+            .dataSchema(preview_hist_schema)
+            .commit(),
+        )
+
+    def initialization(self):
+        super().initialization()
+        self._stacking_friend = StackingFriend(
+            self, self.get("sources"), self.get("merge")
+        )
+        self.start()
+
+    def on_matched_data(self, tid, sources):
+        # snatch up bins from arbitrary source
+        # TODO: maybe check that bins agree
+        for source, (data, _) in sources.items():
+            if data.has("azimuthal.bins"):
+                bins = data["azimuthal.bins"]
+                break
+        else:
+            self.log.WARN("No source provided azimuthal integration bins")
+            return
+        bin_midpoints = (bins[1:] + bins[:-1]) / 2
+        # collect actual results
+        self._stacking_friend.process(sources)
+        data_hash, _ = sources["STACKED"]
+
+        # combining data across modules
+        combined = np.nansum(data_hash["azimuthal.integrated1d"], axis=0) / np.nansum(
+            data_hash["azimuthal.integrated1dWeight"], axis=0
+        )
+
+        # regular output: "full" combined data + actual bin edges
+        self.writeChannel("output", Hash("bins", bins, "integrated1d", combined))
+
+        # streak preview: same data, just as image
+        self.writeChannel(
+            "preview.streak.output",
+            Hash("image", ImageData(np.nan_to_num(combined)))
+        )
+
+        # histogram: select some frame
+        preview_index = self.unsafe_get("preview.hist.index")
+        if preview_index == -1:
+            preview_data = np.nanmax(combined, axis=0)
+        elif preview_index == -2:
+            preview_data = np.nanmean(combined, axis=0)
+        elif preview_index == -3:
+            preview_data = np.nansum(combined, axis=0)
+        elif preview_index == -4:
+            preview_data = np.nanstd(combined, axis=0)
+        else:
+            preview_index = max(0, min(preview_index, combined.shape[0]-1))
+            preview_data = combined[preview_index]
+        np.nan_to_num(preview_data, copy=False)
+        self.writeChannel(
+            "preview.hist.output",
+            Hash(
+                "x", bin_midpoints,
+                "y", preview_data,
+            )
+        )
+
+        self.rate_out.update()
+        self.info["sent"] += 1
+        self.info["trainId"] = tid
+
+    def requestScene(self, params):
+        scene_name = params.get("name", default="")
+        if scene_name == "trainMatcherScene":
+            params["name"] = "scene"
+            return super().requestScene(params)
+        if scene_name != "overview":
+            self.log.INFO(
+                f"Unexpected scene name {scene_name} requested, "
+                "will just return overview"
+            )
+        payload = Hash("name", scene_name, "success", True)
+        payload["data"] = scenes.azimuthal_combiner_overview(
+            device_id=self.getInstanceId(),
+            schema=self.getFullSchema(),
+        )
+        self.reply(
+            Hash(
+                "type",
+                "deviceScene",
+                "origin",
+                self.getInstanceId(),
+                "payload",
+                payload,
+            )
+        )
diff --git a/src/calng/CalibrationManager.py b/src/calng/CalibrationManager.py
index f0a6d192f7dd6473f5d8fa400cd269c712295209..bed6a8307cd225cc7a0d4d4c2002cd2086c888a5 100644
--- a/src/calng/CalibrationManager.py
+++ b/src/calng/CalibrationManager.py
@@ -40,9 +40,8 @@ Device states:
  - INIT: When the device is starting up
  - ACTIVE: When the device is ready to manage a calibration pipeline
  - CHANGING: When the device is actively changing the pipeline configuration
- - UNKNOWN: When the pipeline should be restarted after reconfiguration
  - ERROR: Recoverable error, only allows server restart
- - DISABLED: Unrecoverable error, requires device restart
+ - UNKNOWN: Unrecoverable error, requires device restart
 '''
 
 
@@ -350,7 +349,7 @@ class CalibrationManager(DeviceClientBase, Device):
         displayedName='Detector type',
         description='Type of the detector to manage.',
         options=['AGIPD', 'LPD', 'DSSC', 'Jungfrau', 'ePix100', 'pnCCD',
-                 'FastCCD', 'Gotthard2'],
+                 'FastCCD', 'Gotthard2', 'ShimadzuHPVX2'],
         accessMode=AccessMode.INITONLY,
         assignment=Assignment.MANDATORY)
 
@@ -385,6 +384,14 @@ class CalibrationManager(DeviceClientBase, Device):
                     'pipeline, which is appended to the same device ID root '
                     '(DOMAIN/TYPE) as the manager instance itself.')
 
+    deviceServers = VectorHash(
+        displayedName='Device servers',
+        description='WARNING: It is strongly recommended to perform a full '
+                    'pipeline restart after changes to this property.',
+        rows=DeviceServerRow,
+        accessMode=AccessMode.INITONLY,
+        assignment=Assignment.MANDATORY)
+
     modules = VectorHash(
         displayedName='Modules',
         description='Individual modules constituting this detector. A module '
@@ -407,20 +414,6 @@ class CalibrationManager(DeviceClientBase, Device):
         accessMode=AccessMode.RECONFIGURABLE,
         assignment=Assignment.MANDATORY)
 
-    @VectorHash(
-        displayedName='Device servers',
-        description='WARNING: It is strongly recommended to perform a full '
-                    'pipeline restart after changes to this property.',
-        rows=DeviceServerRow,
-        accessMode=AccessMode.RECONFIGURABLE,
-        assignment=Assignment.MANDATORY)
-    async def deviceServers(self, value):
-        self.deviceServers = value
-        self._servers_changed = True
-
-        # Switch to UNKNOWN state to suggest operator to restart pipeline.
-        self.state = State.UNKNOWN
-
     previewServer = String(
         displayedName='Preview device server',
         description='The server (from "Device servers" list) to use for '
@@ -477,7 +470,7 @@ class CalibrationManager(DeviceClientBase, Device):
         defaultValue=[Hash(
             'enabled', False,
             'memberPattern', r'MATCH_G\d',
-            'keyPattern', '^(sources|zmqConfig|sortSources|mode')],
+            'keyPattern', '^(sources|zmqConfig|sortSources|mode)')],
         accessMode=AccessMode.RECONFIGURABLE)
     async def restoredConfigurations(self, new_configs):
         self.restoredConfigurations = new_configs
@@ -494,14 +487,14 @@ class CalibrationManager(DeviceClientBase, Device):
 
     @Slot(
         displayedName='Restart servers',
-        allowedStates=[State.UNKNOWN, State.ACTIVE, State.ERROR])
+        allowedStates=[State.ACTIVE, State.ERROR])
     async def restartServers(self):
         self.state = State.CHANGING
         background(self._restart_servers())
 
     @Slot(
         displayedName='Instantiate pipeline',
-        allowedStates=[State.UNKNOWN, State.ACTIVE])
+        allowedStates=[State.ACTIVE])
     async def startInstantiate(self):
         # Slot name is mandated by DeviceInstantiator interface.
         self.state = State.CHANGING
@@ -608,6 +601,9 @@ class CalibrationManager(DeviceClientBase, Device):
         # Task object to update managed devices list.
         self._managed_devices_updater = None
 
+        # Copy of the correction device schema for scene generation.
+        self._correction_device_schema = Schema()
+
     async def onInitialization(self):
         self.state = State.INIT
         await self._async_init()
@@ -642,9 +638,15 @@ class CalibrationManager(DeviceClientBase, Device):
         # Check device servers and initialize webserver access.
         await self._check_servers()
 
+        if self.state != State.INIT:
+            return
+
         # Inject schema for configuration of managed devices.
         await self._inject_managed_keys()
 
+        if self.state != State.INIT:
+            return
+
         # Populate preview layers from correction device schema
         self.previewLayers = list(self._correction_device_schema
                                   .hash["preview"].getKeys())
@@ -652,9 +654,11 @@ class CalibrationManager(DeviceClientBase, Device):
         # Populate the device ID sets with what's out there right now.
         await self._check_topology()
 
-        if self.state == State.INIT:
-            self._set_status('Calibration manager ready')
-            self.state = State.ACTIVE
+        if self.state != State.INIT:
+            return
+
+        self._set_status('Calibration manager ready')
+        self.state = State.ACTIVE
 
     def _check_new_device(self, device_id, server_id, class_id):
         if device_id == self.deviceId:
@@ -824,7 +828,6 @@ class CalibrationManager(DeviceClientBase, Device):
                                           self._class_ids['correction'])
 
         # Save this for scene generation later.
-        self._correction_device_schema = Schema()
         self._correction_device_schema.copy(managed_schema)
 
         if managed_schema.name != self._class_ids['correction']:
@@ -999,9 +1002,9 @@ class CalibrationManager(DeviceClientBase, Device):
         self._set_status(text, level=logging.ERROR)
 
     def _set_fatal(self, text):
-        """Set the device into disabled state and log an error message."""
+        """Set the device into unknown state and log an error message."""
 
-        self.state = State.DISABLED
+        self.state = State.UNKNOWN
         self._set_status(text, level=logging.CRITICAL)
 
     def _set_exception(self, text, e):
@@ -1097,9 +1100,6 @@ class CalibrationManager(DeviceClientBase, Device):
     async def _check_servers(self):
         """Validate device server configuration."""
 
-        if not self._servers_changed:
-            return True
-
         # Mapping of servers to device servers.
         self._server_hosts = {server: host for server, host, _
                               in self.deviceServers.value}
@@ -1164,18 +1164,12 @@ class CalibrationManager(DeviceClientBase, Device):
                 self._server_api_names[name] = info['name']
 
         if err:
-            self._set_error('One or more device server problems were '
+            self._set_fatal('One or more device server problems were '
                             'detected:\n' + '\n'.join(err))
-            return False
-        else:
-            return True
 
     async def _restart_servers(self):
         """Restart all managed device servers."""
 
-        if not await self._check_servers():
-            return
-
         try:
             # Find all servers in need of being brought down.
             up_names = await self._get_servers_in_state('up')
@@ -1295,9 +1289,6 @@ class CalibrationManager(DeviceClientBase, Device):
     async def _instantiate_pipeline(self):
         """Instantiate all managed devices."""
 
-        if not await self._check_servers():
-            return
-
         # Make sure all servers are up.
         try:
             up_servers = await self._get_servers_in_state('up')
diff --git a/src/calng/DetectorAssembler.py b/src/calng/DetectorAssembler.py
index 36e92e55d0ae4505fcd3170ad6e7fe4859f2dd13..3026ebb80a695c93c4d339d21fabcb74ece0f5b8 100644
--- a/src/calng/DetectorAssembler.py
+++ b/src/calng/DetectorAssembler.py
@@ -4,7 +4,7 @@ from timeit import default_timer
 
 import numpy as np
 import xarray as xr
-from calngUtils import device as device_utils, geom_utils, trackers
+from calngUtils import device as device_utils, geom_utils, shmem_utils, trackers
 from karabo.bound import (
     BOOL_ELEMENT,
     DOUBLE_ELEMENT,
@@ -215,6 +215,7 @@ class DetectorAssembler(TrainMatcher.TrainMatcher):
 
     def initialization(self):
         super().initialization()
+        self._shmem_handler = shmem_utils.ShmemCircularBufferReceiver()
 
         self._preview_friend = PreviewFriend(self, self._preview_outputs, "preview")
         self._image_data_path = self.get("imageDataPath")
@@ -412,6 +413,7 @@ class DetectorAssembler(TrainMatcher.TrainMatcher):
             earliest_source_timestamp = min(
                 earliest_source_timestamp, source_timestamp.toTimestamp()
             )
+            self._shmem_handler.dereference_shmem_handles(data)
             # regular TrainMatcher output
             self.output.write(
                 data, ChannelMetaData(source, source_timestamp), copyAllData=False
@@ -443,7 +445,9 @@ class DetectorAssembler(TrainMatcher.TrainMatcher):
         if bridge_output_choice is BridgeOutputOptions.MATCHED:
             self.zmq_output.update()
 
-        dims = ["module", "slow_scan", "fast_scan"]
+        extra_dims = ["ex_dim_" + str(i)
+                      for i in range(0, image_datas[0].ndim - 2)]
+        dims = ["module"] + extra_dims + ["slow_scan", "fast_scan"]
         coords = {"module": module_indices}
         assembled_data, _ = self._geometry.position_modules(
             xr.DataArray(
diff --git a/src/calng/FrameSelectionArbiter.py b/src/calng/FrameSelectionArbiter.py
index 53ef68e4571e7208efee5b919932fc7b2d63bb91..a2945e8a261b95df0b483e8e0682a77292d7239a 100644
--- a/src/calng/FrameSelectionArbiter.py
+++ b/src/calng/FrameSelectionArbiter.py
@@ -51,11 +51,11 @@ def output_schema():
     return res
 
 
-kernels = [
-    kernel_class.load()
-    for kernel_class in entry_points().select().get("calng.arbiter_kernel", [])
-]
-kernel_choice = {kernel_class.__name__: kernel_class for kernel_class in kernels}
+kernel_choice = {
+    entry_point.name: entry_point for entry_point in entry_points(
+        group="calng.arbiter_kernel"
+    )
+}
 
 
 def selection_plan_schema(kernels):
@@ -93,7 +93,7 @@ def selection_plan_schema(kernels):
             "effectively assigns the result of the preselection to the given name, "
             "allowing you to name combined results from other kernels."
         )
-        .options(",".join(kernel_choice))
+        .options(",".join(kernel_choice.keys()))
         .assignmentOptional()
         .defaultValue("Assign")
         .reconfigurable()
@@ -203,7 +203,7 @@ class SimpleFrameSelectionArbiter(BaseFrameSelectionArbiter):
         (
             STRING_ELEMENT(expected)
             .key("frameSelection.kernelChoice")
-            .options(",".join(kernel_choice))
+            .options(",".join(kernel_choice.keys()))
             .assignmentOptional()
             .defaultValue("ManualFilter")
             .reconfigurable()
@@ -242,7 +242,7 @@ class SimpleFrameSelectionArbiter(BaseFrameSelectionArbiter):
             .key("frameSelection.kernelParameters")
             .commit(),
         )
-        kernel_class = kernel_choice[kernel_name]
+        kernel_class = kernel_choice[kernel_name].load()
         kernel_class.extend_device_schema(
             schema_update, "frameSelection.kernelParameters"
         )
@@ -257,7 +257,7 @@ class SimpleFrameSelectionArbiter(BaseFrameSelectionArbiter):
         self.updateSchema(schema_update)
 
     def _initialize_kernel(self, class_name, conf):
-        kernel_class = kernel_choice[class_name]
+        kernel_class = kernel_choice[class_name].load()
         self.kernel = kernel_class(self, kernel_class.__name__, conf)
 
     def preReconfigure(self, conf):
diff --git a/src/calng/RoiTool.py b/src/calng/RoiTool.py
index c450afa418720a103da3e21d20acaf58fc2fe7b8..ac70686ce8ddc9e860a728cd027635ef392ff023 100644
--- a/src/calng/RoiTool.py
+++ b/src/calng/RoiTool.py
@@ -240,8 +240,10 @@ class RoiTool(Device):
             encoding=EncodingType.GRAY,
             bitsPerPixel=32,
         )
+        # something in the pipeline / asyncio complains about this being a memoryview
+        # now, hence the explicit copy via np.array
         self.output.schema.zoomImage = ImageData(
-            zoomed.filled(fill_value=0),
+            np.array(zoomed.filled(fill_value=0)),
             encoding=EncodingType.GRAY,
             bitsPerPixel=32,
         )
diff --git a/src/calng/azim_utils.py b/src/calng/azim_utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..1fc312df9e1c05ce8340b6ee78bb94cc4802009c
--- /dev/null
+++ b/src/calng/azim_utils.py
@@ -0,0 +1,276 @@
+import copy
+import itertools
+import scipy.sparse
+
+import numpy as np
+
+from calngUtils.misc import ceil_div
+
+from .base_kernel_runner import get_kernel_template
+from .utils import np_dtype_to_c_type
+import calng.kernels.azimuthal_cython as az
+
+
+def slice_geom(geom, modules):
+    if isinstance(modules, int):
+        modules = [modules]
+    res = copy.copy(geom)
+    res.modules = [res.modules[i] for i in modules]
+    res.expected_data_shape = (len(modules),) + res.expected_data_shape[1:]
+    res.n_modules = len(res.modules)
+    return res
+
+
+def get_geom_tile_vectors(geom, det_dist):
+    num_tiles = geom.n_tiles_per_module * geom.n_modules
+    tile_corners = np.empty((num_tiles, 3), dtype=np.float64)
+    tile_ss_vecs = np.empty((num_tiles, 3), dtype=np.float64)
+    tile_fs_vecs = np.empty((num_tiles, 3), dtype=np.float64)
+    for i, tile in enumerate(itertools.chain.from_iterable(geom.modules)):
+        assert tile.ss_pixels == geom.frag_ss_pixels
+        assert tile.fs_pixels == geom.frag_fs_pixels
+        tile_corners[i] = tile.corner_pos
+        tile_ss_vecs[i] = tile.ss_vec
+        tile_fs_vecs[i] = tile.fs_vec
+    tile_corners[:, -1] += det_dist
+    return tile_corners, tile_ss_vecs, tile_fs_vecs
+
+
+def get_geom_max_theta(geom, det_dist):
+    def tile_max_theta(tile):
+        corner = tile.corner_pos + [0, 0, det_dist]
+        return max(
+            np.arctan2(np.sqrt(arr[0] ** 2 + arr[1] ** 2), arr[2])
+            for arr in [
+                corner,
+                corner + tile.ss_vec * tile.ss_pixels,
+                corner + tile.fs_vec * tile.fs_pixels,
+                corner + tile.ss_vec * tile.ss_pixels + tile.fs_vec * tile.fs_pixels,
+            ]
+        )
+
+    return max(
+        tile_max_theta(tile) for tile in itertools.chain.from_iterable(geom.modules)
+    )
+
+
+def coo_from_geom(geom, bins, det_dist, n_split):
+    tile_corners, tile_ss_vecs, tile_fs_vecs = get_geom_tile_vectors(geom, det_dist)
+    data, rows, cols = az.generate_coo(
+        bins,
+        tile_corners,
+        tile_ss_vecs,
+        tile_fs_vecs,
+        geom.frag_ss_pixels,
+        geom.frag_fs_pixels,
+        n_split,
+    )
+    return scipy.sparse.coo_matrix(
+        (
+            data,
+            (rows, cols)
+        ),
+        dtype=np.float32,
+        shape=(bins.size - 1, np.prod(geom.expected_data_shape)),
+    )
+
+
+def coo_to_sorted_csr(coo):
+    order = np.lexsort((coo.row, coo.col))
+    return scipy.sparse.coo_matrix(
+        (
+            coo.data[order],
+            (
+                coo.row[order],
+                coo.col[order],
+            ),
+        ),
+        shape=coo.shape,
+    ).tocsr()
+
+
+class BaseAzimuthalIntegrator:
+    def __init__(self, geom, bins, det_dist, n_split):
+        self.geom = geom
+        if isinstance(bins, int):
+            self.bins = np.linspace(0, get_geom_max_theta(geom, det_dist), bins)
+        else:
+            self.bins = np.asarray(bins, dtype=np.float32)
+        self.det_dist = det_dist
+        self.n_split = n_split
+        self.n_pixels = np.prod(self.geom.expected_data_shape)
+
+    def _check_shape_make_buffer(self, data, out, out_weight, multi_module):
+        if data.ndim == 1:
+            if data.size != self.n_pixels:
+                raise ValueError(
+                    "Flat input data should have size equal to number of pixels"
+                )
+            n_frames = 1
+            was_single_frame = True
+        elif data.ndim == 2:
+            if data.shape[1] != self.n_pixels:
+                raise ValueError("2d data input should have shape (frames, pixels)")
+            n_frames = data.shape[0]
+            was_single_frame = False
+        elif data.ndim == len(self.geom.expected_data_shape):
+            n_frames = 1
+            was_single_frame = True
+            data = data.ravel()
+        elif data.ndim == len(self.geom.expected_data_shape) + 1:
+            if data.shape[1:] != self.geom.expected_data_shape:
+                raise ValueError("nd data should have frames + geometry expected shape")
+            n_frames = data.shape[0]
+            data = data.reshape(n_frames, -1)
+            was_single_frame = False
+        else:
+            raise ValueError("Unexpected input data dimensionality")
+
+        output_shape = (n_frames, self.bins.size - 1)
+        if out is None:
+            out = self._xp.empty(output_shape, dtype=data.dtype)
+        else:
+            assert out.dtype == data.dtype
+            assert out.shape == output_shape
+        if multi_module and out_weight is None:
+            out_weight = self._xp.empty_like(out)
+        if was_single_frame:
+            data = data[None]  # add pretend singleton axis so kernel doesn't get mad
+
+        return data, out, out_weight, was_single_frame
+
+
+class CpuAzimuthalIntegrator(BaseAzimuthalIntegrator):
+    _xp = np
+
+    def __init__(self, geom, bins, det_dist, n_split):
+        super().__init__(geom, bins, det_dist, n_split)
+        self.csr = coo_to_sorted_csr(
+            coo_from_geom(self.geom, self.bins, self.det_dist, self.n_split)
+        )
+
+    def integrate_1d(self, data, out=None):
+        data, out, _, was_single_frame = self._check_shape_make_buffer(
+            data, out, None, False
+        )
+
+        az.integrate_1d_csr(
+            self.csr.indices,
+            self.csr.indptr,
+            self.csr.data,
+            data,
+            out,
+        )
+
+        # just being polite: if no frame axis on input, don't give one on output
+        if was_single_frame:
+            out = out[0]
+
+        return out
+
+    def integrate_1d_multimodule(self, data, out=None, out_weight=None):
+        data, out, out_weight, was_single_frame = self._check_shape_make_buffer(
+            data, out, out_weight, True
+        )
+
+        az.integrate_1d_csr_multimodule(
+            self.csr.indices,
+            self.csr.indptr,
+            self.csr.data,
+            data,
+            out,
+            out_weight,
+        )
+
+        # just being polite: if no frame axis on input, don't give one on output
+        if was_single_frame:
+            out = out[0]
+            out_weight = out_weight[0]
+
+        return out, out_weight
+
+
+class GpuAzimuthalIntegrator(BaseAzimuthalIntegrator):
+    def __init__(self, geom, bins, det_dist, n_split):
+        import cupy
+        import cupyx.scipy.sparse
+
+        self._xp = cupy
+        self._sparse = cupyx.scipy.sparse
+        super().__init__(geom, bins, det_dist, n_split)
+
+        self._kernels = {}
+        name_expressions = [
+            "integrate_1d_csr<float,float>",
+            "integrate_1d_csr_multimodule<float,float>"
+        ]
+        self._module = self._xp.RawModule(
+            code=get_kernel_template("azimuthal.cu").render(),
+            # backend="nvrtc",
+            options=("-std=c++11",),
+            name_expressions=name_expressions,
+        )
+        self._regular_kernel = self._module.get_function(name_expressions[0])
+        self._multimod_kernel = self._module.get_function(name_expressions[1])
+
+        csr_cpu = coo_to_sorted_csr(
+            coo_from_geom(self.geom, self.bins, self.det_dist, self.n_split)
+        )
+        self.csr = self._sparse.csr_matrix(csr_cpu.astype(np.float32))
+
+    def integrate_1d(self, data, out=None, block=(128, 8)):
+        data, out, _, was_single_frame = self._check_shape_make_buffer(
+            data, out, None, False
+        )
+
+        n_frames = data.shape[0]
+        grid = (
+            ceil_div(n_frames, block[0]),  # x is frame
+            ceil_div(self.csr.shape[0], block[1]),  # y is ring
+        )
+        self._regular_kernel(
+            grid,
+            block,
+            (
+                *self.csr.shape,
+                self.csr.data,
+                self.csr.indices,
+                self.csr.indptr,
+                n_frames,
+                data,
+                out,
+            ),
+        )
+
+        if was_single_frame:
+            out = out[0]
+
+        return out
+
+    def integrate_1d_multimodule(self, data, out=None, out_weight=None, block=(128, 8)):
+        data, out, out_weight, was_single_frame = self._check_shape_make_buffer(
+            data, out, out_weight, True
+        )
+        n_frames = data.shape[0]
+        grid = (
+            ceil_div(n_frames, block[0]),  # x is frame
+            ceil_div(self.csr.shape[0], block[1]),  # y is ring
+        )
+        self._multimod_kernel(
+            grid,
+            block,
+            (
+                *self.csr.shape,
+                self.csr.data,
+                self.csr.indices,
+                self.csr.indptr,
+                n_frames,
+                data,
+                out,
+                out_weight,
+            ),
+        )
+
+        if was_single_frame:
+            out = out[0]
+            out_weight = out_weight[0]
+
+        return out, out_weight
diff --git a/src/calng/base_correction.py b/src/calng/base_correction.py
index 8fa0c4830fd8fb147174ac664b6f8327796cd918..e62a59cffc5c1b9caeb91d5473c6dd9e8346adb9 100644
--- a/src/calng/base_correction.py
+++ b/src/calng/base_correction.py
@@ -47,8 +47,6 @@ from .preview_utils import PreviewFriend
 from .utils import StateContext, WarningContextSystem, WarningLampType, subset_of_hash
 from ._version import version as deviceVersion
 
-PROCESSING_STATE_TIMEOUT = 10
-
 
 class TrainFromTheFutureException(BaseException):
     pass
@@ -70,6 +68,8 @@ class BaseCorrection(PythonDevice):
     _cuda_pin_buffers = False
     _use_shmem_handles = True
 
+    PROCESSING_STATE_TIMEOUT = 10
+
     @staticmethod
     def expectedParameters(expected):
         (
@@ -562,10 +562,11 @@ class BaseCorrection(PythonDevice):
         self.KARABO_ON_EOS("dataInput", self.handle_eos)
 
         self._enabled_addons = []
-        for addon_class in self._available_addons:
-            addon_prefix = f"addons.{addon_class.__name__}"
+        for addon_class_name in self.get("addonOrder"):
+            addon_prefix = f"addons.{addon_class_name}"
             if not self.get(f"{addon_prefix}.enable"):
                 continue
+            addon_class = self._available_addons[addon_class_name]
             addon = addon_class(self, addon_prefix, self._parameters[addon_prefix])
             self._enabled_addons.append(addon)
         if (
@@ -1023,11 +1024,11 @@ class BaseCorrection(PythonDevice):
             self._buffered_status_update.clear()
             if (
                 default_timer() - self._last_processing_started
-                > PROCESSING_STATE_TIMEOUT
+                > self.PROCESSING_STATE_TIMEOUT
             ):
                 self.updateState(State.ON)
                 self.log_status_info(
-                    f"No new train in {PROCESSING_STATE_TIMEOUT} s, switching state."
+                    f"No new train in {self.PROCESSING_STATE_TIMEOUT} s, switching state."
                 )
                 self._train_ratio_tracker.reset()
                 self._maybe_update_cell_and_pulse_tables(None, None)
@@ -1051,28 +1052,45 @@ class BaseCorrection(PythonDevice):
         self.signalEndOfStream("dataOutput")
 
 
-def add_addon_nodes(schema, device_class, prefix="addons"):
+def add_addon_nodes(schema, device_class):
     det_name = device_class.__name__[:-len("Correction")].lower()
-    device_class._available_addons = [
-        addon.load()
-        for addon in entry_points().select().get("calng.correction_addon", [])
-        if not addon.extras
-        or det_name in addon.extras
-    ]
-
-    for addon_class in device_class._available_addons:
+    device_class._available_addons = {}
+    for addon in entry_points(group="calng.correction_addon"):
+        if addon.extras and det_name not in addon.extras:
+            continue
+        addon_class = addon.load()
+        device_class._available_addons[addon_class.__name__] = addon_class
+
+    for addon_class in device_class._available_addons.values():
         (
             NODE_ELEMENT(schema)
-            .key(f"{prefix}.{addon_class.__name__}")
+            .key(f"addons.{addon_class.__name__}")
             .commit(),
 
             BOOL_ELEMENT(schema)
-            .key(f"{prefix}.{addon_class.__name__}.enable")
+            .key(f"addons.{addon_class.__name__}.enable")
             .tags("managed")
             .assignmentOptional()
             .defaultValue(False)
             .commit(),
         )
         addon_class.extend_device_schema(
-            schema, f"{prefix}.{addon_class.__name__}"
+            schema, f"addons.{addon_class.__name__}"
+        )
+
+    (
+        VECTOR_STRING_ELEMENT(schema)
+        .key("addonOrder")
+        .displayedName("Addon order")
+        .description(
+            "Use this list to change the execution order of addons. Addons - which are "
+            "enabled - are executed in the order in which they appear here. If an "
+            "addon is enabled and not included in this list, it may not get "
+            "instantiated, so ensure that this list contains (a superset of) the "
+            "relevant addons."
         )
+        .tags("managed")
+        .assignmentOptional()
+        .defaultValue(list(device_class._available_addons.keys()))
+        .commit(),
+    )
diff --git a/src/calng/base_kernel_runner.py b/src/calng/base_kernel_runner.py
index 55c85ccb7c2755666906cf3ce151656dd3034b9e..a5ba784bfed16eac5bab5f1a6c9ecadea01a8e2c 100644
--- a/src/calng/base_kernel_runner.py
+++ b/src/calng/base_kernel_runner.py
@@ -328,7 +328,7 @@ class BaseKernelRunner:
         self._post_load_constant(constant)
 
     def _post_load_constant(self, constant):
-        for field_name in self._constant_to_correction_names[constant]:
+        for field_name in self._constant_to_correction_names.get(constant, []):
             key = f"corrections.{field_name}.available"
             if not self._get(key):
                 self._set(key, True)
diff --git a/src/calng/conditions/LpdCondition.py b/src/calng/conditions/LpdCondition.py
index 6d467fd167e1c37cd561b4cbcab27f5ed872dfaa..00186d3243746feadc2a984d491841c99536e2fc 100644
--- a/src/calng/conditions/LpdCondition.py
+++ b/src/calng/conditions/LpdCondition.py
@@ -1,18 +1,18 @@
 from karabo.middlelayer import (
     AccessMode,
     Assignment,
-    Slot,
-    State,
     String,
-    getDevice,
-    get_array_data,
-    sleep,
 )
-from ..corrections import LpdCorrection
 from .. import base_condition, utils
 
 
 class LpdCondition(base_condition.ConditionBase):
+    controlDeviceId = String(
+        displayedName="Control device ID",
+        assignment=Assignment.MANDATORY,
+        accessMode=AccessMode.INITONLY,
+    )
+
     someCorrectionDeviceId = String(
         displayedName="Correction device",
         description="Correction devices expose the cell table as a property. Specify "
@@ -25,7 +25,8 @@ class LpdCondition(base_condition.ConditionBase):
     @property
     def keys_to_get(self):
         return {
+            self.controlDeviceId.value: [("femAsicGainOverride", "parallelGain", None)],
             self.someCorrectionDeviceId.value: [
                 ("dataFormat.cellId", "memoryCellOrder", utils.cell_table_to_string)
-            ]
+            ],
         }
diff --git a/src/calng/correction_addons/autocorrelation.py b/src/calng/correction_addons/autocorrelation.py
index 08abb8b910db607c7e8ea52a84d7eba54fd155a7..d3fc9e8c0bcfd245d619f81fc86f69ca702fed0e 100644
--- a/src/calng/correction_addons/autocorrelation.py
+++ b/src/calng/correction_addons/autocorrelation.py
@@ -136,6 +136,7 @@ class Autocorrelation(BaseCorrectionAddon):
         )
 
     def __init__(self, device, prefix, config):
+        # note: this includes reconfigure
         super().__init__(device, prefix, config)
         self._shape = None
         self._shmem_buffer = None
@@ -156,8 +157,6 @@ class Autocorrelation(BaseCorrectionAddon):
         self._modno = int(da[-2:])
         device.set(f"{prefix}.moduleNumber", self._modno)
 
-        self.reconfigure(config)
-
         # AGIPD specific, we need to know the detector layout
         self._repeats = np.ones(512, int)
         self._repeats[63:-1:64] = 2
diff --git a/src/calng/correction_addons/azimuthal.py b/src/calng/correction_addons/azimuthal.py
new file mode 100644
index 0000000000000000000000000000000000000000..cb25a37ccf680becc3adee2e5e8acc1075915ddc
--- /dev/null
+++ b/src/calng/correction_addons/azimuthal.py
@@ -0,0 +1,209 @@
+import re
+
+from karabo.bound import (
+    BOOL_ELEMENT,
+    DOUBLE_ELEMENT,
+    IMAGEDATA_ELEMENT,
+    INT32_ELEMENT,
+    NDARRAY_ELEMENT,
+    NODE_ELEMENT,
+    UINT32_ELEMENT,
+    EventLoop,
+    Hash,
+    ImageData,
+)
+import numpy as np
+
+from .base_addon import BaseCorrectionAddon
+from .. import azim_utils
+from ..utils import maybe_get
+
+
+class AzimuthalIntegrator(BaseCorrectionAddon):
+    @staticmethod
+    def extend_device_schema(schema, prefix):
+        (
+            UINT32_ELEMENT(schema)
+            .key(f"{prefix}.numBins")
+            .tags("managed")
+            .assignmentOptional()
+            .defaultValue(1000)
+            .reconfigurable()
+            .commit(),
+
+            # TODO: automatic decision from integration with device class
+            # or enable using GPU even if rest of device didn't
+            BOOL_ELEMENT(schema)
+            .key(f"{prefix}.useGpu")
+            .tags("managed")
+            .assignmentOptional()
+            .defaultValue(True)
+            .reconfigurable()
+            .commit(),
+
+            BOOL_ELEMENT(schema)
+            .key(f"{prefix}.multiModule")
+            .tags("managed")
+            .assignmentOptional()
+            .defaultValue(True)
+            .reconfigurable()
+            .commit(),
+
+            INT32_ELEMENT(schema)
+            .key(f"{prefix}.moduleIndex")
+            .assignmentOptional()
+            .defaultValue(-1)
+            .reconfigurable()
+            .commit(),
+
+            UINT32_ELEMENT(schema)
+            .key(f"{prefix}.pixelsplittingFactor")
+            .assignmentOptional()
+            .defaultValue(5)
+            .reconfigurable()
+            .commit(),
+
+            DOUBLE_ELEMENT(schema)
+            .key(f"{prefix}.detectorDistance")
+            .assignmentOptional()
+            .defaultValue(0.5)
+            .reconfigurable()
+            .commit(),
+        )
+
+    @staticmethod
+    def extend_output_schema(schema):
+        (
+            NODE_ELEMENT(schema)
+            .key("azimuthal")
+            .commit(),
+
+            NDARRAY_ELEMENT(schema)
+            .key("azimuthal.integrated1d")
+            .commit(),
+
+            NDARRAY_ELEMENT(schema)
+            .key("azimuthal.integrated1dWeight")
+            .commit(),
+
+            NDARRAY_ELEMENT(schema)
+            .key("azimuthal.preview")
+            .commit(),
+
+            IMAGEDATA_ELEMENT(schema)
+            .key("azimuthal.streak")
+            .commit(),
+        )
+
+    def __init__(self, device, prefix, conf):
+        self._conf = Hash()
+        self.integrator = None
+        # full is the entire detector, local is this module (for multi-module detectors)
+        self.full_geom = None
+        self.local_geom = None
+        super().__init__(device, prefix, conf)
+        self._try_to_guess_module_index()
+
+    def reconfigure(self, conf):
+        self._conf.merge(conf)
+        if self.full_geom is None:
+            # cannot do anything below yet
+            return
+        if any(conf.has(key) for key in ("useGpu", "multiModule", "numBins")):
+            self._compute_bins()
+            EventLoop.post(self._initialize_integrator)
+        elif any(conf.has(key) for key in ("pixelsplittingFactor", "useGpu")):
+            EventLoop.post(self._initialize_integrator)
+
+    def on_new_geometry(self, geom):
+        self.full_geom = geom
+        if self._conf["multiModule"]:
+            # TODO: warn if module index not known
+            self.local_geom = azim_utils.slice_geom(
+                self.full_geom, self._conf["moduleIndex"]
+            )
+        else:
+            self.local_geom = self.full_geom
+        self._compute_bins()
+        EventLoop.post(self._initialize_integrator)
+
+    def _compute_bins(self):
+        if self.full_geom is None:
+            self._device.log_status_warn(
+                "Azimuthal integration addon cannot compute bins because it doesn't know "
+                "its geometry"
+            )
+            return
+        self.bins = np.linspace(
+            0,
+            azim_utils.get_geom_max_theta(self.full_geom, self._conf["detectorDistance"]),
+            self._conf["numBins"],
+            dtype=np.float32
+        )
+
+    def _initialize_integrator(self):
+        self._device.log_status_info("Azimuthal integration addon computing CSR...")
+        if self._conf["useGpu"]:
+            self.integrator = azim_utils.GpuAzimuthalIntegrator(
+                self.local_geom,
+                self.bins,
+                self._conf["detectorDistance"],
+                self._conf["pixelsplittingFactor"],
+            )
+        else:
+            self.integrator = azim_utils.CpuAzimuthalIntegrator(
+                self.local_geom,
+                self.bins,
+                self._conf["detectorDistance"],
+                self._conf["pixelsplittingFactor"],
+            )
+        self._device.log_status_info("Azimuthal integration addon CSR is ready")
+
+    def post_correction(self, train_id, processed_data, cell_table, pulse_table, output_hash):
+        if self.integrator is None:
+            self._device.log_status_warn("Azimuthal integration addon not ready yet")
+            return
+        processed_data = processed_data.reshape(processed_data.shape[0], -1)
+        if self._conf["multiModule"]:
+            data, weight = self.integrator.integrate_1d_multimodule(processed_data)
+            output_hash["azimuthal.integrated1d"] = maybe_get(data)
+            output_hash["azimuthal.integrated1dWeight"] = maybe_get(weight)
+            # TODO: don't assume multi-frame data
+            previewable = data / weight
+        else:
+            data = self.integrator.integrate_1d(processed_data)
+            output_hash["azimuthal.integrated1d"] = maybe_get(data)
+            previewable = data
+        output_hash["azimuthal.bins"] = self.bins
+        output_hash["azimuthal.preview"] = maybe_get(
+            self.integrator._xp.nanmean(previewable, axis=0)
+        )
+        output_hash["azimuthal.streak"] = ImageData(
+            maybe_get(
+                self.integrator._xp.nan_to_num(previewable)
+            )
+        )
+
+    def _try_to_guess_module_index(self):
+        for source in self._device.get("fastSources"):
+            # stolen from masking addon
+            for (source_pattern, extractor) in [
+                (
+                    r".*\/DET\/(\d+)CH0:xtdf",  # XTDF-detector source
+                    lambda match: int(match.group(1)),
+                ),
+                (
+                    r".*\/DET\/.*?(\d+):daqOutput",  # typical JF receiver source
+                    lambda match: int(match.group(1)) - 1,
+                    # receiver numbering starts at 1; convert to 0-based module index
+                )
+            ]:
+                if (match := re.match(source_pattern, source)) is not None:
+                    module_index = extractor(match)
+                    self._device.log_status_info(
+                        f"Source {source} means {module_index}"
+                    )
+                    self._device.set(f"{self._prefix}.moduleIndex", module_index)
+                    self._conf["moduleIndex"] = module_index
+                    return
+        self._device.log_status_info("User mask addon couldn't guess module index")
diff --git a/src/calng/correction_addons/base_addon.py b/src/calng/correction_addons/base_addon.py
index 0a063bde862cecf7f5755aba34e8b53a14019d53..1ab990902b07e6a2252a3466540dce986102a4d1 100644
--- a/src/calng/correction_addons/base_addon.py
+++ b/src/calng/correction_addons/base_addon.py
@@ -1,5 +1,5 @@
 class BaseCorrectionAddon:
-    _device = None  # will be set to host device *after* init
+    _device = None  # host device passes itself to __init__; default __init__ stores it
 
     @staticmethod
     def extend_device_schema(schema, prefix):
@@ -16,9 +16,10 @@ class BaseCorrectionAddon:
         pass
 
     def __init__(self, device, prefix, config):
-        """Will be given the node from extend_device_schema, no prefix needed here"""
+        """Will get node from extend_device_schema; node paths won't include prefix"""
         self._device = device
         self._prefix = prefix
+        self.reconfigure(config)
 
     def post_correction(
         self, train_id, processed_data, cell_table, pulse_table, output_hash
diff --git a/src/calng/correction_addons/integrate_roi.py b/src/calng/correction_addons/integrate_roi.py
new file mode 100644
index 0000000000000000000000000000000000000000..184435f944244d06c283a72cc725a9e5bccd024c
--- /dev/null
+++ b/src/calng/correction_addons/integrate_roi.py
@@ -0,0 +1,94 @@
+import numpy as np
+from karabo.bound import (
+    DOUBLE_ELEMENT,
+    STRING_ELEMENT,
+    VECTOR_INT32_ELEMENT,
+    Epochstamp,
+    Hash,
+    Timestamp,
+    Trainstamp,
+)
+
+from ..utils import maybe_get
+from .base_addon import BaseCorrectionAddon
+
+
+class RoiIntegration(BaseCorrectionAddon):
+    @staticmethod
+    def extend_device_schema(schema, prefix):
+        (
+            STRING_ELEMENT(schema)
+            .key(f'{prefix}.pulses')
+            .description(
+                'Slice of pulses to select in the train. Selected pulses '
+                'are summed before performing the integration')
+            .assignmentOptional()
+            .defaultValue('0:1')
+            .reconfigurable()
+            .commit(),
+
+            DOUBLE_ELEMENT(schema)
+            .key(f"{prefix}.valueMin")
+            .assignmentOptional()
+            .defaultValue(-10.0)
+            .reconfigurable()
+            .commit(),
+
+            DOUBLE_ELEMENT(schema)
+            .key(f"{prefix}.valueMax")
+            .assignmentOptional()
+            .defaultValue(20000.0)
+            .reconfigurable()
+            .commit(),
+
+            VECTOR_INT32_ELEMENT(schema)
+            .key(f"{prefix}.roi")
+            .assignmentOptional()
+            .defaultValue([0, 10, 0, 10])
+            .reconfigurable()
+            .commit(),
+
+            DOUBLE_ELEMENT(schema)
+            .key(f"{prefix}.meanIntensity")
+            .readOnly()
+            .initialValue(0.)
+            .commit(),
+
+            DOUBLE_ELEMENT(schema)
+            .key(f"{prefix}.meanIntensityROI")
+            .readOnly()
+            .initialValue(0.)
+            .commit(),
+        )
+
+    def reconfigure(self, conf):
+        if conf.has("roi"):
+            self.x_min, self.x_max, self.y_min, self.y_max = conf["roi"]
+        if conf.has("pulses"):
+            self.pulses = eval(f'np.s_[{conf["pulses"]}]')
+        if conf.has("valueMax"):
+            self._vmax = conf["valueMax"]
+        if conf.has("valueMin"):
+            self._vmin = conf["valueMin"]
+
+    def post_correction(self, train_id, data, cell_table, pulse_table, data_hash):
+        data = data[self.pulses]
+        mask = np.isfinite(data) & (self._vmin < data) & (data < self._vmax)
+        count = np.sum(mask)
+        count_roi = np.sum(mask[:, self.y_min:self.y_max, self.x_min:self.x_max])
+
+        masked_data = np.nansum(data * mask, axis=0)
+        mean_data = np.nansum(masked_data) / count
+
+        roi = masked_data[self.y_min:self.y_max, self.x_min:self.x_max]
+        mean_roi = np.nansum(roi) / count_roi
+
+        res = Hash(
+            f"{self._prefix}.meanIntensity", float(maybe_get(mean_data)),
+            f"{self._prefix}.meanIntensityROI", float(maybe_get(mean_roi)),
+        )
+
+        ts = Timestamp(Epochstamp(), Trainstamp(train_id))
+        ts.toHashAttributes(res.getAttributes(f"{self._prefix}.meanIntensity"))
+        ts.toHashAttributes(res.getAttributes(f"{self._prefix}.meanIntensityROI"))
+        self._device.set(res)
diff --git a/src/calng/correction_addons/integrated_intensity.py b/src/calng/correction_addons/integrated_intensity.py
index 188b6d4f26ac629a9a323d646aca54f120be2568..a016a6f5f8e610eb1862e28381c5efb212d173f9 100644
--- a/src/calng/correction_addons/integrated_intensity.py
+++ b/src/calng/correction_addons/integrated_intensity.py
@@ -1,15 +1,9 @@
 import numpy as np
 from karabo.bound import DOUBLE_ELEMENT, NDARRAY_ELEMENT, NODE_ELEMENT
+from ..utils import maybe_get
 from .base_addon import BaseCorrectionAddon
 
 
-def maybe_get(a):
-    # TODO: proper check for cupy
-    if hasattr(a, "get"):
-        return a.get()
-    return a
-
-
 class IntegratedIntensity(BaseCorrectionAddon):
     def __init__(self, device, prefix, config):
         super().__init__(device, prefix, config)
diff --git a/src/calng/correction_addons/litpixel_counter.py b/src/calng/correction_addons/litpixel_counter.py
index 8089e9c33133155fc574698ef0c14155fd7a66b5..90c4eaba7f5351dd7df7b1a2cc4ac2c8062163b6 100644
--- a/src/calng/correction_addons/litpixel_counter.py
+++ b/src/calng/correction_addons/litpixel_counter.py
@@ -22,16 +22,15 @@ def factors(n):
 
 class LitPixelCounter(BaseCorrectionAddon):
     def __init__(self, device, prefix, config):
-        super().__init__(device, prefix, config)
-        global cupy
-        import cupy
-
         self._ss_size = device.kernel_runner.num_pixels_ss
         self._ss_factors = factors(self._ss_size)
         self._fs_size = device.kernel_runner.num_pixels_fs
         self._fs_factors = factors(self._fs_size)
 
-        self.reconfigure(config)
+        # note: super().__init__ calls self.reconfigure(config)
+        super().__init__(device, prefix, config)
+        global cupy
+        import cupy
 
         # these properties may be changed to the factors
         # of module dimensions due to protection
diff --git a/src/calng/correction_addons/masking.py b/src/calng/correction_addons/masking.py
new file mode 100644
index 0000000000000000000000000000000000000000..192e56d33544d289fb543640da16c849b30d80db
--- /dev/null
+++ b/src/calng/correction_addons/masking.py
@@ -0,0 +1,220 @@
+import re
+
+import numpy as np
+import h5py
+from karabo.bound import (
+    BOOL_ELEMENT,
+    INT32_ELEMENT,
+    STRING_ELEMENT,
+    SLOT_ELEMENT,
+)
+from .base_addon import BaseCorrectionAddon
+
+
+class UserMask(BaseCorrectionAddon):
+    @staticmethod
+    def extend_device_schema(schema, prefix):
+        (
+            STRING_ELEMENT(schema)
+            .key(f"{prefix}.maskFilePath")
+            .displayedName("File path")
+            .description(
+                "Path to an HDF5 file containing mask data. If set, will automatically "
+                "try to load on init. Will also try to (re)load when path is changed."
+            )
+            .tags("managed")
+            .assignmentOptional()
+            .defaultValue("")
+            .reconfigurable()
+            .commit(),
+
+            STRING_ELEMENT(schema)
+            .key(f"{prefix}.dataPath")
+            .displayedName("Dataset name")
+            .description(
+                "Path to the dataset within the file holding the mask data. If left "
+                "empty, the first dataset found in the file is used. The mask is "
+                "reloaded when this is set or changed."
+            )
+            .tags("managed")
+            .assignmentOptional()
+            .defaultValue("")
+            .reconfigurable()
+            .commit(),
+
+            SLOT_ELEMENT(schema)
+            .key(f"{prefix}.loadMask")
+            .displayedName("Load mask")
+            .description(
+                "(Re)loads the mask from currently specified file + dataset name"
+            )
+            .tags("managed")
+            .commit(),
+
+            BOOL_ELEMENT(schema)
+            .key(f"{prefix}.multiModule")
+            .displayedName("File covers multiple modules?")
+            .description(
+                "For multi-module detectors, you probably want to generate a single "
+                "mask file rather than many. If this flag is on, the first axis of the "
+                "loaded dataset is assumed to correspond to module index. So shape will "
+                "be e.g. (16, 256, 256) for LPD1M if this is set and burstMode is not "
+                "set."
+            )
+            .tags("managed")
+            .assignmentOptional()
+            .defaultValue(True)
+            .reconfigurable()
+            .commit(),
+
+            BOOL_ELEMENT(schema)
+            .key(f"{prefix}.burstMode")
+            .displayedName("File has per-cell mask?")
+            .description(
+                "For burst mode detectors, it can make sense to mask based on pixel "
+                "*and* memory cell. If this flag is set, the second axis (first axis "
+                "in case multiModule is False) is assumed to correspond to memory "
+                "cells. Please make sure the cell axis is large enough to accomodate "
+                "cells. Please make sure the cell axis is large enough to accommodate "
+                "simple per-pixel mask (definitely the case when masking by hand)."
+            )
+            .tags("managed")
+            .assignmentOptional()
+            .defaultValue(False)
+            .reconfigurable()
+            .commit(),
+
+            INT32_ELEMENT(schema)
+            .key(f"{prefix}.moduleIndex")
+            .displayedName("Module index")
+            .description(
+                "For multi-module detectors with multiModule enabled, the index of the "
+                "module handled by this correction device must be known. Leave at -1 "
+                "to have it guessed automatically from the data source name."
+            )
+            .assignmentOptional()
+            .defaultValue(-1)
+            .commit(),
+        )
+
+    def __init__(self, device, prefix, conf):
+        self._module_index = None
+        self._mask = None
+        super().__init__(device, prefix, conf)
+        self._device.KARABO_SLOT(
+            self._load_mask,
+            slotName=f"{prefix}.loadMask".replace(".", "_"),
+            numArgs=0,
+        )
+
+    def _try_to_guess_module_index(self):
+        for source in self._device.get("fastSources"):
+            # stolen from DetectorAssembler
+            # TODO: something configurable, robust
+            for (source_pattern, extractor) in [
+                (
+                    r".*\/DET\/(\d+)CH0:xtdf",  # XTDF-detector source
+                    lambda match: int(match.group(1)),
+                ),
+                (
+                    r".*\/DET\/.*?(\d+):daqOutput",  # typical JF receiver source
+                    lambda match: int(match.group(1)) - 1,
+                    # receivers are typically numbered from 1, hence the - 1
+                    # (DetectorAssembler skips this because JUNGFRAUGeometry handles it)
+                )
+            ]:
+                if (match := re.match(source_pattern, source)) is not None:
+                    self._module_index = extractor(match)
+                    self._device.log_status_info(
+                        f"Source {source} means {self._module_index}"
+                    )
+                    return
+        self._device.log_status_info("User mask addon couldn't guess module index")
+
+    def reconfigure(self, conf):
+        if conf.has("moduleIndex"):
+            if conf["moduleIndex"] < 0:
+                self._try_to_guess_module_index()
+                if self._module_index is not None:
+                    conf["moduleIndex"] = self._module_index
+            else:
+                self._module_index = conf["moduleIndex"]
+        if conf.has("multiModule"):
+            if conf["multiModule"] and self._module_index is None:
+                warning = "Unable to guess module index, cannot use multi-module mask files"
+                # only throw if we're /not/ in device init
+                if hasattr(self, "_past_init"):
+                    # but interrupt bad preReconfigure with exception
+                    raise ValueError(warning)
+                else:
+                    self._device.log_status_warn(f"{self._prefix}: {warning}")
+            self._multi_module = conf["multiModule"]
+        if conf.has("burstMode"):
+            self._burst_mode = conf["burstMode"]
+        if conf.has("maskFilePath"):
+            self._mask_fn = conf["maskFilePath"]
+        if conf.has("dataPath"):
+            self._mask_data_path = conf["dataPath"]
+            self._user_mask_data_path = self._mask_data_path
+        if self._mask_fn and (conf.has("maskFilePath") or conf.has("dataPath")):
+            try:
+                self._load_mask()
+            except Exception as ex:
+                self._device.log_status_warn(
+                    f"{self._prefix} failed to load mask: {ex}"
+                )
+                # only re-raise (interrupting a bad preReconfigure) if we are past init
+                if hasattr(self, "_past_init"):
+                    raise
+        self._past_init = True
+
+    def _load_mask(self):
+        """Utility function to load mask from currently set path and such. Can throw
+        all kinds of exceptions, caller should decide whether to catch them."""
+        with h5py.File(self._mask_fn, "r") as fd:
+            if not self._user_mask_data_path:
+                self._guess_data_path(fd)
+            mask = np.array(fd[self._mask_data_path])
+        if self._multi_module:
+            assert (
+                self._module_index is not None
+            ), "Tried loading multi module, but I don't know my module index"
+            assert (
+                self._module_index < mask.shape[0]
+            ), "My module index is out of bounds for this mask file"
+            mask = mask[self._module_index]
+        if self._burst_mode:
+            assert mask.ndim == 3, f"Burst mode but mask has {mask.ndim} dimensions"
+        else:
+            assert mask.ndim == 2, f"Single-frame mode but mask has {mask.ndim} dimensions"
+        self._mask = (mask != 0)
+
+    def _guess_data_path(self, fd):
+        """If no data path was specified, guess it by taking the first Dataset found in
+        the file; raises ValueError if the file contains no Dataset at all."""
+        def visitor(key, val):
+            if isinstance(val, h5py.Dataset):
+                self._device.log_status_info(
+                    f"User mask addon assumes mask path is: {key}"
+                )
+                self._mask_data_path = key
+                self._device.set(f"{self._prefix}.dataPath", self._mask_data_path)
+                raise StopIteration()
+        try:
+            fd.visititems(visitor)
+        except StopIteration:
+            pass
+        else:
+            raise ValueError(f"No dataset found in {fd.filename}")
+
+    def post_correction(self, train_id, data, cell_table, pulse_table, output_hash):
+        if self._mask is None:
+            # no mask has been loaded (yet), so there is nothing to apply
+            return
+        if self._burst_mode:
+            try:
+                mask = self._mask[cell_table]
+            except IndexError as ex:
+                self._device.log_status_warn(
+                    f"Failed to get mask in cell order (mask too small?), {ex}"
+                )
+                return
+        else:
+            mask = self._mask
+        data[..., mask] = np.nan
diff --git a/src/calng/correction_addons/peakfinder9.py b/src/calng/correction_addons/peakfinder9.py
index 1647350cb0e7e7a9cb567d99d77159e966a76d52..4a9d199f4c7dd77cadb579d827160d40bee685be 100644
--- a/src/calng/correction_addons/peakfinder9.py
+++ b/src/calng/correction_addons/peakfinder9.py
@@ -170,11 +170,11 @@ class Peakfinder9(BaseCorrectionAddon):
                 pass
 
     def __init__(self, device, prefix, config):
+        self._config = config
         super().__init__(device, prefix, config)
         global cupy
         import cupy
 
-        self._config = config
 
         _src_dir = pathlib.Path(__file__).absolute().parent.parent
         with (_src_dir / "kernels" / "peakfinder9_gpu.cu").open("r") as fd:
diff --git a/src/calng/corrections/LpdCorrection.py b/src/calng/corrections/LpdCorrection.py
index 8e77e95593dcd594646b8735ea4029b6d4d5fc43..15b0106d34e63180ad64b6152d466b9e0940e159 100644
--- a/src/calng/corrections/LpdCorrection.py
+++ b/src/calng/corrections/LpdCorrection.py
@@ -3,6 +3,7 @@ import enum
 import numpy as np
 from karabo.bound import (
     BOOL_ELEMENT,
     DOUBLE_ELEMENT,
+    FLOAT_ELEMENT,
     KARABO_CLASSINFO,
     OUTPUT_CHANNEL,
@@ -30,6 +31,7 @@ class Constants(enum.Enum):
     FFMap = enum.auto()
     BadPixelsDark = enum.auto()
     BadPixelsFF = enum.auto()
+    Noise = enum.auto()
 
 
 bad_pixel_constants = {
@@ -45,6 +47,14 @@ class CorrectionFlags(enum.IntFlag):
     REL_GAIN = 4
     FF_CORR = 8
     BPMASK = 16
+    PARALLELGAIN = 32
+
+
+class ParallelGainOptions(enum.Enum):
+    HIGH_GAIN = 0
+    MEDIUM_GAIN = 1
+    LOW_GAIN = 2
+    THRESHOLD = enum.auto()
 
 
 correction_steps = (
@@ -53,6 +63,9 @@ correction_steps = (
     ("relGain", CorrectionFlags.REL_GAIN, {Constants.RelativeGain}),
     ("flatfield", CorrectionFlags.FF_CORR, {Constants.FFMap}),
     ("badPixels", CorrectionFlags.BPMASK, bad_pixel_constants),
+    # for threshold, will need Offset and Noise
+    # manual selection of one always works
+    ("parallelGain", CorrectionFlags.PARALLELGAIN, {None}),
 )
 
 
@@ -68,6 +81,46 @@ class LpdBaseRunner(base_kernel_runner.BaseKernelRunner):
         super(cls, cls).add_schema(schema)
         super(cls, cls).add_bad_pixel_config(schema)
 
+        (
+            OVERWRITE_ELEMENT(schema)
+            .key("corrections.parallelGain")
+            .setNewDescription(
+                "The processing of parallel gain data is set automatically based on "
+                "constantParameters.parallelGain. This node only configures how the "
+                "parallel gain frames are handled and the thresholding sigmas."
+            )
+            .commit(),
+
+            STRING_ELEMENT(schema)
+            .key("corrections.parallelGain.handling")
+            .tags("managed")
+            .options(",".join(o.name for o in ParallelGainOptions))
+            .assignmentOptional()
+            .defaultValue(ParallelGainOptions.THRESHOLD.name)
+            .reconfigurable()
+            .commit(),
+
+            FLOAT_ELEMENT(schema)
+            .key("corrections.parallelGain.sigmaHighMedium")
+            .tags("managed")
+            .description(
+                "Sigma multiplier for the high-to-medium gain switching threshold; "
+                "per pixel and memory cell, the threshold is 4096 - sigma * noise "
+                "(high gain)."
+            )
+            .assignmentOptional()
+            # TODO: good defaults
+            .defaultValue(100)
+            .reconfigurable()
+            .commit(),
+
+            FLOAT_ELEMENT(schema)
+            .key("corrections.parallelGain.sigmaMediumLow")
+            .tags("managed")
+            .description("See description of sigmaHighMedium")
+            .assignmentOptional()
+            # TODO: good defaults
+            .defaultValue(200)
+            .reconfigurable()
+            .commit(),
+        )
+
     def expected_input_shape(self, num_frames):
         return (num_frames, 1, self.num_pixels_ss, self.num_pixels_fs)
 
@@ -77,6 +130,7 @@ class LpdBaseRunner(base_kernel_runner.BaseKernelRunner):
 
     def _setup_constant_buffers(self):
         self.offset_map = self._xp.empty(self._gm_map_shape, dtype=np.float32)
+        self.noise_map = self._xp.empty(self._gm_map_shape, dtype=np.float32)
         self.gain_amp_map = self._xp.empty(self._gm_map_shape, dtype=np.float32)
         self.rel_gain_slopes_map = self._xp.empty(self._gm_map_shape, dtype=np.float32)
         self.flatfield_map = self._xp.empty(self._gm_map_shape, dtype=np.float32)
@@ -84,12 +138,19 @@ class LpdBaseRunner(base_kernel_runner.BaseKernelRunner):
         self.flush_buffers(set(Constants))
 
     def _make_output_buffers(self, num_frames, flags):
+        if flags & CorrectionFlags.PARALLELGAIN:
+            num_frames = num_frames // 3
         output_shape = (num_frames, self.num_pixels_ss, self.num_pixels_fs)
         return [
             self._xp.empty(output_shape, dtype=self._xp.float32),  # image
             self._xp.empty(output_shape, dtype=self._xp.float32),  # gain
         ]
 
+    def _expected_output_shape(self, num_frames):
+        if self._correction_flag_enabled & CorrectionFlags.PARALLELGAIN:
+            num_frames = num_frames // 3
+        return super()._expected_output_shape(num_frames)
+
     def _preview_data_views(self, raw_data, processed_data, gain_map):
         # TODO: always split off gain from raw to avoid messing up preview?
         return (
@@ -104,6 +165,7 @@ class LpdBaseRunner(base_kernel_runner.BaseKernelRunner):
             Constants.BadPixelsDark: ((2, 1, 0, 3), self.bad_pixel_map),
             Constants.BadPixelsFF: ((2, 0, 1, 3), self.bad_pixel_map),
             Constants.Offset: ((2, 1, 0, 3), self.offset_map),
+            Constants.Noise: ((2, 1, 0, 3), self.noise_map),
             Constants.GainAmpMap: ((2, 0, 1, 3), self.gain_amp_map),
             Constants.FFMap: ((2, 0, 1, 3), self.flatfield_map),
             Constants.RelativeGain: ((2, 0, 1, 3), self.rel_gain_slopes_map),
@@ -127,11 +189,33 @@ class LpdBaseRunner(base_kernel_runner.BaseKernelRunner):
                 )
             )
 
+    def reconfigure(self, config):
+        super().reconfigure(config)
+        if config.has("constantParameters.parallelGain"):
+            val = config["constantParameters.parallelGain"]
+            self._set("corrections.parallelGain.available", val)
+            self._update_correction_flags()
+        if config.has("corrections.parallelGain.sigmaHighMedium"):
+            self._gain_sigma_high_medium = self._xp.float32(
+                config["corrections.parallelGain.sigmaHighMedium"]
+            )
+        if config.has("corrections.parallelGain.sigmaMediumLow"):
+            self._gain_sigma_medium_low = self._xp.float32(
+                config["corrections.parallelGain.sigmaMediumLow"]
+            )
+        if config.has("corrections.parallelGain.handling"):
+            self._parallel_gain_handling = ParallelGainOptions[
+                config["corrections.parallelGain.handling"]
+            ]
+
     def flush_buffers(self, constants):
         if Constants.Offset in constants:
             self.offset_map.fill(0)
         if Constants.GainAmpMap in constants:
             self.gain_amp_map.fill(1)
+        if Constants.Noise in constants:
+            self.noise_map.fill(0)
         if Constants.RelativeGain in constants:
             self.rel_gain_slopes_map.fill(1)
         if Constants.FFMap in constants:
@@ -144,7 +228,7 @@ class LpdGpuRunner(LpdBaseRunner):
     runner_type = base_kernel_runner.KernelRunnerTypes.GPU
 
     def _post_init(self):
-        self.correction_kernel = self._xp.RawModule(
+        cuda_module = self._xp.RawModule(
             code=base_kernel_runner.get_kernel_template("lpd_gpu.cu").render(
                 {
                     "ss_dim": self.num_pixels_ss,
@@ -155,7 +239,9 @@ class LpdGpuRunner(LpdBaseRunner):
                     "corr_enum": utils.enum_to_c_template(self._correction_flag_class),
                 }
             )
-        ).get_function("correct")
+        )
+        self.correction_kernel = cuda_module.get_function("correct")
+        self.gain_threshold_kernel = cuda_module.get_function("parallel_gain_threshold")
 
     def _correct(self, flags, image_data, cell_table, processed_data, gain_map):
         num_frames = self._xp.uint16(image_data.shape[0])
@@ -163,11 +249,43 @@ class LpdGpuRunner(LpdBaseRunner):
         grid_shape = utils.grid_to_cover_shape_with_blocks(
             processed_data.shape, block_shape
         )
+
+        if flags & CorrectionFlags.PARALLELGAIN:
+            # note: processed_data has num_frames // 3 frames already
+            # plan: override image_data with thresholded / sliced data
+            num_frames = num_frames // 3
+
+            if self._parallel_gain_handling is ParallelGainOptions.THRESHOLD:
+                self.gain_threshold_kernel(
+                    grid_shape,
+                    block_shape,
+                    (
+                        num_frames,
+                        cell_table,
+                        self._constant_memory_cells,
+                        self._gain_sigma_high_medium,
+                        self._gain_sigma_medium_low,
+                        self.offset_map,
+                        self.noise_map,
+                        image_data,
+                    )
+                )
+                # the kernel writes its result into the first third of the "frames" buffer
+                image_data = image_data[:num_frames]
+
+            else:
+                gain_stage = self._parallel_gain_handling.value
+                image_data = image_data[
+                    gain_stage * num_frames:(gain_stage + 1) * num_frames
+                ]
+            # ordinary kernel will later extract gain values,
+            # so just leave them in "raw" data passed to it
+
+        # regular kernel does not need to know about parallel gain
         self.correction_kernel(
             grid_shape,
             block_shape,
             (
-                image_data,
                 cell_table,
                 flags,
                 num_frames,
@@ -178,6 +296,7 @@ class LpdGpuRunner(LpdBaseRunner):
                 self.flatfield_map,
                 self.bad_pixel_map,
                 self.bad_pixel_mask_value,
+                image_data,
                 gain_map,
                 processed_data,
             ),
@@ -213,12 +332,13 @@ class LpdCalcatFriend(base_calcat.BaseCalcatFriend):
     @property
     def _constants_need_conditions(self):
         return {
-            Constants.Offset: self.with_cell_condition,
-            Constants.BadPixelsDark: self.with_cell_condition,
-            Constants.GainAmpMap: self.category_condition,
-            Constants.FFMap: self.category_condition,
-            Constants.RelativeGain: self.category_condition,
-            Constants.BadPixelsFF: self.category_condition,
+            Constants.Offset: self.dark_condition,
+            Constants.Noise: self.dark_condition,
+            Constants.BadPixelsDark: self.dark_condition,
+            Constants.GainAmpMap: self.gain_condition,
+            Constants.FFMap: self.gain_condition,
+            Constants.RelativeGain: self.gain_condition,
+            Constants.BadPixelsFF: self.gain_condition,
         }
 
     @staticmethod
@@ -295,6 +415,15 @@ class LpdCalcatFriend(base_calcat.BaseCalcatFriend):
             .defaultValue("")
             .reconfigurable()
             .commit(),
+
+            BOOL_ELEMENT(schema)
+            .key("constantParameters.parallelGain")
+            .tags("managed")
+            .displayedName("Parallel gain mode")
+            .assignmentOptional()
+            .defaultValue(False)
+            .reconfigurable()
+            .commit(),
         )
 
         base_calcat.add_status_schema_from_enum(schema, Constants)
@@ -309,20 +438,18 @@ class LpdCalcatFriend(base_calcat.BaseCalcatFriend):
 
         return res
 
-    def with_cell_condition(self):
+    def dark_condition(self):
         res = self.basic_condition()
         if self._get_param("useMemoryCellOrder"):
             res["Memory cell order"] = self._get_param("memoryCellOrder")
+        if self._get_param("parallelGain"):
+            res["Parallel gain"] = 1
 
         return res
 
-    def illuminated_condition(self):
+    def gain_condition(self):
         res = self.basic_condition()
         res["Source Energy"] = self._get_param("photonEnergy")
-        return res
-
-    def category_condition(self):
-        res = self.illuminated_condition()
         res["category"] = self._get_param("category")
         return res
 
@@ -337,6 +464,10 @@ class LpdCorrection(base_correction.BaseCorrection):
 
     _preview_outputs = [PreviewSpec(name) for name in ("raw", "corrected", "gainMap")]
 
+    def __init__(self, config):
+        super().__init__(config)
+        self._parallel_gain = config["constantParameters.parallelGain"]
+
     @classmethod
     def expectedParameters(cls, expected):
         LpdCalcatFriend.add_schema(expected)
@@ -351,3 +482,48 @@ class LpdCorrection(base_correction.BaseCorrection):
             )
             .commit(),
         )
+
+    def _get_data_from_hash(self, data_hash):
+        """Will get image data, cell table, pulse table, and list of other arrays from
+        the data hash. Assumes XTDF (image.data, image.cellId, image.pulseId, and no
+        other data), non-XTDF subclass can override. Cell and pulse table can be
+        None."""
+
+        image_data = data_hash.get(self._image_data_path)
+        cell_table = data_hash[self._cell_table_path].ravel()
+        pulse_table = data_hash[self._pulse_table_path].ravel()
+        num_frames = cell_table.size
+
+        if self._parallel_gain:
+            # TODO: warn if num_frames not multiple of 3
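+            # the cell and pulse tables are made consistent across the three
+            # gain-stage repetitions by copying the first third over the other two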
+            actual_frames = num_frames // 3
+            cell_table[actual_frames : 2 * actual_frames] = cell_table[:actual_frames]
+            cell_table[2 * actual_frames:] = cell_table[:actual_frames]
+            pulse_table[actual_frames : 2 * actual_frames] = pulse_table[:actual_frames]
+            pulse_table[2 * actual_frames:] = pulse_table[:actual_frames]
+
+        # DataAggregator typically tells us the wrong axis order
+        if self.unsafe_get("workarounds.overrideInputAxisOrder"):
+            # note: regular correction kernel needn't know about parallel gain
+            expected_shape = self.kernel_runner.expected_input_shape(
+                num_frames
+            )
+            if expected_shape != image_data.shape:
+                image_data.shape = expected_shape
+        return (
+            num_frames,
+            image_data,
+            cell_table,
+            pulse_table,
+        )
+
+    def _maybe_update_cell_and_pulse_tables(self, cell_table, pulse_table):
+        if self._parallel_gain and cell_table is not None:
+            actual_frames = cell_table.size // 3
+            cell_table = cell_table[:actual_frames]
+            pulse_table = pulse_table[:actual_frames]
+        super()._maybe_update_cell_and_pulse_tables(cell_table, pulse_table)
+
+    def postReconfigure(self):
+        super().postReconfigure()
+        self._parallel_gain = self.get("constantParameters.parallelGain")
diff --git a/src/calng/kernels/azimuthal.cu b/src/calng/kernels/azimuthal.cu
new file mode 100644
index 0000000000000000000000000000000000000000..a5e84fb03dc026c531ca3d4c6a0aa5e32d983980
--- /dev/null
+++ b/src/calng/kernels/azimuthal.cu
@@ -0,0 +1,106 @@
+template<typename T>
+class NeumaierSum {
+    // https://en.wikipedia.org/wiki/Kahan_summation_algorithm
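+    // (Neumaier variant: the compensation term c keeps the low-order bits that would
+    // otherwise be lost when adding many small weighted contributions to a large sum)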
+private:
+    T sum = 0;
+    T c = 0;
+public:
+    __device__ void add(const T val) {
+        const T t = sum + val;
+        if (fabs(sum) >= fabs(val)) {
+            c += (sum - t) + val;
+        } else {
+            c += (val - t) + sum;
+        }
+        sum = t;
+    }
+
+    __device__ T get() {
+        return sum + c;
+    }
+};
+
+template<typename image_t, typename csr_t>
+__global__ void integrate_1d_csr(
+        const int csr_nrows, // = #rings
+        const int csr_ncols, // = #pixels
+        const csr_t* csr_data, // fixed dtype for now
+        const int* csr_indices,
+        const int* csr_indptr,
+        const int n_frames,
+        const image_t* frames, // #frames x #pixels
+        image_t* output  // #frames x #rings
+    ) {
+    // this thread handles one element in output matrix
+    const int output_frame = blockDim.x * blockIdx.x + threadIdx.x;
+    const int output_ring = blockDim.y * blockIdx.y + threadIdx.y;
+    if (output_frame >= n_frames || output_ring >= csr_nrows) {
+        return;
+    }
+
+    NeumaierSum<image_t> value;
+    NeumaierSum<image_t> weight;
+
+    const int frame_start = output_frame * csr_ncols;
+    for (int i=csr_indptr[output_ring]; i<csr_indptr[output_ring + 1]; ++i) {
+        // read data
+        const image_t pixel_value = frames[frame_start + csr_indices[i]];
+        const float pixel_weight = csr_data[i];
+
+        if (isnan(pixel_value)) {
+            continue;
+        }
+
+        value.add(pixel_weight * pixel_value);
+        weight.add(pixel_weight);
+    }
+    const float final_weight = weight.get();
+    output[output_frame * csr_nrows + output_ring] =
+        final_weight > 0 ?
+        value.get() / final_weight :
+        nan("0");
+}
+
+/* This version does *not* divide by the weight included in a given ring. Instead, this
+   value is output along with the un-normalized weighted pixel total s.t. a downstream
+   device combining data from multiple modules can normalize properly i.e. sum all the
+   mass in the ring and the weight and divide the former by the latter. */
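+/* Sketch of the assumed downstream combination (not implemented in this file):
+   per (frame, ring), combined mean = sum over modules of output_total
+                                    / sum over modules of output_weight,
+   skipping the NaN entries of modules that contributed no weight to that ring. */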
+template<typename image_t, typename csr_t>
+__global__ void integrate_1d_csr_multimodule(
+        const int csr_nrows, // = #rings (bins is this + 1)
+        const int csr_ncols, // = #pixels per frame
+        const float* csr_data, // fixed dtype for now
+        const int* csr_indices,
+        const int* csr_indptr,  // size csr_nrows + 1
+        const int n_frames,
+        const image_t* frames, // #frames x #pixels
+        image_t* output_total,  // #frames x #rings
+        image_t* output_weight  // #frames x #rings
+    ) {
+    // this thread handles one element in output matrix
+    const int output_frame = blockDim.x * blockIdx.x + threadIdx.x;
+    const int output_ring = blockDim.y * blockIdx.y + threadIdx.y;
+    if (output_frame >= n_frames || output_ring >= csr_nrows) {
+        return;
+    }
+
+    NeumaierSum<image_t> value;
+    NeumaierSum<image_t> weight;
+
+    const int frame_start = output_frame * csr_ncols;
+    for (int i=csr_indptr[output_ring]; i<csr_indptr[output_ring + 1]; ++i) {
+        // read data
+        const image_t pixel_value = frames[frame_start + csr_indices[i]];
+        const float pixel_weight = csr_data[i];
+
+        if (isnan(pixel_value)) {
+            continue;
+        }
+
+        value.add(pixel_weight * pixel_value);
+        weight.add(pixel_weight);
+    }
+    const float final_weight = weight.get();
+    output_total[output_frame * csr_nrows + output_ring] = final_weight > 0 ? value.get() : nan("0");
+    output_weight[output_frame * csr_nrows + output_ring] = final_weight > 0 ? final_weight : nan("0");
+}
diff --git a/src/calng/kernels/azimuthal.pyx b/src/calng/kernels/azimuthal.pyx
new file mode 100644
index 0000000000000000000000000000000000000000..70cf9e43cfe598554f1994c1582ca1b0046c7d66
--- /dev/null
+++ b/src/calng/kernels/azimuthal.pyx
@@ -0,0 +1,207 @@
+# distutils: language=c++
+# cython: boundscheck=False
+# cython: wraparound=False
+# cython: cdivision=True
+# cython: nonecheck=False
+cimport cython
+cimport numpy as np
+cimport openmp
+from cython.parallel cimport prange
+from libc.math cimport NAN, atan2, isfinite, sqrt
+from libcpp.algorithm cimport upper_bound
+from libcpp.iterator cimport distance
+from libcpp.vector cimport vector
+
+import numpy as np
+import scipy.sparse
+
+
+# TODO: get rid of this guy in favor of bare memoryviews
+cdef struct vec3_t:
+    double x
+    double y
+    double z
+
+
+cdef vec3_t scale_arr3(double[:] v, double s) noexcept nogil:
+    return vec3_t(s * v[0], s * v[1], s * v[2])
+
+
+cdef vec3_t lin_comb_vec3(vec3_t v, double s1, vec3_t v1, double s2, vec3_t v2) noexcept nogil:
+    return vec3_t(
+        v.x + s1 * v1.x + s2 * v2.x,
+        v.y + s1 * v1.y + s2 * v2.y,
+        v.z + s1 * v1.z + s2 * v2.z,
+    )
+
+
+cdef vec3_t lin_comb_arr_arr(double[:] v, double s1, double[:] v1, double s2, double[:] v2) noexcept nogil:
+    return vec3_t(
+        v[0] + s1 * v1[0] + s2 * v2[0],
+        v[1] + s1 * v1[1] + s2 * v2[1],
+        v[2] + s1 * v1[2] + s2 * v2[2],
+    )
+
+
+cdef double point_theta(vec3_t v) noexcept nogil:
+    return atan2(
+        sqrt(v.x * v.x + v.y * v.y),
+        v.z,
+    )
+
+
+ctypedef fused bin_t:
+    float
+    double
+
+
+cdef struct BinAndWeight:
+    Py_ssize_t bin_index
+    float weight
+
+
+cdef vector[BinAndWeight] bin_pixel_subsample(
+    bin_t[:] bins,
+    vec3_t pixel_corner,
+    vec3_t sub_step_ss,
+    vec3_t sub_step_fs,
+    Py_ssize_t n_split,
+) noexcept nogil:
+    cdef Py_ssize_t sub_index_ss, sub_index_fs  # indices for subsample steps
+    cdef vec3_t sample_point  # subpixel point we're placing in a bin
+    cdef float sample_point_theta
+    cdef Py_ssize_t bin_index_found
+    # taking half a step into pixel so we don't bias towards "lower left" corner
+    cdef vec3_t start_point = lin_comb_vec3(pixel_corner, 0.5, sub_step_ss, 0.5, sub_step_fs)
+    cdef vector[Py_ssize_t] bin_counts  # "custom bin counter"
+    cdef vector[BinAndWeight] res
+    # given n bins, output "bin indices" will be from 0 to n-2
+    # a sample with theta contributes to output "bin index" i if b[i] <= theta < b[i+1]
+    # hence the upper_bound and - 1 later
+    # sample points outside range are ignored
+    bin_counts.resize(bins.shape[0] - 1)  # full of zeros initially
+    # and more loops in order to subsample...
+    for sub_index_ss in range(n_split):
+        for sub_index_fs in range(n_split):
+            sample_point = lin_comb_vec3(
+                start_point,
+                <double>sub_index_ss, sub_step_ss,
+                <double>sub_index_fs, sub_step_fs
+            )
+            sample_point_theta = point_theta(sample_point)
+            if sample_point_theta < bins[0] or sample_point_theta >= bins[bins.shape[0]-1]:
+                continue
+            bin_index_found = distance(
+                &bins[0], upper_bound(&bins[0], &bins[0] + bins.shape[0], sample_point_theta)
+            )
+            # note: with the previous check, the distance to upper_bound is in
+            # [1, len(bins) - 1], so the resulting bin index is in [0, len(bins) - 2]
+            bin_counts[bin_index_found - 1] += 1
+    cdef Py_ssize_t elem = 0
+    for count in bin_counts:
+        if count > 0:
+            res.push_back(BinAndWeight(elem, <float>count / <float>(n_split * n_split)))
+        elem += 1
+    return res
+
+
+def generate_coo(
+    bin_t[:] bins,
+    double[:, ::1] corners,
+    double[:, ::1] ss_vecs,
+    double[:, ::1] fs_vecs,
+    Py_ssize_t pixels_per_tile_ss,
+    Py_ssize_t pixels_per_tile_fs,
+    Py_ssize_t n_split,
+):
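+    """Build COO triplets (data, row, col) mapping pixels to theta bins.
+
+    `bins` holds the bin edges in theta; `corners`, `ss_vecs`, and `fs_vecs` give, per
+    tile, the corner position and the slow-/fast-scan pixel step vectors. Each pixel
+    is subsampled n_split x n_split times and its weight is split across the bins hit.
+    """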
+    cdef Py_ssize_t num_pixels = corners.shape[0] * pixels_per_tile_ss * pixels_per_tile_fs
+    cdef vector[Py_ssize_t] row_vec, col_vec
+    cdef vector[float] data_vec
+    cdef openmp.omp_lock_t lock
+    openmp.omp_init_lock(&lock)
+    # could maybe come up with better estimate, but having to allocate more later is not
+    # the end of the world
+    row_vec.reserve(num_pixels * 2)
+    col_vec.reserve(num_pixels * 2)
+    data_vec.reserve(num_pixels * 2)
+
+    cdef Py_ssize_t tile_index, pixel_ss, pixel_fs, pixel_index
+    cdef vec3_t sub_step_ss, sub_step_fs # (ss_vec and fs_vec) / n_split
+    # these two just for iterating over bin_pixel_subsample results
+    cdef BinAndWeight bin_and_weight
+    cdef vector[BinAndWeight] subsample_res
+
+    for tile_index in prange(corners.shape[0], nogil=True):
+        sub_step_ss = scale_arr3(ss_vecs[tile_index], 1.0 / <float>n_split)
+        sub_step_fs = scale_arr3(fs_vecs[tile_index], 1.0 / <float>n_split)
+        for pixel_ss in prange(pixels_per_tile_ss):
+            for pixel_fs in range(pixels_per_tile_fs):
+                pixel_index = tile_index * (pixels_per_tile_ss * pixels_per_tile_fs) + pixel_ss * pixels_per_tile_fs + pixel_fs
+                subsample_res = bin_pixel_subsample(
+                    bins,
+                    lin_comb_arr_arr(
+                        corners[tile_index],
+                        <float>pixel_ss, ss_vecs[tile_index],
+                        <float>pixel_fs, fs_vecs[tile_index],
+                    ), # corner of this pixel
+                    sub_step_ss,
+                    sub_step_fs,
+                    n_split,
+                )
+                openmp.omp_set_lock(&lock)
+                for bin_and_weight in subsample_res:
+                    row_vec.push_back(bin_and_weight.bin_index)
+                    col_vec.push_back(pixel_index)
+                    data_vec.push_back(bin_and_weight.weight)
+                openmp.omp_unset_lock(&lock)
+    # TODO: contemplate ownership; is this safe and copy-free?
+    return np.asarray(data_vec), np.asarray(row_vec), np.asarray(col_vec)
+
+
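+# Sketch of how a caller might turn the triplets into CSR input for the integrators
+# below (assumed usage, not part of this file; `num_pixels` is the per-module pixel
+# count and dtypes may need casting to int32 / float32 to match the signatures):
+#   data, rows, cols = generate_coo(bins, corners, ss_vecs, fs_vecs, ss, fs, n_split)
+#   csr = scipy.sparse.coo_matrix(
+#       (data, (rows, cols)), shape=(len(bins) - 1, num_pixels)
+#   ).tocsr()
+#   integrate_1d_csr(csr.indices, csr.indptr, csr.data, frames, output)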
+def integrate_1d_csr(
+    int[:] csr_indices,
+    int[:] csr_indptr,
+    float[:] csr_data,
+    float[:, ::1] frames,
+    float[:, ::1] output,
+):
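+    """Weighted mean per (frame, ring) using the CSR-encoded pixel-to-ring mapping;
+    rings with no finite, weighted pixel get NaN. Results are written into `output`,
+    which is expected to have shape (#frames, #rings)."""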
+    cdef Py_ssize_t frame_index, ring_index, data_index
+    cdef float pixel_acc, weight_acc, weight_val, pixel_val
+
+    for frame_index in range(frames.shape[0]):
+        for ring_index in range(csr_indptr.shape[0] - 1):
+            weight_acc = 0
+            pixel_acc = 0
+            for data_index in range(csr_indptr[ring_index], csr_indptr[ring_index+1]):
+                pixel_val = frames[frame_index, csr_indices[data_index]]
+                if not isfinite(pixel_val):
+                    continue
+                weight_val = csr_data[data_index]
+                weight_acc = weight_acc + weight_val
+                pixel_acc = pixel_acc + weight_val * pixel_val
+            output[frame_index, ring_index] = pixel_acc / weight_acc if weight_acc > 0 else NAN
+
+
+def integrate_1d_csr_multimodule(
+    int[:] csr_indices,
+    int[:] csr_indptr,
+    float[:] csr_data,
+    float[:, ::1] frames,
+    float[:, ::1] output_total,
+    float[:, ::1] output_weight,
+):
+    cdef Py_ssize_t frame_index, ring_index, data_index
+    cdef float pixel_acc, weight_acc, weight_val, pixel_val
+
+    for frame_index in range(frames.shape[0]):
+        for ring_index in range(csr_indptr.shape[0] - 1):
+            weight_acc = 0
+            pixel_acc = 0
+            for data_index in range(csr_indptr[ring_index], csr_indptr[ring_index+1]):
+                pixel_val = frames[frame_index, csr_indices[data_index]]
+                if not isfinite(pixel_val):
+                    continue
+                weight_val = csr_data[data_index]
+                weight_acc = weight_acc + weight_val
+                pixel_acc = pixel_acc + weight_val * pixel_val
+            output_total[frame_index, ring_index] = pixel_acc if weight_acc > 0 else NAN
+            output_weight[frame_index, ring_index] = weight_acc if weight_acc > 0 else NAN
diff --git a/src/calng/kernels/common_cpu.pyx b/src/calng/kernels/common_cpu.pyx
index 893015f2471d402fc730527eaf64a55a58ad7dc2..ec0640d2bc37fef70377e56923eb919332ef59f6 100644
--- a/src/calng/kernels/common_cpu.pyx
+++ b/src/calng/kernels/common_cpu.pyx
@@ -1,15 +1,13 @@
+# distutils: language=c++
 # cython: boundscheck=False
 # cython: cdivision=True
 # cython: wrapararound=False
 
 from cython.parallel import prange
+from libcpp.algorithm cimport nth_element
 import numpy as np
 
 
-cdef extern from "<algorithm>" namespace "std" nogil:
-    void nth_element[Iter](Iter first, Iter nth, Iter last) except +
-
-
 def cm_fs(
     float[:, :] data,
     unsigned char[:, :] mask,
diff --git a/src/calng/kernels/lpd_gpu.cu b/src/calng/kernels/lpd_gpu.cu
index 7e935a38963af4c27038e7f40ee75ca9e92dcfa8..482c68bcca7422506b030df276bd2e7cc5d45001 100644
--- a/src/calng/kernels/lpd_gpu.cu
+++ b/src/calng/kernels/lpd_gpu.cu
@@ -3,8 +3,7 @@
 {{corr_enum}}
 
 extern "C" {
-	__global__ void correct(const unsigned short* data, // shape: memory cell, 1, ss, fs
-	                        const unsigned short* cell_table,
+	__global__ void correct(const unsigned short* cell_table,
 	                        const unsigned char corr_flags,
 	                        const unsigned short input_frames,
 	                        const unsigned short map_memory_cells,
@@ -14,6 +13,8 @@ extern "C" {
 	                        const float* flatfield_map,
 	                        const unsigned int* bad_pixel_map,
 	                        const float bad_pixel_mask_value,
+	                        // note: will overwrite data with original value stripped of gain bits
+	                        unsigned short* data, // shape: memory cell, 1, ss, fs
 	                        float* gain_map, // similar to preview for AGIPD
 	                        {{output_data_dtype}}* output) {
 		const size_t fs_dim = {{fs_dim}};
@@ -33,7 +34,8 @@ extern "C" {
 		const size_t data_index = frame * data_stride_frame + ss * data_stride_ss + fs * data_stride_fs;
 		const unsigned short raw_data_value = data[data_index];
 		const unsigned char gain = (raw_data_value >> 12) & 0x0003;
-		float corrected = (float)(raw_data_value & 0x0fff);
+		const unsigned short raw_data_no_gain = raw_data_value & 0x0fff;
+		float corrected = (float)(raw_data_no_gain);
 		float gain_for_preview = (float)gain;
 
 		const size_t gm_map_stride_gain = 1;
@@ -78,10 +80,76 @@ extern "C" {
 		}
 
 		gain_map[data_index] = gain_for_preview;
+		data[data_index] = raw_data_no_gain;
 		{% if output_data_dtype == "half" %}
 		output[data_index] = __float2half(corrected);
 		{% else %}
 		output[data_index] = ({{output_data_dtype}})corrected;
 		{% endif %}
 	}
+
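+	// Input `data` holds 3 * output_frames frames ordered high, then medium, then
+	// low gain; for each pixel the selected raw value is written back into the
+	// corresponding slot in the first third of `data`.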
+	__global__ void parallel_gain_threshold(const unsigned short output_frames,  // one third of input frames
+	                                        const unsigned short* cell_table,
+	                                        const unsigned short map_memory_cells,
+	                                        const float threshold_sigma_high_med,
+	                                        const float threshold_sigma_med_low,
+											const float* offset_map, // shape: cell, ss, fs, gain
+											const float* noise_map, // shape: cell, ss, fs, gain
+	                                        // data is input *and* output
+	                                        // shape: memory cell, 1, ss, fs
+	                                        unsigned short* data) {
+		const size_t fs_dim = {{fs_dim}};
+		const size_t ss_dim = {{ss_dim}};
+
+		const size_t frame = blockIdx.x * blockDim.x + threadIdx.x;
+		const size_t ss = blockIdx.y * blockDim.y + threadIdx.y;
+		const size_t fs = blockIdx.z * blockDim.z + threadIdx.z;
+
+		if (frame >= output_frames || ss >= ss_dim || fs >= fs_dim) {
+			return;
+		}
+
+		const size_t map_cell = cell_table[frame];
+		if (map_cell >= map_memory_cells) {
+			return;
+		}
+
+		const size_t data_stride_fs = 1;
+		const size_t data_stride_ss = fs_dim * data_stride_fs;
+		const size_t data_stride_frame = ss_dim * data_stride_ss;
+		const size_t data_stride_gain = output_frames * data_stride_frame;
+
+		const size_t data_index_hg = frame * data_stride_frame + ss * data_stride_ss + fs * data_stride_fs;
+		const size_t data_index_mg = data_index_hg + data_stride_gain;
+		const size_t data_index_lg = data_index_mg + data_stride_gain;
+
+		const size_t gm_map_stride_gain = 1;
+		const size_t gm_map_stride_fs = 3 * gm_map_stride_gain;
+		const size_t gm_map_stride_ss = fs_dim * gm_map_stride_fs;
+		const size_t gm_map_stride_cell = ss_dim * gm_map_stride_ss;
+
+		const size_t gm_map_index_hg = map_cell * gm_map_stride_cell + ss * gm_map_stride_ss + fs * gm_map_stride_fs;
+		const size_t gm_map_index_mg = gm_map_index_hg + gm_map_stride_gain;
+		//const size_t gm_map_index_lg = gm_map_index_mg + gm_map_stride_gain; not used
+
+		unsigned short data_value = data[data_index_hg];
+		// stripping gain bit(s) for comparisons
+		float data_value_float = static_cast<float>(data_value & 0x0fff);
+
+		// compared to notebook: this is before offset is subtracted
+		const float threshold_high_med = 4096 - (threshold_sigma_high_med * noise_map[gm_map_index_hg]);
+		// compared to notebook: without gain bit, we are 4096 lower
+		const float threshold_med_low = 4096 - (threshold_sigma_med_low * noise_map[gm_map_index_mg]);
+
+		// TODO: handle NaNs
+		if (data_value_float > threshold_high_med) {
+			data_value = data[data_index_mg];
+			data_value_float = static_cast<float>(data_value & 0x0fff);
+
+			if (data_value_float > threshold_med_low) {
+				data_value = data[data_index_lg];
+			}
+		}
+		data[data_index_hg] = data_value;
+	}
 }
diff --git a/src/calng/preview_utils.py b/src/calng/preview_utils.py
index 4b82d15e14d7886920d825e166b1afb3cf2431da..cec21defeceddebee15e60ada201f441be3d3c8b 100644
--- a/src/calng/preview_utils.py
+++ b/src/calng/preview_utils.py
@@ -187,6 +187,7 @@ class PreviewFriend:
                 .defaultValue("frame")
                 .reconfigurable()
                 .commit(),
+
                 INT32_ELEMENT(schema)
                 .key(f"{node_path}.index")
                 .tags("managed")
@@ -247,6 +248,7 @@ class PreviewFriend:
             if spec.frame_reduction_fun is not None:
                 data = spec.frame_reduction_fun(data, cell_table, pulse_table, warn_fun)
             if spec.downsampling_factor > 1:
+                data = maybe_get(data)
                 if spec.dimensions == 1:
                     data = downsample_1d(
                         data,
diff --git a/src/calng/scenes.py b/src/calng/scenes.py
index 30b294204d502a0b5f36e8cc715bf0ce3ecfaeaa..6449c75b7315380fa676222957ffcf936170ea07 100644
--- a/src/calng/scenes.py
+++ b/src/calng/scenes.py
@@ -738,7 +738,6 @@ class PreviewDisplayArea(VerticalLayout):
         attrs = attrs_to_dict(
             schema_hash.getAttributes(f"{channel_name}.schema.{image_key_name}")
         )
-        print(attrs)
         if attrs.get("classId") == "ImageData":
             model = DetectorGraphModel
             extra = dict(colormap="viridis")
@@ -996,18 +995,23 @@ def manager_device_overview(
     mds_hash = schema_to_hash(manager_device_schema)
     cds_hash = schema_to_hash(correction_device_schema)
 
-    config_column = [
-        recursive_editable(
-            manager_device_id,
-            mds_hash,
-            "managedKeys.constantParameters",
-        ),
-        DisplayCommandModel(
-            keys=[f"{manager_device_id}.managedKeys.loadMostRecentConstants"],
-            width=10 * BASE_INC,
-            height=BASE_INC,
-        ),
-    ]
+    config_column = []
+
+    if "managedKeys.constantParameters" in mds_hash:
+        config_column.extend(
+            [
+                recursive_editable(
+                    manager_device_id,
+                    mds_hash,
+                    "managedKeys.constantParameters",
+                ),
+                DisplayCommandModel(
+                    keys=[f"{manager_device_id}.managedKeys.loadMostRecentConstants"],
+                    width=10 * BASE_INC,
+                    height=BASE_INC,
+                ),
+            ]
+        )
 
     if "managedKeys.preview" in mds_hash:
         config_column.append(
@@ -1048,17 +1052,23 @@ def manager_device_overview(
         titled("Data throttling")(boxed(VerticalLayout))(*throttling),
     )
 
-    return VerticalLayout(
-        HorizontalLayout(
-            ManagerDeviceStatus(manager_device_id),
-            VerticalLayout(*config_column),
+    top_row = [
+        ManagerDeviceStatus(manager_device_id),
+        VerticalLayout(*config_column),
+    ]
+
+    if "managedKeys.corrections" in mds_hash:
+        top_row.append(
             recursive_editable(
                 manager_device_id,
                 mds_hash,
                 "managedKeys.corrections",
                 max_depth=2,
             ),
-        ),
+        )
+
+    return VerticalLayout(
+        HorizontalLayout(*top_row),
         HorizontalLayout(
             titled("DAQs", width=8 * NARROW_INC)(boxed(VerticalLayout))(
                 children=[Space(width=BASE_INC, height=BASE_INC)] * 2
@@ -1219,6 +1229,31 @@ def detector_assembler_overview(device_id, schema, geometry_device_id):
     )
 
 
+@scene_generator
+def azimuthal_combiner_overview(device_id, schema):
+    schema_hash = schema_to_hash(schema)
+    return VerticalLayout(
+        AssemblerDeviceStatus(device_id),
+        VectorXYGraphModel(
+            keys=[
+                f"{device_id}.preview.hist.output.schema.x",
+                f"{device_id}.preview.hist.output.schema.y",
+            ],
+            height=16 * BASE_INC,
+            width=52 * BASE_INC,
+        ),
+        DetectorGraphModel(
+            keys=[f"{device_id}.preview.streak.output.schema.image"],
+            width=60 * BASE_INC,
+            height=40 * BASE_INC,
+            x_label="theta",
+            x_units="bin index",
+            y_label="frame",
+            aspect_ratio=0,
+        ),
+    )
+
+
 @scene_generator
 def gotthard2_assembler_overview(device_id, schema):
     schema_hash = schema_to_hash(schema)
diff --git a/src/calng/utils.py b/src/calng/utils.py
index 64da3d3dc1325bb1c531f2a9984b9ba6c73e5860..5dbb8c77cf3bf9f8888614dab02628e392847071 100644
--- a/src/calng/utils.py
+++ b/src/calng/utils.py
@@ -128,7 +128,7 @@ _np_typechar_to_c_typestring = {
 
 
 def np_dtype_to_c_type(dtype):
-    as_char = np.sctype2char(dtype)
+    as_char = np.dtype(dtype).char
     return _np_typechar_to_c_typestring[as_char]
 
 
@@ -292,6 +292,7 @@ class OptionsEnum(enum.Enum):
     be a magic number used in CalCat, the name refers to "the actual" detector operation
     mode, and format_helpful / parse_helpful_format are tricks employed such that both
     are exposed to the operator. Life is full of compromise."""
+
     def format_helpful(self):
         return f"{self.value}: {self.name}"
 
@@ -305,3 +306,25 @@ class OptionsEnum(enum.Enum):
     @classmethod
     def list_options(cls):
         return [member.format_helpful() for member in cls]
+
+
+def dict_to_hash(d):
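+    """Recursively convert a dict (possibly containing nested dicts and lists of
+    dicts) into a karabo Hash."""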
+    from karabo.bound import Hash
+
+    res = Hash()
+    for k, v in d.items():
+        if isinstance(v, dict):
+            v = dict_to_hash(v)
+        elif isinstance(v, list):
+            v = list(map(dict_to_hash, v))
+        res[k] = v
+    return res
+
+
+def kwhash(**kwargs):
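+    """Build a karabo Hash directly from keyword arguments."""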
+    from karabo.bound import Hash
+
+    res = Hash()
+    for k, v in kwargs.items():
+        res[k] = v
+    return res
diff --git a/tests/common_setup.py b/tests/common_setup.py
index c3d20b6d9bbc450462095441e63e2fbf6ceefa3d..fbb8858501e8aef4dd8934ec7d3e7f3f9c040e5c 100644
--- a/tests/common_setup.py
+++ b/tests/common_setup.py
@@ -1,6 +1,5 @@
 import contextlib
 import pathlib
-import queue
 import threading
 
 import h5py
@@ -101,13 +100,6 @@ def get_constant_from_file(path_to_file, file_name, data_set_name):
         return np.array(fd[f"{data_set_name}/data"])
 
 
-def kwhash(**kwargs):
-    res = Hash()
-    for k, v in kwargs.items():
-        res[k] = v
-    return res
-
-
 class OverrideBase:
     overrides = tuple()
 
@@ -131,14 +123,3 @@ class OverridePythonDevice(OverrideBase):
     def writeChannel(self, output_name, out_hash, timestamp=None):
         self._last_written_channel = output_name
         self._last_written_data = out_hash
-
-
-def dict_to_hash(d):
-    res = Hash()
-    for k, v in d.items():
-        if isinstance(v, dict):
-            v = dict_to_hash(v)
-        elif isinstance(v, list):
-            v = list(map(dict_to_hash, v))
-        res[k] = v
-    return res
diff --git a/tests/test_azimuthal.py b/tests/test_azimuthal.py
new file mode 100644
index 0000000000000000000000000000000000000000..11f25226934192b1c9e81fd77534dd8147befc2f
--- /dev/null
+++ b/tests/test_azimuthal.py
@@ -0,0 +1,94 @@
+import extra_geom
+import itertools
+import numpy as np
+
+from calng import azim_utils
+from calng.utils import maybe_get
+
+
+det_dist = 0.23
+num_bins = 1000
+num_frames = 352
+geom = extra_geom.AGIPD_1MGeometry.example()
+bins = np.linspace(0, azim_utils.get_geom_max_theta(geom, det_dist), num_bins)
+bin_midpoints = (bins[1:] + bins[:-1]) / 2
+
+
+def angle_to_intensity(angle, dtype):
+    return (
+        np.exp(-((angle.min() - angle * 10) ** 2))
+        + np.exp(-np.abs(np.quantile(angle, 0.6) - angle) ** 1.5 * 1000) * 10
+    ).astype(dtype, copy=False)
+
+
+def generate_frame(geom, dtype):
+    ppos = geom.get_pixel_positions().reshape(-1, 3).astype(dtype, copy=False)
+    ppos[..., 2] += det_dist
+    angle = np.arctan2(
+        np.linalg.norm(ppos[..., :2], axis=-1),
+        ppos[..., 2],
+    )
+    return angle_to_intensity(angle, dtype)
+
+
+for ai_class, scalar_dtype in itertools.product(
+    (azim_utils.CpuAzimuthalIntegrator,),  # azim_utils.GpuAzimuthalIntegrator),
+    (np.float32,),  # np.float64),
+):
+    print(f"Test round: {ai_class.__name__} with {scalar_dtype} data")
+    frame = generate_frame(geom, scalar_dtype)
+
+    ai = ai_class(geom, bins, det_dist, 1)
+
+    frame = ai._xp.asarray(frame)
+
+    # input with no frame axis -> output with no frame axis
+    output = ai.integrate_1d(frame)
+
+    assert output.shape == (bins.size - 1,)
+    assert output.dtype == frame.dtype
+
+    output = ai.integrate_1d(frame.reshape(geom.expected_data_shape))
+
+    assert output.shape == (bins.size - 1,)
+    assert output.dtype == frame.dtype
+
+    # input with frame axis -> output with frame axis
+    output = ai.integrate_1d(frame.reshape(1, frame.size))
+
+    assert output.shape == (
+        1,
+        bins.size - 1,
+    )
+    assert output.dtype == frame.dtype
+
+    output = ai.integrate_1d(frame.reshape(1, *geom.expected_data_shape))
+
+    assert output.shape == (
+        1,
+        bins.size - 1,
+    )
+    assert output.dtype == frame.dtype
+
+    del frame
+
+    frames = ai._xp.random.random_sample(
+        (num_frames,) + geom.expected_data_shape
+    ).astype(scalar_dtype, copy=False)
+
+    # input with frame axis -> output with frame axis
+    output = ai.integrate_1d(frames.reshape(num_frames, -1))
+
+    assert output.shape == (
+        num_frames,
+        bins.size - 1,
+    )
+    assert output.dtype == frames.dtype
+
+    output = ai.integrate_1d(frames.reshape(num_frames, *geom.expected_data_shape))
+
+    assert output.shape == (
+        num_frames,
+        bins.size - 1,
+    )
+    assert output.dtype == frames.dtype
diff --git a/tests/test_frameselection.py b/tests/test_frameselection.py
index 2399de2ad54bd5b65df32a1d79184dc84b4eb01b..dcb6922a18644e88e15e6c96f482f3882443aeb0 100644
--- a/tests/test_frameselection.py
+++ b/tests/test_frameselection.py
@@ -1,7 +1,8 @@
 import pytest
 
 import numpy as np
-from common_setup import OverridePythonDevice, dict_to_hash, kwhash
+from calng.utils import dict_to_hash, kwhash
+from common_setup import OverridePythonDevice
 from karabo.bound import Configurator, PythonDevice
 
 from calng.FrameSelectionArbiter import (
diff --git a/tests/test_lpd_kernels.py b/tests/test_lpd_kernels.py
index cf51fb209a62d32bd7929c59af30464f7e2a8d2c..952fb4b09bdde47f3bff54175ce23f7913ebca6f 100644
--- a/tests/test_lpd_kernels.py
+++ b/tests/test_lpd_kernels.py
@@ -1,3 +1,4 @@
+import cupy as cp
 import numpy as np
 import pytest
 from calng.corrections.LpdCorrection import (
@@ -7,8 +8,21 @@ from calng.corrections.LpdCorrection import (
     LpdCpuRunner,
     LpdGpuRunner,
 )
+from karabo.bound import Hash
 
-from common_setup import DummyCorrectionDevice, maybe_get
+from common_setup import DummyCorrectionDevice
+
+
+def assert_allclose(a, b):
+    if isinstance(a, cp.ndarray):
+        if isinstance(b, cp.ndarray):
+            return cp.testing.assert_allclose(a, b)
+        else:
+            a = a.get()
+            return np.testing.assert_allclose(a, b)
+    if isinstance(b, cp.ndarray):
+        b = b.get()
+    return np.testing.assert_allclose(a, b)
 
 
 @pytest.fixture(params=[LpdCpuRunner, LpdGpuRunner])
@@ -29,70 +43,161 @@ def gpu_runner():
     return LpdGpuRunner(device)
 
 
-# generate some "raw data"
-raw_data_shape = (100, 1, 256, 256)
-# the image data part
-raw_image = (np.arange(np.product(raw_data_shape)).reshape(raw_data_shape)).astype(
-    np.uint16
-)
-# keeping in mind 12-bit ADC
-raw_image = np.bitwise_and(raw_image, 0x0FFF)
-raw_image_32 = raw_image.astype(np.float32)
-# the gain values (TODO: test with some being illegal)
-gain_data = (np.random.random(raw_data_shape) * 3).astype(np.uint16)
-assert np.array_equal(gain_data, np.bitwise_and(gain_data, 0x0003))
-gain_data_32 = gain_data.astype(np.float32)
-raw_data = np.bitwise_or(raw_image, np.left_shift(gain_data, 12))
-# just checking that we constructed something reasonable
-assert np.array_equal(gain_data, np.bitwise_and(np.right_shift(raw_data, 12), 0x0003))
-assert np.array_equal(raw_image, np.bitwise_and(raw_data, 0x0FFF))
-
-# generate some defects
-wrong_gain_value_indices = (np.random.random(raw_data_shape) < 0.005).astype(np.bool_)
-raw_data_with_some_wrong_gain = raw_data.copy()
-raw_data_with_some_wrong_gain[wrong_gain_value_indices] = np.bitwise_or(
-    raw_data_with_some_wrong_gain[wrong_gain_value_indices], 0x3000
-)
-
-# generate some constants
-dark_constant_shape = (256, 256, 512, 3)
-# okay, as slow and fast scan have same size, can't tell the difference here...
-funky_constant_shape = (256, 256, 512, 3)
-offset_constant = np.random.random(dark_constant_shape).astype(np.float32)
-gain_amp_constant = np.random.random(funky_constant_shape).astype(np.float32)
-bp_dark_constant = (np.random.random(dark_constant_shape) < 0.01).astype(np.uint32)
-
-cell_table = np.linspace(0, 512, 100).astype(np.uint16)
-np.random.shuffle(cell_table)
-
-
-def test_only_cast(kernel_runner):
-    _, processed, previews = kernel_runner.correct(raw_data, cell_table)
-    assert np.allclose(maybe_get(processed), raw_image_32[:, 0])
-    # TODO: make raw preview show image data (remove gain bits)
-    assert np.allclose(previews[0], raw_data[:, 0])  # raw: raw
-    assert np.allclose(previews[1], raw_image_32[:, 0])  # processed: raw
-    assert np.allclose(previews[2], gain_data_32[:, 0])  # gain: unchanged
-
-
-def test_only_cast_with_some_wrong_gain(kernel_runner):
-    (
-        _,
-        processed,
-        _,
-    ) = kernel_runner.correct(raw_data_with_some_wrong_gain, cell_table)
-    assert np.all(np.isnan(processed[wrong_gain_value_indices[:, 0]]))
-
-
-def test_correct(cpu_runner, gpu_runner):
-    # naive numpy version way too slow, just compare the two runners
-    # the CPU one uses kernel almost identical to pycalibration
-    cpu_runner.load_constant(Constants.Offset, offset_constant)
-    cpu_runner.load_constant(Constants.GainAmpMap, gain_amp_constant)
-    cpu_runner.load_constant(Constants.BadPixelsDark, bp_dark_constant)
-    gpu_runner.load_constant(Constants.Offset, offset_constant)
-    gpu_runner.load_constant(Constants.GainAmpMap, gain_amp_constant)
-    gpu_runner.load_constant(Constants.BadPixelsDark, bp_dark_constant)
-    _, processed_cpu, _ = cpu_runner.correct(raw_data_with_some_wrong_gain, cell_table)
-    _, processed_gpu, _ = gpu_runner.correct(raw_data_with_some_wrong_gain, cell_table)
-    assert np.allclose(processed_cpu, processed_gpu.get(), equal_nan=True)
+class TestParallelGain:
+    @classmethod
+    def setup_class(cls):
+        nframes = 12
+        cells = cp.arange(nframes * 3, dtype=np.uint16)
+        cells[nframes:] = 1234  # only first third / repetition is reliable
+        cls.cells = cells
+        shape = (nframes, 256, 256)
+        base_data = (
+            (cp.arange(np.prod(shape)) * 1000).astype(cp.uint16).reshape(shape)
+        )
+        cls.raw_image = cp.concatenate(
+            [
+                base_data,  # HG
+                (base_data + cp.random.random(base_data.shape) * 20).astype(
+                    cp.int16
+                ),  # MG
+                (base_data + cp.random.random(base_data.shape) * 20).astype(
+                    cp.int16
+                ),  # LG
+            ]
+        ).astype(np.uint16)
+        cls.raw_image %= 2**12
+        gain_data = cp.zeros_like(cls.raw_image)
+        gain_data[nframes : nframes * 2] = 1
+        gain_data[nframes * 2 :] = 2
+        cls.raw_data = cls.raw_image.copy()
+        # gain bits live in bits 12-13 of LPD raw data (image data is 12-bit)
+        cls.raw_data |= gain_data << 12
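+        # resulting layout: frames [0, nframes) hold the high gain data,
+        # followed by the same frames (plus small random offsets) as medium
+        # and then low gain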
+
+    def test_threshold(self, gpu_runner):
+        t1 = 1000
+        t2 = 2001
+        gpu_runner.reconfigure(
+            Hash(
+                "constantParameters.parallelGain",
+                True,
+                "corrections.parallelGain.thresholdHighMedium",
+                t1,
+                "corrections.parallelGain.thresholdMediumLow",
+                t2,
+            )
+        )
+        corrections, processed, (_, preview_corrected, preview_gain) = (
+            gpu_runner.correct(self.raw_data, self.cells)
+        )
+        nframes = self.cells.size // 3
+
+        # now we do thresholding ourselves
+        # start by assuming high gain
+        tmp = self.raw_image.astype(np.float32)
+        gain_map = cp.zeros((nframes,) + self.raw_image.shape[1:], dtype=np.uint8)
+        # then bump to medium gain if above threshold
+        gain_map[tmp[:nframes] > np.float32(t1)] = 1
+        # now some pixels are marked high gain and some medium gain;
+        # finally, bump pixels from medium to low gain if their medium gain
+        # value is above t2
+        gain_map[(gain_map == 1) & (tmp[nframes : nframes * 2] > np.float32(t2))] = 2
+
+        # masks used for the assertions below
+        high_gain_mask = gain_map == 0
+        med_gain_mask = gain_map == 1
+        low_gain_mask = gain_map == 2
+
+        # sanity check: some pixels should go in each gain stage
+        assert high_gain_mask.sum() > 0
+        assert med_gain_mask.sum() > 0
+        assert low_gain_mask.sum() > 0
+
+        assert_allclose(preview_gain[high_gain_mask], 0)
+        assert_allclose(preview_gain[med_gain_mask], 1)
+        assert_allclose(preview_gain[low_gain_mask], 2)
+
+        assert_allclose(processed[high_gain_mask], tmp[:nframes][high_gain_mask])
+        assert_allclose(
+            processed[med_gain_mask], tmp[nframes : nframes * 2][med_gain_mask]
+        )
+        assert_allclose(processed[low_gain_mask], tmp[nframes * 2 :][low_gain_mask])
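+
+        # hedged extra check (assumption: thresholding collapses the three
+        # gain repetitions into a single output): the corrected data should
+        # only contain one repetition worth of frames
+        assert processed.shape[0] == nframes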
+
+
+class TestAdaptiveGain:
+    @classmethod
+    def setup_class(cls):
+        # generate some "raw data"
+        raw_data_shape = (100, 1, 256, 256)
+        # the image data part
+        raw_image = (
+            np.arange(np.prod(raw_data_shape)).reshape(raw_data_shape)
+        ).astype(np.uint16)
+        # keeping in mind 12-bit ADC
+        raw_image = np.bitwise_and(raw_image, 0x0FFF)
+        cls.raw_image_32 = raw_image.astype(np.float32)
+        # the gain values (TODO: test with some being illegal)
+        gain_data = (np.random.random(raw_data_shape) * 3).astype(np.uint16)
+        assert np.array_equal(gain_data, np.bitwise_and(gain_data, 0x0003))
+        cls.gain_data_32 = gain_data.astype(np.float32)
+        cls.raw_data = np.bitwise_or(raw_image, np.left_shift(gain_data, 12))
+        # just checking that we constructed something reasonable
+        assert np.array_equal(
+            gain_data, np.bitwise_and(np.right_shift(cls.raw_data, 12), 0x0003)
+        )
+        assert np.array_equal(raw_image, np.bitwise_and(cls.raw_data, 0x0FFF))
+
+        # generate some defects
+        cls.wrong_gain_value_indices = (
+            np.random.random(raw_data_shape) < 0.005
+        ).astype(np.bool_)
+        raw_data_with_some_wrong_gain = cls.raw_data.copy()
+        raw_data_with_some_wrong_gain[cls.wrong_gain_value_indices] = np.bitwise_or(
+            raw_data_with_some_wrong_gain[cls.wrong_gain_value_indices], 0x3000
+        )
+        cls.raw_data_with_some_wrong_gain = raw_data_with_some_wrong_gain
+
+        # generate some constants
+        dark_constant_shape = (256, 256, 512, 3)
+        # okay, as slow and fast scan have same size, can't tell the difference here...
+        funky_constant_shape = (256, 256, 512, 3)
+        cls.offset_constant = np.random.random(dark_constant_shape).astype(np.float32)
+        cls.gain_amp_constant = np.random.random(funky_constant_shape).astype(
+            np.float32
+        )
+        cls.bp_dark_constant = (np.random.random(dark_constant_shape) < 0.01).astype(
+            np.uint32
+        )
+
+        cell_table = np.linspace(0, 512, 100).astype(np.uint16)
+        np.random.shuffle(cell_table)
+        cls.cell_table = cell_table
+
+    def test_only_cast(self, kernel_runner):
+        _, processed, previews = kernel_runner.correct(self.raw_data, self.cell_table)
+        assert_allclose(processed, self.raw_image_32[:, 0])
+        # TODO: make raw preview show image data (remove gain bits)
+        assert_allclose(previews[0], self.raw_data[:, 0])  # raw: raw
+        assert_allclose(previews[1], self.raw_image_32[:, 0])  # processed: raw
+        assert_allclose(previews[2], self.gain_data_32[:, 0])  # gain: unchanged
+
+    def test_only_cast_with_some_wrong_gain(self, kernel_runner):
+        (
+            _,
+            processed,
+            _,
+        ) = kernel_runner.correct(self.raw_data_with_some_wrong_gain, self.cell_table)
+        assert np.all(np.isnan(processed[self.wrong_gain_value_indices[:, 0]]))
+
+    def test_correct(self, cpu_runner, gpu_runner):
+        # naive numpy version way too slow, just compare the two runners
+        # the CPU one uses kernel almost identical to pycalibration
+        cpu_runner.load_constant(Constants.Offset, self.offset_constant)
+        cpu_runner.load_constant(Constants.GainAmpMap, self.gain_amp_constant)
+        cpu_runner.load_constant(Constants.BadPixelsDark, self.bp_dark_constant)
+        gpu_runner.load_constant(Constants.Offset, self.offset_constant)
+        gpu_runner.load_constant(Constants.GainAmpMap, self.gain_amp_constant)
+        gpu_runner.load_constant(Constants.BadPixelsDark, self.bp_dark_constant)
+        _, processed_cpu, _ = cpu_runner.correct(
+            self.raw_data_with_some_wrong_gain, self.cell_table
+        )
+        _, processed_gpu, _ = gpu_runner.correct(
+            self.raw_data_with_some_wrong_gain, self.cell_table
+        )
+        assert np.allclose(processed_cpu, processed_gpu.get(), equal_nan=True)