Skip to content
Snippets Groups Projects
Commit c716ada2 authored by midonc's avatar midonc
Browse files

added fastXPCS from online cluster

parent fa018d56
No related branches found
No related tags found
No related merge requests found
from .algos import WelfordAlg
def read_stats(path, pulseIds, user_mask=None):
import numpy as np
import xarray as xr
from h5py import File
with File(path) as stats:
mean_pixelcell = []
for mod in range(16):
summed= np.zeros((len(pulseIds),512,128))
counts = 0
for file in stats[f'mod{mod:02d}'].keys():
_cnt = np.array(stats[f'mod{mod:02d}/'+file+'/count'])
summed += np.array(stats[f'mod{mod:02d}/'+file+'/mean/'])*_cnt
counts += _cnt
mean_pixelcell.append(summed/counts)
mean_pixelcell = xr.DataArray(
data = np.stack(mean_pixelcell),
dims = ['module','pulseId','dim_0','dim_1'],
coords = dict(
module = np.arange(16),
pulseId = pulseIds,
dim_0 = np.arange(512),
dim_1 = np.arange(128))).transpose('pulseId','module','dim_0','dim_1')
if user_mask is not None:
mean_pixelcell = mean_pixelcell.where(~user_mask[None, :, :, :])
return mean_pixelcell
import time
from dataclasses import dataclass
import numpy as np
from .welford import welford
from .dense import ttcDense, ttcDense_int
from .fxpcs_sparse import sparsify, twoTime, twoTimeSymmNorm
class WelfordAlg:
def __init__(self, size, dtype=np.float32):
self.dtype = dtype
self.count = np.zeros(size, dtype=dtype)
self.avg = np.zeros(size, dtype=dtype)
self.var = np.zeros(size, dtype=dtype)
def update(self, x, mask=None, alpha=None):
x = x.astype(self.dtype)
cond = ~mask & (self.count == 0)
self.avg = np.where(~mask, self.avg, x)
self.var = np.where(~mask, self.var, 0.0)
alph_arr = np.where(self.count, 1/self.count, 0.0)
if alpha is not None:
alph_arr[:] = np.where(~mask, alpha, 0.0)
welford(x, self.avg, self.var, alph_arr)
self.count = np.where(~mask, self.count + 1, self.count)
### MATH ON TRAINS
def do_stats(iterator, *args):
tids = list()
tid, img, cmsk = iterator.__next__()
tids.append(tid)
wf = WelfordAlg(img.shape)
_msk = (cmsk > 0).astype(np.bool_)
wf.update(img, _msk)
for tid, img, cmsk in iterator:
tids.append(tid)
_msk = (cmsk > 0).astype(np.bool_)
wf.update(img, _msk)
#print(tid, wf.count.mean())
return tids, wf.avg, wf.var, wf.count
##############
# DENSE TTC
##############
def do_dense(iterator, m):
# all_ttcs_on = list()
# all_ttcs_off= list()
tids = list()
tid, img, cmsk = iterator.__next__()
tids.append(tid)
j = 0
prevImg = np.ones_like(img, dtype=img.dtype)
prevMsk = np.ones_like(m, dtype=m.dtype)
_msk = (m & ~np.any(cmsk, axis=0) & prevMsk).astype(np.uint8)
ttc_on, ttc_off = ttcDense_int(img.reshape(img.shape[0], -1),
prevImg.reshape(img.shape[0], -1), _msk)
s_on = np.einsum( 'ia, ja -> ij', _msk.reshape(_msk.shape[0], -1), img.reshape(img.shape[0], -1),
optimize='optimal', dtype=np.int32)
s_cnt = _msk.sum((1,2))
# all_ttcs_on.append(ttc_on)
# all_ttcs_off.append(ttc_off)
all_ttcs_on = np.zeros( [256] + list(ttc_on.shape), dtype=img.dtype )
all_ttcs_off= np.zeros( [256] + list(ttc_off.shape), dtype=img.dtype )
all_s_on = np.zeros( [256] + list(ttc_on.shape)[:2], dtype=img.dtype )
all_s_cnt= np.zeros( [256, ttc_on.shape[0]], dtype=img.dtype )
all_ttcs_on[j] = ttc_on.copy()
all_ttcs_off[j] = ttc_off.copy()
all_s_on[j] = s_on.copy()
all_s_cnt[j] = s_cnt.copy()
prevImg[:] = img
prevMsk[:] = ~np.any(cmsk, axis=0)
for tid, img, cmsk in iterator:
j += 1
tids.append(tid)
_msk = (m & ~np.any(cmsk, axis=0) & prevMsk).astype(np.uint8)
ttc_on, ttc_off = ttcDense_int(img.reshape(img.shape[0], -1),
prevImg.reshape(img.shape[0], -1), _msk)
s_on = np.einsum( 'ia, ja -> ij', _msk.reshape(_msk.shape[0], -1), img.reshape(img.shape[0], -1),
optimize='optimal', dtype=np.int32)
s_cnt = _msk.sum((1,2))
all_ttcs_on[j] = ttc_on.astype(np.float32).copy()
all_ttcs_off[j] = ttc_off.astype(np.float32).copy()
all_s_on[j] = s_on.copy()
all_s_cnt[j] = s_cnt.copy()
#print(tid, j, flush=True)
prevImg[:] = img
prevMsk[:] = ~np.any(cmsk, axis=0)
dx, dy = np.diag_indices(img.shape[0])
all_ttcs_on = all_ttcs_on + np.swapaxes(all_ttcs_on,-2,-1)
all_ttcs_on[:,:, dx, dy] //= 2
all_ttcs_off = all_ttcs_off + np.swapaxes(all_ttcs_off,-2,-1)
all_ttcs_off[:,:, dx, dy] //= 2
return tids, all_ttcs_on[:j+1], all_ttcs_off[:j+1], all_s_on[:j+1], all_s_cnt[:j+1]
##############
# SPARSE TTC
##############
@dataclass
class TTCdata:
user_q_mask = None
current_image = None
current_image_sparse = None
current_cal_mask = None
prev_image = None
prev_image_sparse = None
prev_mask = None
ttc_on = None
ttc_off = None
s_on = None
s_off = None
s_cnt = None
def do_sparse_train(data: TTCdata):
if data.prev_image is None:
data.prev_image = np.ones_like(data.current_image)
data.prev_image_sparse = data.current_image_sparse # sparsify(data.prev_image.reshape(data.prev_image.shape[0], -1))
data.prev_mask = np.ones_like(data.user_q_mask)
start = time.time()
_msk = (data.user_q_mask & ~np.any(data.current_cal_mask, axis=0) & data.prev_mask).astype(np.uint8)
# print(f"Mask: {time.time() - start:.3f}")
_im = data.current_image.reshape(data.current_image.shape[0], -1)
start = time.time()
a = data.current_image_sparse # sparsify(_im)
# print(f"Sparsifying ({a.shape}): {time.time() - start:.3f}")
reps = (_im > 0).sum(0).cumsum().astype(np.uint32)
_imPrev = data.prev_image.reshape(data.prev_image.shape[0], -1)
b = data.prev_image_sparse # sparsify(_imPrev)
start = time.time()
repsPrev = (_imPrev > 0).sum(0).cumsum().astype(np.uint32)
# print(f"repsPrev: {time.time() - start:.3f}")
start = time.time()
data.s_cnt = _msk.sum((1,2))
# print(f"s_cnt: {data.s_cnt}")
ttc_on, ttc_off, s_on, s_off = twoTime(a, reps, b, repsPrev,
_msk, data.current_image.shape[0])
# print(f"TTCF: {time.time() - start:.3f}")
data.ttc_on = ttc_on
data.ttc_off = ttc_off
data.s_on = s_on
data.s_off = s_off
data.prev_image = data.current_image
data.prev_image_sparse = data.current_image_sparse # a
data.prev_mask = ~np.any(data.current_cal_mask, axis=0)
def do_sparse(iterator, m):
# all_ttcs = list()
tids = list()
tid, img, cmsk = iterator.__next__()
tids.append(tid)
j = 0
ttc_data = TTCdata()
ttc_data.user_q_mask = m
ttc_data.current_image = img
ttc_data.current_cal_mask = cmsk
do_sparse_train(ttc_data)
ttc_on_shape = list(ttc_data.ttc_on.shape)
all_ttcs_on = np.zeros( [256] + ttc_on_shape, dtype=img.dtype )
all_ttcs_off = np.zeros( [256] + ttc_on_shape, dtype=img.dtype )
all_s_on = np.zeros( [256] + ttc_on_shape[:2], dtype=img.dtype )
all_s_off = np.zeros( [256] + ttc_on_shape[:2], dtype=img.dtype )
all_s_cnt = np.zeros( [256, ttc_on_shape[0]], dtype=img.dtype )
all_ttcs_on[j] = ttc_data.ttc_on.copy()
all_ttcs_off[j] = ttc_data.ttc_off.copy()
all_s_on[j] = ttc_data.s_on.copy()
all_s_off[j] = ttc_data.s_off.copy()
all_s_cnt[j] = ttc_data.s_cnt.copy()
for tid, img, cmsk in iterator:
j += 1
tids.append(tid)
ttc_data.current_image = img
ttc_data.current_cal_mask = cmsk
do_sparse_train(ttc_data)
all_ttcs_on[j] = ttc_data.ttc_on.copy()
all_ttcs_off[j] = ttc_data.ttc_off.copy()
all_s_on[j] = ttc_data.s_on.copy()
all_s_off[j] = ttc_data.s_off.copy()
all_s_cnt[j] = ttc_data.s_cnt.copy()
dx, dy = np.diag_indices(img.shape[0])
all_ttcs_on = all_ttcs_on + np.swapaxes(all_ttcs_on,-2,-1)
all_ttcs_on[:,:, dx, dy] //= 2
return tids, all_ttcs_on[:j+1], all_ttcs_off[:j+1], all_s_on[:j+1], all_s_off[:j+1], all_s_cnt[:j+1]
##############
# NUMPY EINSUM
##############
def do_traditional(iterator, m):
tids = list()
tid, img, cmsk = iterator.__next__()
tids.append(tid)
j = 0
prevImg = np.ones_like(img, dtype=img.dtype)
prevCmsk = np.ones_like(cmsk, dtype=img.dtype)
ttc_on = np.zeros( (m.shape[0], img.shape[0], img.shape[0]), dtype=img.dtype)
ttc_off = np.ones_like(ttc_on)
s_on = np.ones((m.shape[0], img.shape[0], ), dtype=img.dtype)
s_off = np.ones((m.shape[0], img.shape[0], ), dtype=img.dtype)
s_cnt = np.ones((m.shape[0]), dtype=img.dtype)
_ttc = np.zeros( (img.shape[0], img.shape[0]), dtype=img.dtype )
all_ttcs_on = np.zeros( (256, m.shape[0], img.shape[0], img.shape[0]), dtype=img.dtype )
all_ttcs_off = np.ones_like(all_ttcs_on)
all_s_on = np.ones( (256, m.shape[0], img.shape[0], ), dtype=img.dtype )
all_s_off = np.ones( (256, m.shape[0], img.shape[0], ), dtype=img.dtype )
all_s_cnt = np.ones( (256, m.shape[0]), dtype=img.dtype )
for i, _m in enumerate(m):
_msk = (_m & ~(np.any(cmsk, axis=0)) & ~(np.any(prevCmsk, axis=0))).astype(_m.dtype)
_a = img[:, _msk].reshape(img.shape[0], -1)
_ttc = np.einsum( 'ia, ja -> ij', _a, _a, optimize='optimal', dtype=np.int32)
_s = np.nansum(_a,1)
ttc_on[i] = _ttc
s_on[i] = _s
s_cnt[i] = np.nansum(_msk)
_b = prevImg[:, _msk].reshape(img.shape[0], -1)
_ttc = np.einsum( 'ia, ja -> ij', _a, _b, optimize='optimal', dtype=np.int32)
ttc_off[i] = _ttc
_s_off = np.nansum(_b,1)
s_off[i] = _s_off
all_ttcs_on[j] = ttc_on.copy()
all_ttcs_off[j] = ttc_off.copy()
all_s_on[j] = s_on.copy()
all_s_off[j] = s_off.copy()
all_s_cnt[j] = s_cnt.copy()
prevImg[:] = img
for tid, img, cmsk in iterator:
j += 1
tids.append(tid)
for i, _m in enumerate(m):
_msk = (_m & ~(np.any(cmsk, axis=0)) & ~(np.any(prevCmsk, axis=0))).astype(_m.dtype)
_a = img[:, _msk].reshape(img.shape[0], -1)
_ttc = np.einsum( 'ia, ja -> ij', _a, _a, optimize='optimal', dtype=np.int32)
_s = np.nansum(_a,1)
ttc_on[i] = _ttc
s_on[i] = _s
s_cnt[i] = np.nansum(_msk)
_b = prevImg[:, _msk].reshape(img.shape[0], -1)
_ttc = np.einsum( 'ia, ja -> ij', _a, _b, optimize='optimal', dtype=np.int32)
ttc_off[i] = _ttc
_s_off = np.nansum(_b,1)
s_off[i] = _s_off
all_ttcs_on[j] = ttc_on.copy()
all_ttcs_off[j] = ttc_off.copy()
all_s_on[j] = s_on.copy()
all_s_off[j] = s_off.copy()
all_s_cnt[j] = s_cnt.copy()
#print(tid, j, flush=True)
prevImg[:] = img
prevCmsk[:] = cmsk
return tids, all_ttcs_on[:j+1], all_ttcs_off[:j+1], all_s_on[:j+1], all_s_off[:j+1], all_s_cnt[:j+1]
source diff could not be displayed: it is too large. Options to address this: view the blob.
cimport cython
import numpy as np
from scipy.linalg.cython_blas cimport ssyr, ssyr2
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
@cython.cdivision(True)
cpdef ttcDense(float[:,::1] data,
float[:,::1] prevDat,
char[:,:,::1] mask):
cdef int i, j, k, m
cdef int dim = data.shape[0]
cdef int n_reg = mask.shape[0]
# cdef int n_trn = data.shape[0]
cdef int n_pix = data.shape[1]
cdef float[:,:,::1] A = np.zeros(( n_reg, dim, dim), dtype=np.float32)
cdef float[:,:,::1] B = np.zeros_like(A)
cdef int one = 1
cdef float fac = 1., half=0.5
cdef int[::1] ixmsk = np.argmax(mask, axis=0).ravel().astype(np.int32)
cdef char[::1] cond = np.any(mask, axis=0).ravel().astype(np.int8)
# for i in range(n_trn):
for j in range(n_pix):
if cond[j]:
m = ixmsk[j]
ssyr(
"L",
&dim,
&fac,
&data[0 , j],
&n_pix,
&A[m,0,0],
&dim
)
ssyr2(
"L",
&dim,
&half,
&data[ 0 , j],
&n_pix,
&prevDat[ 0 , j],
&n_pix,
&B[m,0,0],
&dim
)
return np.asarray(A), np.asarray(B)
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
@cython.cdivision(True)
cdef _ttc_upd(int val,
int* data,
int* prevDat,
int* A,
int* B,
int size,
int stride):
cdef int l, v2
for l in range(size):
v2 = data[l * stride]
A[l] += val * v2
v2 = prevDat[l * stride]
B[l] += val * v2
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
@cython.cdivision(True)
cpdef ttcDense_int(int[:,::1] data,
int[:,::1] prevDat,
char[:,:,::1] mask):
cdef int i, j, k, m, l
cdef int v1, v2
cdef int dim = data.shape[0]
cdef int n_reg = mask.shape[0]
# cdef int n_trn = data.shape[0]
cdef int n_pix = data.shape[1]
cdef int[:,:,::1] A = np.zeros(( n_reg, dim, dim), dtype=np.int32)
cdef int[:,:,::1] B = np.zeros_like(A)
cdef int[::1] ixmsk = np.argmax(mask, axis=0).ravel().astype(np.int32)
cdef char[::1] cond = np.any(mask, axis=0).ravel().astype(np.int8)
for j in range(n_pix):
if cond[j]:
m = ixmsk[j]
for k in range(dim):
v1 = data[k, j]
if v1:
_ttc_upd(v1,
&data[0,j],
&prevDat[0,j],
&A[m,k,0],
&B[m,k,0],
dim,
n_pix)
return np.asarray(A), np.asarray(B)
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
@cython.cdivision(True)
cpdef ttcDenseSymmNorm(float[:,:,::1] data,
char[:,:,::1] mask):
cdef int i, j, k, l, m
cdef int dim = data.shape[1]
cdef int n_reg = mask.shape[0]
cdef int n_trn = data.shape[0]
cdef int n_pix = data.shape[2]
cdef float[:,:,:,::1] A = np.zeros((n_trn, n_reg, dim, dim), dtype=np.float32)
cdef float[:,::1] _a = np.zeros((dim, dim), dtype=np.float32 )
cdef float[:,:,::1] N = np.zeros((n_trn, n_reg, dim), dtype=np.float32)
cdef int one = 1
cdef float fac = 1.
cdef float cs_left, cs_rght, nrm
cdef int[::1] ixmsk = np.argmax(mask, axis=0).ravel().astype(np.int32)
cdef char[::1] cond = np.any(mask, axis=0).ravel().astype(np.int8)
for i in range(n_trn):
for j in range(n_pix):
if cond[j]:
m = ixmsk[j]
_a[:] = 0
ssyr(
"L",
&dim,
&fac,
&data[i, 0 , j],
&n_pix,
&_a[0,0],
&dim
)
cs_left = 0
cs_rght = 0
for k in range(dim):
cs_left += data[i, k, j]
cs_rght += data[i, dim-k-1, j]
if cs_left and cs_rght:
nrm = ((k+1) * (k+1)) / (cs_left * cs_rght)
else:
nrm = 0
for l in range(k+1):
A[i,m,l,dim-k+l-1] += nrm * _a[l, dim-k+l-1]
return np.asarray(A)
source diff could not be displayed: it is too large. Options to address this: view the blob.
cimport cython
import numpy as np
@cython.boundscheck(False)
@cython.wraparound(False)
def sparsify( int[:,::1] data):
cdef int k = data.shape[0]
cdef int l = data.shape[1]
cdef int i, j
cdef list indices = []
cdef int a
cdef unsigned int b
ix_old = -1
for j in range(l):
for i in range(k):
a = data[i,j]
if a:
b = 0 # 16 bit 9 bit 7 bit
b |= (j << 16) ####pixel####### ###cell## #count#
b |= (i << 7)
b |= (a & 0x7F)
indices.append(b)
return np.asarray(indices, dtype=np.uint32)
@cython.wraparound(False)
def sparsify2(unsigned int[::1] pixels, unsigned int[:, ::1] indices, unsigned int[::1] out):
cdef unsigned int elem, cell, idx
for i in range(pixels.shape[0]):
cell = indices[1, i]
idx = indices[0, i]
elem = 0
elem |= (idx << 16)
elem |= (cell << 7)
elem |= (pixels[i] & 0x7F)
out[i] = elem
@cython.boundscheck(False)
@cython.wraparound(False)
def densify( unsigned int[::1] indices,
tuple size):
cdef int i, j
cdef short[:,::1] A = np.zeros(size, dtype=np.int16)
cdef int col, val
cdef int row
for i in range(len(indices)):
val = indices[i]
row = (val >> 7) & 0x1FF
col = val >> 16
val &= 0x7F
A[row, col] = val
return np.asarray(A)
# @cython.boundscheck(False)
@cython.wraparound(False)
def twoTime( unsigned int[::1] indices,
unsigned int[::1] reps,
unsigned int[::1] indices_prev,
unsigned int[::1] reps_prev,
char[:,:,::1] mask,
int dim):
cdef unsigned int i=0, j=0, k=0
cdef unsigned int n_reg = mask.shape[0]
cdef int[:,:,::1] ttc_on = np.zeros( (n_reg, dim, dim), dtype=np.int32)
cdef int[:,:,::1] ttc_off = np.zeros( (n_reg, dim, dim), dtype=np.int32)
cdef int[:,::1] s_on = np.zeros( (n_reg, dim, ), dtype=np.int32)
cdef int[:,::1] s_off = np.zeros( (n_reg, dim, ), dtype=np.int32)
cdef int[::1] s_cnt = np.zeros( (n_reg, ), dtype=np.int32)
cdef unsigned int col, val, v, rmax, rmin, lst_r=0
cdef unsigned int rP, lst_rP = 0
cdef unsigned int row, row2, val2, m
cdef int[::1] ixmsk = np.argmax(mask, axis=0).ravel().astype(np.int32)
cdef char[::1] cond = np.any(mask, axis=0).ravel().astype(np.int8)
for j in range(reps.shape[0]):
r = reps[j]
rmax = 0 if r ==0 else r - 1
rmin = min(lst_r, rmax)
val = indices[rmin]
col = val >> 16
# prevImg
rP = reps_prev[j]
# rmax = 0 if rP ==0 else rP - 1
# rmin = min(lst_rP, rmax)
# val = indices_prev[rmin]
# if r and rP:
# assert col == (val >> 16)
if cond[col]:
m = ixmsk[col]
i = lst_r
while i < r:
val = indices[i]
row = (val >> 7) & 0x1FF
val &= 0x7F
s_on[m,row] += val
k = i
while k <r:
val2 = indices[k]
row2 = (val2 >> 7) & 0x1FF
val2 &= 0x7F
ttc_on[m, row, row2] += val * val2
k += 1
# prevImg
k = lst_rP
while False: # k < rP:
val2 = indices_prev[k]
if (val2 >> 16) == col:
row2 = (val2 >> 7) & 0x1FF
val2 &= 0x7F
v = val * val2
ttc_off[m, row, row2] += v
# ttc_off[m, row2, row] += v
k += 1
i += 1
i = lst_rP
while False: # i < rP:
val2 = indices_prev[i]
row2 = (val2 >> 7) & 0x1FF
val2 &= 0x7F
s_off[m,row2] += val2
i += 1
lst_r = r
lst_rP = rP
return np.asarray(ttc_on), np.asarray(ttc_off), np.asarray(s_on), np.asarray(s_off)
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.cdivision(True)
def twoTimeSymmNorm( unsigned int[::1] indices,
unsigned int[::1] reps,
char[:,:,::1] mask,
float[:,::1] avg,
int dim):
cdef unsigned int i=0, j=0, k=0, k1, k2, m
cdef int n_reg = mask.shape[0]
cdef float[:,:,::1] ttc = np.zeros( (n_reg, dim, dim), dtype=np.float32)
cdef float[::1] nrm = np.zeros( dim, dtype=np.float32 )
cdef int[::1] ixmsk = np.argmax(mask, axis=0).ravel().astype(np.int32)
cdef char[::1] cond = np.any(mask, axis=0).ravel().astype(np.int8)
cdef unsigned int col, val1, val2, r, rmax, rmin, lst_r = 0, col1, col2
cdef unsigned int row1, row2
cdef unsigned int j1, j2
cdef float v, s1, s2, n
for j in range(reps.shape[0]):
r = reps[j]
rmax = 0 if r ==0 else r - 1
rmin = min(lst_r, rmax)
k1 = rmin
k2 = rmax
val1 = indices[k1]
row1 = (val1 >> 7) & 0x1FF
val2 = indices[k2]
row2 = (val2 >> 7) & 0x1FF
s1 = 0
s2 = 0
col = val1 >> 16
if cond[col]:
m = ixmsk[col]
for i in range(dim):
# if i == row1:
# v = val1 & 0x7F
# k1 = min(k1+1, rmax)
# val1 = indices[k1]
# row1 = (val1 >> 7) & 0x1FF
# else: v = 0
v = avg[i, col]
s1 += (v-s1)/(i+1)
# if (dim-1-i) == row2:
# v = val2 & 0x7F
# k2 = max(rmin, k2-1)
# val2 = indices[k2]
# row2 = (val2 >> 7) & 0x1FF
# else: v = 0
v = avg[dim-1-i, col]
s2 += (v-s2)/(i+1)
n = s1 * s2
nrm[dim-1-i] = 1/n if n > 0 else 0
i = lst_r
while i < r:
val1 = indices[i]
col1 = val1 >> 16
row1 = (val1 >> 7) & 0x1FF
val1 &= 0x7F
k = i
while k <r:
val2 = indices[k]
col2 = val2 >> 16
row2 = (val2 >> 7) & 0x1FF
val2 &= 0x7F
n = nrm[ row2 - row1 ]
ttc[m, row1, row2] += val1 * val2 * n
k += 1
i += 1
lst_r = r
return np.asarray(ttc)
source diff could not be displayed: it is too large. Options to address this: view the blob.
import cython
@cython.boundscheck(False)
@cython.wraparound(False)
def welford (float[:,:,::1] x,
float[:,:,::1] avg,
float[:,:,::1] var,
float[:,:,::1] alpha):
cdef int size = avg.shape[0] * avg.shape[1] * avg.shape[2]
_wf(&x[0,0,0], &avg[0,0,0], &var[0,0,0], &alpha[0,0,0], size)
@cython.boundscheck(False)
@cython.wraparound(False)
cdef _wf(float* x,
float* avg,
float* var,
float* alpha,
int size):
cdef float delta, al, oneNegAl# = 1 - alpha
cdef int i
for i in range(size):
al = alpha[i]
oneNegAl = 1 - al
delta = x[i] - avg[i]
avg[i] += al * delta
var[i] = oneNegAl * (var[i] + al * delta * delta)
import os, sys
from time import time
from pathlib import Path
from datetime import datetime
from h5py import File
from concurrent.futures import ProcessPoolExecutor
import numpy as np
from extra_data import open_run
from .algos import WelfordAlg, do_stats, do_dense, do_sparse, do_traditional
from argparse import ArgumentParser, Namespace
runtypes = { 'stats' : '00',
'denseTTC' : '1a',
'denseTTCSymmNorm' : '1b',
'sparseTTC' : '2a',
'sparseTTCSymmNorm': '2b',
'traditional' : '3'
}
trainMath = { 'stats' : (do_stats, ['tids', 'mean', 'var', 'count']),
'denseTTC' : (do_dense, ['tids', 'ttc_on', 'ttc_off', 's_on', 's_cnt']),
'denseTTCSymmNorm' : '1b',
'sparseTTC' : (do_sparse, ['tids', 'ttc_on', 'ttc_off', 's_on', 's_off', 's_cnt']),
# 'sparseTTCSymmNorm': '2b',
'traditional' : (do_traditional, ['tids', 'ttc_on', 'ttc_off', 's_on', 's_off', 's_cnt'])
}
pars = ArgumentParser(prog='processOnCPU')
pars.add_argument('--user', default='braussef')
pars.add_argument('--proposal', '-p', dest='proposal', default=2853, type=int )
pars.add_argument('--runs', '-r', dest='runs', default=[70], type=int,
nargs='*')
pars.add_argument('--type', '-t', dest='type', default='stats',
choices=list(runtypes) )
pars.add_argument('--cal_mask', '-cmsk', dest='cal_mask',
action="store_true") # load/use mask from calibration pipeline
pars.add_argument('--photonize', '-phot', dest='photonize',
action="store_true") # photonize data
pars.add_argument('--debug', '-dbg', dest='debug',
action="store_true") # serialize for debugging
pars.add_argument('--aggregate-modules', '-agg', dest='agg', default=False,
action="store_true")
pars.add_argument('--energy', '-e', dest='eng', default=9.0,
type=float )
pars.add_argument('--adu_per_keV', '-adu', dest='adu_per_keV', default=1.0,
type=float )
pars.add_argument('--skip_after', '-sm', dest='skip_after', default=256,
type=int )
pars.add_argument('--filter_trains', '-ft', dest='filter_trains',
action="store_true") # process only trainIds in good_trains.npy
pars.add_argument('--out_dir', '-d', dest='out_dir', default=None,
type=str )
pars.add_argument('--pulse_slice', dest='pulse_slice', default=[0, 352, 1], type=int, nargs=3)
def getModNo(hdf_file):
return int([frag for frag in hdf_file.split('-') if 'AGIPD' in frag][0][-2:])
def getFileNo(hdf_file):
return int([frag.split('.')[0][-4:] for frag in hdf_file.split('-') if 'S000' in frag][0])
def sliceTrains(hdf_file, module_number, args, good_trains):
# global usr_mask, good_trains
i = 0
tIds = np.unique(hdf_file['INSTRUMENT/MID_DET_AGIPD1M-1/DET/%iCH0:xtdf/image/trainId'% module_number][()])
tIds = tIds[tIds>0]
det = hdf_file['INSTRUMENT/MID_DET_AGIPD1M-1/DET/%iCH0:xtdf/image/data' % module_number]
if args.cal_mask:
cmsk = hdf_file['INSTRUMENT/MID_DET_AGIPD1M-1/DET/%iCH0:xtdf/image/mask' % module_number]
nTrains = len(tIds)
nCells = len(det) // nTrains
#print(hdf_file,module_number,nTrains,nCells)
#return
#if nCells not in [64, 352]: return None
a = np.zeros( (nCells, 512, 128), dtype=det.dtype )
b = np.zeros( (nCells, 512, 128), dtype=det.dtype )
#print('skip after '+str(args.skip_after))
while i < nTrains and i <=args.skip_after:
tid = tIds[i]
if tid in good_trains:
det.read_direct(a, source_sel=np.s_[i*nCells : (i+1)*nCells, :, :])
if args.photonize:
_a = a / args.eng
_a /= args.adu_per_keV
_a = np.rint(_a) # `rint` does the same thing as `round`, but faster!
_a[_a<0] = 0
_a = _a[slice(*args.pulse_slice)].astype('int32').copy()
else:
_a = a[slice(*args.pulse_slice)] .astype('int32').copy()
if args.cal_mask:
cmsk.read_direct(b, source_sel=np.s_[i*nCells : (i+1)*nCells, :, :])
_b = (b>0)[slice(*args.pulse_slice)].astype(bool).copy()
yield tid, _a, _b
i += 1
def parallel_read(hdf_file, args, masks, good_trains):
# global args, masks
print(hdf_file)
f = File(hdf_file)
modNo = getModNo(hdf_file)
fNo = getFileNo(hdf_file)
it = sliceTrains(f, modNo, args, good_trains)
tic = time()
mtype = bool if args.type == 'traditional' else np.int8
m = masks[modNo].astype(mtype).copy()
do_math, keys = trainMath[args.type]
if np.any(m):
try:
res = do_math(it, m)
except StopIteration:
res = None
else:
res = None
toc = time()
print(f'Done with {hdf_file:s} after {toc-tic} secs')
return (modNo, fNo), res, toc - tic
def run_analysis(args: Namespace):
home_dir = '/beegfs/desy/user'
for r in args.runs:
print(args.cal_mask)
print(args.filter_trains)
if args.out_dir is None:
proc_dir = f'xpcs/p{args.proposal:4d}/r{r:04d}/'
path = '/'.join([home_dir, args.user, proc_dir])
print(path)
else:
path = args.out_dir
path = Path(path)
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
print(path)
_, keys = trainMath[args.type]
if args.type == 'stats':
masks = np.ones( (16, 1, 512, 128) )
else:
masks = np.load(path / 'all_masks.npy').swapaxes(0,1).copy()
run = open_run(proposal=args.proposal, run=r, data='all')
if args.filter_trains:
good_trains_path = path / "good_trains.npy"
print(f"Loading good trains from {good_trains_path}")
good_trains = np.load(good_trains_path)
else:
good_trains = run.train_ids
all_files = [f.filename for f in run.files if 'AGIPD' in f.filename and 'CTRL' not in f.filename]
log = open(path / 'log.txt', 'w')
log.write(f'Starting at {str(datetime.now()):s}\n')
log.write( 'Running on ' + os.environ['HOSTNAME'] + ' with ' + str(os.cpu_count()) + ' cores \n' )
log.write( str(len(all_files)) + ' files in total \n' )
walltime = time()
print('debug is '+str(args.debug))
if args.debug:
for F in all_files:
parallel_read(F, args, masks, good_trains)
else:
from itertools import repeat
with ProcessPoolExecutor( os.cpu_count()) as exc:
res = exc.map(parallel_read, all_files, repeat(args), repeat(masks), repeat(good_trains), )
walltime = time() - walltime
log.write(f'WALL TIME is {walltime:6.1f} secs\n')
log.write(f'Finished at {str(datetime.now()):s}\n')
log.close()
values = dict()
timings = dict()
for k,v,t in res: #(modNo, fNo), res, toc - tic
if v is not None:
values[k] = v
timings[k] = t
### sum over modules
if args.agg:
print(timings)
all_data = dict()
for k in keys:
all_data[k] = dict()
for k in sorted(values):
mod, fNo = k
mod, fNo = f'mod{mod:02d}', f'file{fNo:02d}'
for j, v in zip(keys, values[k]):
if not isinstance(v, np.ndarray):
v = np.asarray(v, dtype=v[0].dtype)
if not fNo in all_data[j]:
all_data[j][fNo] = v
elif j != 'tids':
all_data[j][fNo] += v
with File(path / f'output_{runtypes[args.type]:s}_{args.type:s}.h5', 'w') as f:
for j in sorted(all_data):
dset = all_data[j]
v = np.concatenate(list(dset.values()))
d = f.create_dataset(j, shape=v.shape, dtype=v.dtype)
d[:] = v
# d = f.create_dataset('tictoc', dtype=np.float32, shape=1)
# d[:] = timings[k]
### don't sum
else:
with File(path / f'output_{runtypes[args.type]:s}_{args.type:s}.h5', 'w') as f:
for k in sorted(values):
mod, fNo = k
mod, fNo = f'mod{mod:02d}', f'file{fNo:02d}'
if mod not in f.keys():
f.create_group(mod)
m = f[mod]
if fNo not in m.keys():
m.create_group(fNo)
for j, v in zip(keys, values[k]):
if not isinstance(v, np.ndarray):
v = np.asarray(v, dtype=v[0].dtype)
d = m[fNo].create_dataset(j, shape = v.shape, dtype=v.dtype)
d[:] = v
d = m[fNo].create_dataset('tictoc', dtype=np.float32, shape=1)
d[:] = timings[k]
if __name__ == '__main__':
args = pars.parse_args()
run_analysis(args)
import os, sys
from Cython.Build import build_ext, cythonize
from setuptools import setup, Extension, find_packages
def ext_modules():
import numpy as np
include_dirs = ['.', np.get_include()]
root_dir = os.path.abspath(os.path.dirname(__file__))
root_dir += "/fastXPCS/cython/"
dense_ext = Extension(
"dense",
sources=[root_dir + "dense.pyx"],
include_dirs=include_dirs,
)
sparse_ext = Extension(
"fxpcs_sparse",
sources=[root_dir + "fxpcs_sparse.pyx"],
include_dirs=include_dirs
)
wf_ext = Extension(
"welford",
sources=[root_dir + "welford.pyx"],
include_dirs=include_dirs
)
exts = [dense_ext, sparse_ext, wf_ext]
return cythonize(exts, language_level=3)
REQUIREMENTS = [
"h5py",
"numpy",
"cython",
]
setup(
name="fastXPCS",
packages=find_packages(),
ext_package="fastXPCS",
ext_modules=ext_modules(),
cmdclass={"build_ext" : build_ext},
zip_safe=False,
install_requires=REQUIREMENTS
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment