Skip to content
Snippets Groups Projects
Commit 98965fa2 authored by Egor Sobolev's avatar Egor Sobolev
Browse files

Move ImageAgg into separate module (file)

parent cffed485
Branches master
Tags 0.0.4
1 merge request!3Center refinement using powder diffraction data
import multiprocessing as mp
import time
import h5py
import numpy as np
from . import shmemarray as shmem
class ImageAgg:
def __init__(self, nproc, detector_id, shape, modules,
adu_per_unit=1, rounding_threshold=None, px_area=None):
self.adu_per_unit = adu_per_unit
self.modules = modules
self.num_modules = len(modules)
self.detector_id = detector_id
if rounding_threshold is None:
self.rounding = False
self.rounding_shift = 0.0
else:
self.rounding = True
self.rounding_shift = rounding_threshold - 0.5
self.shape = shape
self.nproc = nproc
arr_shape = (nproc,) + tuple(shape)
if px_area is None:
px_area = np.ones(shape, np.float32)
self.px_area = px_area
self.part_mean = shmem.empty(arr_shape, float)
self.part_deviation = shmem.empty(arr_shape, float)
self.part_count = shmem.empty(arr_shape, int)
self.part_num_frames = shmem.empty(nproc, int)
def reset(self, module_ix, det_source, mask=None):
self.det_source = det_source
self.modi = module_ix
self.r = None
self.count = None
self.mean = None
self.deviation = None
self.num_frames = None
def _compute_worker_part(self, args):
i, dc = args
mean = self.part_mean[i]
mean[:] = 0
deviation = self.part_deviation[i]
deviation[:] = 0
count = self.part_count[i]
count[:] = 0
self.part_num_frames[i] = np.sum(
dc[self.det_source].data_counts(index_group="image",
labelled=False)
)
inp_mask = self.mask[self.modi - self.mod0][None, ...]
for tid, data in dc.trains():
a = data[self.det_source]["image.data"] / self.adu_per_unit
msk = (data[self.det_source]["image.mask"] == 0) & inp_mask
a[~msk] = 0.0
if self.rounding:
msk &= a > -self.rounding_shift - 0.5
np.round(a - self.rounding_shift, out=a)
np.clip(a, 0.0, None, out=a)
mean += np.sum(a, 0)
deviation += np.sum(a * a, 0)
count += np.sum(msk.astype(int), 0)
return i
def compute(self, dc_img, source_pattern):
pool = mp.Pool(self.nproc)
for modi, modno in self.iter_modules():
tm0 = time.perf_counter()
print(f"[{self._rank:2d}] {modno:3d}: ", end="")
source_id = source_pattern.format(
detector_id=self.detector_id, modno=modno)
dc = dc_img.select(
[(source_id, "image.data"), (source_id, "image.mask")],
require_all=True
)
self.reset(modi, source_id)
result_iter = pool.imap_unordered(
self._compute_worker_part,
enumerate(dc.split_trains(self.nproc))
)
for r in result_iter:
pass
self.finish()
self.write_module()
tm1 = time.perf_counter()
print(f"{tm1 - tm0:.1f} s")
pool.terminate()
pool.join()
def finish(self):
self.r = type("Moments", (), {})
self.count = np.sum(self.part_count, 0)
nz = self.count > 0
self.mean = np.zeros(self.shape, np.float32)
self.mean = np.divide(
np.sum(self.part_mean, 0), self.count,
out=self.mean, where=nz
)
self.mean /= self.px_area
self.deviation = np.zeros(self.shape, np.float32)
self.deviation = np.divide(
np.sum(self.part_deviation, 0), self.count,
out=self.deviation, where=nz
)
self.deviation = np.sqrt(self.deviation - self.mean * self.mean)
self.deviation /= self.px_area
self.num_frames = np.sum(self.part_num_frames)
def _create_h5datasets(self, h5f):
s = (self.num_modules,) + self.shape
self._ds_mean = h5f.create_dataset(
"powderSum/image/mean", s, dtype="f4")
self._ds_std = h5f.create_dataset(
"powderSum/image/std", s, dtype="f4")
self._ds_count = h5f.create_dataset(
"powderSum/image/count", s, dtype="i8")
self._ds_nfrm = h5f.create_dataset(
"powderSum/image/numFrames", self.num_modules, dtype="i8")
self._ds_mask = h5f.create_dataset(
"powderSum/image/mask", s, dtype="u1")
def _create_output_arrays(self):
n = self.modN - self.mod0
s = (n,) + self.shape
self._ds_mean = np.zeros(s, dtype=np.float32)
self._ds_std = np.zeros(s, dtype=np.float32)
self._ds_count = np.zeros(s, dtype=int)
self._ds_nfrm = np.zeros(n, dtype=int)
self._ds_mask = np.zeros(s, dtype=np.uint8)
def prepare(self, output_fn, mask=None, comm=None, conditions={}):
if comm is None:
size = 1
rank = 0
else:
size = comm.Get_size()
rank = comm.Get_rank()
self.mod0 = rank * self.num_modules // size
self.modN = (rank + 1) * self.num_modules // size
self._rank = rank
self._size = size
self._comm = comm
n = self.modN - self.mod0
if mask is None:
s = (n,) + self.shape
self.mask = np.ones(s, bool)
else:
self.mask = mask[self.mod0:self.modN] == 0
if rank == 0:
h5f = h5py.File(output_fn, "w")
# modules
h5f["powderSum/image/modules"] = self.modules
# conditions
conditions_grp = h5f.create_group("powderSum/conditions")
conditions_grp["detectorId"] = self.detector_id.encode("ascii")
for key, value in conditions.items():
conditions_grp[key] = value
else:
h5f = None
self._h5f = h5f
# average image
if size > 1:
self._create_output_arrays()
else:
self._create_h5datasets(h5f)
def iter_modules(self):
return enumerate(self.modules[self.mod0:self.modN], start=self.mod0)
def write_module(self):
i = self.modi - self.mod0
self._ds_mean[i] = self.mean
self._ds_std[i] = self.deviation
self._ds_count[i] = self.count
self._ds_nfrm[i] = self.num_frames
self._ds_mask[i] = ~self.mask[i] | (self.count == 0)
def _send(self):
self._comm.Gatherv(self._ds_mean, None, root=0)
self._comm.Gatherv(self._ds_std, None, root=0)
self._comm.Gatherv(self._ds_count, None, root=0)
self._comm.Gatherv(self._ds_nfrm, None, root=0)
self._comm.Gatherv(self._ds_mask, None, root=0)
def _recv_and_write(self):
s = (self.num_modules,) + self.shape
buf = np.zeros(s, dtype=np.float32)
self._comm.Gatherv(self._ds_mean, buf, root=0)
self._h5f["powderSum/image/mean"] = buf
self._comm.Gatherv(self._ds_std, buf, root=0)
self._h5f["powderSum/image/std"] = buf
buf = np.zeros(s, dtype=int)
self._comm.Gatherv(self._ds_count, buf, root=0)
self._h5f["powderSum/image/count"] = buf
buf = np.zeros(self.num_modules, dtype=int)
self._comm.Gatherv(self._ds_nfrm, buf, root=0)
self._h5f["powderSum/image/numFrames"] = buf
buf = np.zeros(s, dtype=np.uint8)
self._comm.Gatherv(self._ds_mask, buf, root=0)
self._h5f["powderSum/image/mask"] = buf
def flush(self):
if self._size <= 1:
return
if self._rank == 0:
self._recv_and_write()
self._h5f.close()
else:
self._send()
def __getstate__(self):
state = self.__dict__.copy()
state["_ds_mean"] = None
state["_ds_std"] = None
state["_ds_count"] = None
state["_ds_nfrm"] = None
state["_ds_mask"] = None
state["_h5f"] = None
state["_comm"] = None
return state
import multiprocessing as mp
import os
import re
import time
from argparse import ArgumentParser
import h5py
import numpy as np
import numpy as np # noqa: F401
import psutil
from extra_data import RunDirectory, open_run
from extra_data.read_machinery import find_proposal
from . import shmemarray as shmem
from .image_agg import ImageAgg
from .misc import agipd_pixel_area, jungfrau_pixel_area
try:
......@@ -45,244 +44,6 @@ PX_AREA = {
}
class ImageAgg:
def __init__(self, nproc, detector_id, shape, modules,
adu_per_unit=1, rounding_threshold=None, px_area=None):
self.adu_per_unit = adu_per_unit
self.modules = modules
self.num_modules = len(modules)
self.detector_id = detector_id
if rounding_threshold is None:
self.rounding = False
self.rounding_shift = 0.0
else:
self.rounding = True
self.rounding_shift = rounding_threshold - 0.5
self.shape = shape
self.nproc = nproc
arr_shape = (nproc,) + tuple(shape)
if px_area is None:
px_area = np.ones(shape, np.float32)
self.px_area = px_area
self.part_mean = shmem.empty(arr_shape, float)
self.part_deviation = shmem.empty(arr_shape, float)
self.part_count = shmem.empty(arr_shape, int)
self.part_num_frames = shmem.empty(nproc, int)
def reset(self, module_ix, det_source, mask=None):
self.det_source = det_source
self.modi = module_ix
self.r = None
self.count = None
self.mean = None
self.deviation = None
self.num_frames = None
def _compute_worker_part(self, args):
i, dc = args
mean = self.part_mean[i]
mean[:] = 0
deviation = self.part_deviation[i]
deviation[:] = 0
count = self.part_count[i]
count[:] = 0
self.part_num_frames[i] = np.sum(
dc[self.det_source].data_counts(index_group="image",
labelled=False)
)
inp_mask = self.mask[self.modi - self.mod0][None, ...]
for tid, data in dc.trains():
a = data[self.det_source]["image.data"] / self.adu_per_unit
msk = (data[self.det_source]["image.mask"] == 0) & inp_mask
a[~msk] = 0.0
if self.rounding:
msk &= a > -self.rounding_shift - 0.5
np.round(a - self.rounding_shift, out=a)
np.clip(a, 0.0, None, out=a)
mean += np.sum(a, 0)
deviation += np.sum(a * a, 0)
count += np.sum(msk.astype(int), 0)
return i
def compute(self, dc_img, source_pattern):
pool = mp.Pool(self.nproc)
for modi, modno in self.iter_modules():
tm0 = time.perf_counter()
print(f"[{self._rank:2d}] {modno:3d}: ", end="")
source_id = source_pattern.format(
detector_id=self.detector_id, modno=modno)
dc = dc_img.select(
[(source_id, "image.data"), (source_id, "image.mask")],
require_all=True
)
self.reset(modi, source_id)
result_iter = pool.imap_unordered(
self._compute_worker_part,
enumerate(dc.split_trains(self.nproc))
)
for r in result_iter:
pass
self.finish()
self.write_module()
tm1 = time.perf_counter()
print(f"{tm1 - tm0:.1f} s")
pool.terminate()
pool.join()
def finish(self):
self.r = type("Moments", (), {})
self.count = np.sum(self.part_count, 0)
nz = self.count > 0
self.mean = np.zeros(self.shape, np.float32)
self.mean = np.divide(
np.sum(self.part_mean, 0), self.count,
out=self.mean, where=nz
)
self.mean /= self.px_area
self.deviation = np.zeros(self.shape, np.float32)
self.deviation = np.divide(
np.sum(self.part_deviation, 0), self.count,
out=self.deviation, where=nz
)
self.deviation = np.sqrt(self.deviation - self.mean * self.mean)
self.deviation /= self.px_area
self.num_frames = np.sum(self.part_num_frames)
def _create_h5datasets(self, h5f):
s = (self.num_modules,) + self.shape
self._ds_mean = h5f.create_dataset(
"powderSum/image/mean", s, dtype="f4")
self._ds_std = h5f.create_dataset(
"powderSum/image/std", s, dtype="f4")
self._ds_count = h5f.create_dataset(
"powderSum/image/count", s, dtype="i8")
self._ds_nfrm = h5f.create_dataset(
"powderSum/image/numFrames", self.num_modules, dtype="i8")
self._ds_mask = h5f.create_dataset(
"powderSum/image/mask", s, dtype="u1")
def _create_output_arrays(self):
n = self.modN - self.mod0
s = (n,) + self.shape
self._ds_mean = np.zeros(s, dtype=np.float32)
self._ds_std = np.zeros(s, dtype=np.float32)
self._ds_count = np.zeros(s, dtype=int)
self._ds_nfrm = np.zeros(n, dtype=int)
self._ds_mask = np.zeros(s, dtype=np.uint8)
def prepare(self, output_fn, mask=None, comm=None, conditions={}):
if comm is None:
size = 1
rank = 0
else:
size = comm.Get_size()
rank = comm.Get_rank()
self.mod0 = rank * self.num_modules // size
self.modN = (rank + 1) * self.num_modules // size
self._rank = rank
self._size = size
self._comm = comm
n = self.modN - self.mod0
if mask is None:
s = (n,) + self.shape
self.mask = np.ones(s, bool)
else:
self.mask = mask[self.mod0:self.modN] == 0
if rank == 0:
h5f = h5py.File(output_fn, "w")
# modules
h5f["powderSum/image/modules"] = self.modules
# conditions
conditions_grp = h5f.create_group("powderSum/conditions")
conditions_grp["detectorId"] = self.detector_id.encode("ascii")
for key, value in conditions.items():
conditions_grp[key] = value
else:
h5f = None
self._h5f = h5f
# average image
if size > 1:
self._create_output_arrays()
else:
self._create_h5datasets(h5f)
def iter_modules(self):
return enumerate(self.modules[self.mod0:self.modN], start=self.mod0)
def write_module(self):
i = self.modi - self.mod0
self._ds_mean[i] = self.mean
self._ds_std[i] = self.deviation
self._ds_count[i] = self.count
self._ds_nfrm[i] = self.num_frames
self._ds_mask[i] = ~self.mask[i] | (self.count == 0)
def _send(self):
self._comm.Gatherv(self._ds_mean, None, root=0)
self._comm.Gatherv(self._ds_std, None, root=0)
self._comm.Gatherv(self._ds_count, None, root=0)
self._comm.Gatherv(self._ds_nfrm, None, root=0)
self._comm.Gatherv(self._ds_mask, None, root=0)
def _recv_and_write(self):
s = (self.num_modules,) + self.shape
buf = np.zeros(s, dtype=np.float32)
self._comm.Gatherv(self._ds_mean, buf, root=0)
self._h5f["powderSum/image/mean"] = buf
self._comm.Gatherv(self._ds_std, buf, root=0)
self._h5f["powderSum/image/std"] = buf
buf = np.zeros(s, dtype=int)
self._comm.Gatherv(self._ds_count, buf, root=0)
self._h5f["powderSum/image/count"] = buf
buf = np.zeros(self.num_modules, dtype=int)
self._comm.Gatherv(self._ds_nfrm, buf, root=0)
self._h5f["powderSum/image/numFrames"] = buf
buf = np.zeros(s, dtype=np.uint8)
self._comm.Gatherv(self._ds_mask, buf, root=0)
self._h5f["powderSum/image/mask"] = buf
def flush(self):
if self._size <= 1:
return
if self._rank == 0:
self._recv_and_write()
self._h5f.close()
else:
self._send()
def __getstate__(self):
state = self.__dict__.copy()
state["_ds_mean"] = None
state["_ds_std"] = None
state["_ds_count"] = None
state["_ds_nfrm"] = None
state["_ds_mask"] = None
state["_h5f"] = None
state["_comm"] = None
return state
def main(argv=None):
tm0 = time.perf_counter()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment