diff --git a/DSSC.py b/DSSC.py
index 856ba47e8dc7ef98df7d7b3ba082fcebc5c801f7..6bda4b3ba85dc3354672b07eef49d3dbf77ea0e9 100644
--- a/DSSC.py
+++ b/DSSC.py
@@ -1,4 +1,4 @@
-import multiprocessing
+from joblib import Parallel, delayed, parallel_backend
 from time import strftime
 import tempfile
 import shutil
@@ -7,9 +7,9 @@
 import os
 import warnings
 import psutil
-import karabo_data as kd
-from karabo_data.read_machinery import find_proposal
-from karabo_data.geometry2 import DSSC_1MGeometry
+import extra_data as ed
+from extra_data.read_machinery import find_proposal
+from extra_geom import DSSC_1MGeometry
 import ToolBox as tb
 import matplotlib.pyplot as plt
 from mpl_toolkits.axes_grid1 import ImageGrid
@@ -43,8 +43,9 @@ class DSSC:
         self.aspect = self.px_pitch_v/self.px_pitch_h  # aspect ratio of the DSSC images
         self.geom = None
         self.mask = None
-        self.max_fraction_memory = 0.8
+        self.max_fraction_memory = 0.4
         self.filter_mask = None
+        self.Nworker = 16

         print('DSSC configuration')
         print(f'Topic: {self.topic}')
@@ -62,7 +63,7 @@ class DSSC:
             shutil.rmtree(self.tempdir)

     def open_run(self, run_nr, isDark=False):
-        """ Open a run with karabo-data and prepare the virtual dataset for multiprocessing
+        """ Open a run with extra-data and prepare the virtual dataset for multiprocessing

             inputs:
                 run_nr: the run number

         """
-        print('Opening run data with karabo-data')
+        print('Opening run data with extra-data')
         self.run_nr = run_nr
         self.xgm = None
         self.filter_mask = None
-        self.scan_vname = None
-
-        self.run = kd.open_run(self.proposal, self.run_nr)
+
+        self.run = ed.open_run(self.proposal, self.run_nr)
         self.isDark = isDark
         self.plot_title = f'{self.proposal} run: {self.run_nr}'
@@ -100,14 +100,14 @@ class DSSC:
             print(f'Temporary directory: {self.tempdir}')

         print('Creating virtual dataset')
-        self.vdslist = self.create_virtual_dssc_datasets(self.run, path=self.tempdir)
+        self.vds_filenames = self.create_virtual_dssc_datasets(self.run, path=self.tempdir)

         # create a dummy scan variable for dark run
         # for other type or run, use DSSC.define_run function to overwrite it
         self.scan = xr.DataArray(np.ones_like(self.run.train_ids), dims=['trainId'],
-                                 coords={'trainId': self.run.train_ids})
+                                 coords={'trainId': self.run.train_ids}).to_dataset(
+                                     name='scan_variable')
         self.scan_vname = 'dummy'
-        self.vds_scan = None

     def define_scan(self, vname, bins):
         """
@@ -120,29 +120,24 @@ class DSSC:
         """
         if type(vname) is dict:
-            self.scan = self.run.get_array(vname['source'], vname['key'])
+            scan = self.run.get_array(vname['source'], vname['key'])
         elif type(vname) is str:
             if vname not in tb.mnemonics:
                 raise ValueError(f'{vname} not found in the ToolBox mnemonics table')
-            self.scan = self.run.get_array(tb.mnemonics[vname]['source'], tb.mnemonics[vname]['key'])
+            scan = self.run.get_array(tb.mnemonics[vname]['source'], tb.mnemonics[vname]['key'])
         else:
             raise ValueError(f'vname should be a string or a dict. We got {type(vname)}')

         if (type(bins) is int) or (type(bins) is float):
-            self.scan = bins * np.round(self.scan / bins)
+            scan = bins * np.round(scan / bins)
         else:
             # TODO: digitize the data
             raise ValueError(f'To be implemented')

         self.scan_vname = vname
-
-        self.vds_scan = os.path.join(self.tempdir, 'scan_variable.h5')
-        if os.path.isfile(self.vds_scan):
-            os.remove(self.vds_scan)
-
-        self.scan = self.scan.to_dataset(name='scan_variable')
+
+        self.scan = scan.to_dataset(name='scan_variable')
         self.scan['xgm_pumped'] = self.xgm[:, :self.nbunches:2].mean('dim_0')
         self.scan['xgm_unpumped'] = self.xgm[:, 1:self.nbunches:2].mean('dim_0')
-        self.scan.to_netcdf(self.vds_scan, group='data')

         self.scan_counts = xr.DataArray(np.ones(len(self.scan['scan_variable'])),
                                         dims=['scan_variable'],
@@ -176,7 +171,7 @@ class DSSC:
         """
         if self.xgm is None:
             self.xgm = self.run.get_array(tb.mnemonics['SCS_SA3']['source'],
-                                          tb.mnemonics['SCS_SA3']['key'], roi=kd.by_index[:self.nbunches])
+                                          tb.mnemonics['SCS_SA3']['key'], roi=ed.by_index[:self.nbunches])

     def plot_xgm_hist(self, nbins=100):
         """ Plots an histogram of the SCS XGM dedicated SAS3 data.
@@ -231,17 +226,28 @@ class DSSC:
             print((f'Rejecting {nrejected} out of {len(self.run.train_ids)} trains due to xgm '
                    f'thresholds: [{self.xgm_low}, {self.xgm_high}]'))

-    def load_geom(self):
+    def load_geom(self, geopath=None, quad_pos=None):
         """ Loads and return the DSSC geometry.
+
+            inputs:
+                geopath: path to the h5 geometry file. If None, uses a default file.
+                quad_pos: list of quadrant position tuples. If None, uses default positions.
+
+            output:
+                returns the loaded geometry
         """
-        quad_pos = [
-            (-124.100, 3.112),  # TR
-            (-133.068, -110.604),  # BR
-            ( 0.988, -125.236),  # BL
-            ( 4.528, -4.912)  # TL
-        ]
-        path = '/gpfs/exfel/sw/software/exfel_environments/misc/git/karabo_data/docs/dssc_geo_june19.h5'
-        self.geom = DSSC_1MGeometry.from_h5_file_and_quad_positions(path, quad_pos)
+        if quad_pos is None:
+            quad_pos = [(-124.100, 3.112),  # TR
+                        (-133.068, -110.604),  # BR
+                        ( 0.988, -125.236),  # BL
+                        ( 4.528, -4.912)  # TL
+                        ]
+
+        if geopath is None:
+            geopath = '/gpfs/exfel/sw/software/git/EXtra-geom/docs/dssc_geo_june19.h5'
+
+        self.geom = DSSC_1MGeometry.from_h5_file_and_quad_positions(geopath, quad_pos)
+        return self.geom

     def load_mask(self, fname, plot=True):
@@ -266,39 +272,39 @@ class DSSC:
         """ Create virtual datasets for each 16 DSSC modules used for the multiprocessing.

             input:
-                run: karabo-data run
+                run: extra-data run
                 path: string where the virtual files are created
+
+            output:
+                dictionary of key: module, value: virtual dataset filename
         """
-        vds_list = []
-        for m in tqdm(range(16)):
-            vds_filename = os.path.join(path, f'dssc{m}_vds.h5')
-            if os.path.isfile(vds_filename):
-                os.remove(vds_filename)
-            module_vds = run.get_virtual_dataset(f'SCS_DET_DSSC1M-1/DET/{m}CH0:xtdf',
-                                                 'image.data', filename=vds_filename)
-            vds_list.append([vds_filename, module_vds])
-        return vds_list
+        vds_filenames = {}
+
+        for module in tqdm(range(16)):
+            fname = os.path.join(path, f'dssc{module}_vds.h5')
+            if os.path.isfile(fname):
+                os.remove(fname)
+
+            vds = run.get_virtual_dataset(f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf',
+                                          'image.data', filename=fname)
+
+            vds.file.close()  # keep h5 file closed outside 'with' context
+
+            vds_filenames[module] = fname
+
+        return vds_filenames

     def binning(self, do_pulse_mean=True):
         """ Bin the DSSC data by the predifined scan type (DSSC.define()) using multiprocessing

         """
-        if self.vds_scan is None:
-            # probably a dark run with a dummy scan variable
-            self.vds_scan = os.path.join(self.tempdir, 'scan_variable.h5')
-            if os.path.isfile(self.vds_scan):
-                os.remove(self.vds_scan)
-
-            self.scan = self.scan.to_dataset(name='scan_variable')
-            self.scan.to_netcdf(self.vds_scan, group='data')

         # get available memory in GB, we will try to use 80 % of it
         max_GB = psutil.virtual_memory().available/1024**3
         print(f'max available memory: {max_GB} GB')

-        # max_GB / (8byte * 16modules * 128px * 512px * N_pulses)
-        self.chunksize = int(self.max_fraction_memory*max_GB * 1024**3 // (8 * 16 * 128 * 512 * self.fpt))
+        # max_GB / (8byte * Nworker * 128px * 512px * N_pulses)
+        self.chunksize = int(self.max_fraction_memory*max_GB * 1024**3 // (8 * self.Nworker * 128 * 512 * self.fpt))

         print('processing', self.chunksize, 'trains per chunk')
@@ -307,20 +313,29 @@ class DSSC:
             jobs.append(dict(
                 module=m,
                 fpt=self.fpt,
-                vdf_module=os.path.join(self.tempdir, f'dssc{m}_vds.h5'),
+                vds=self.vds_filenames[m],
                 chunksize=self.chunksize,
-                vdf_scan=self.vds_scan,
+                scan=self.scan['scan_variable'],
                 nbunches=self.nbunches,
                 run_nr=self.run_nr,
                 do_pulse_mean=do_pulse_mean
             ))

+        if self.Nworker != 16:
+            with warnings.catch_warnings():
+                warnings.simplefilter("default")
+                warnings.warn(('Nworker other than 16 is known to cause issues ' +
+                               '(https://in.xfel.eu/gitlab/SCS/ToolBox/merge_requests/76)'),
+                              RuntimeWarning)
+
         timestamp = strftime('%X')
         print(f'start time: {timestamp}')

-        with multiprocessing.Pool(16) as pool:
-            module_data = pool.map(process_one_module, jobs)
-
+        with parallel_backend('loky', n_jobs=self.Nworker):
+            module_data = Parallel(verbose=20)(
+                delayed(process_one_module)(job) for job in tqdm(jobs)
+            )
+
         print('finished:', strftime('%X'))

         # rearange the multiprocessed data
@@ -352,9 +367,9 @@ class DSSC:
             save_folder = self.save_folder

         if self.isDark:
-            fname = f'run{self.run_nr}_dark.h5'  # no scan
+            fname = f'run{self.run_nr}_dark.nc'  # no scan
         else:
-            fname = f'run{self.run_nr}.h5'  # run with delay scan (change for other scan types!)
+            fname = f'run{self.run_nr}.nc'  # run with delay scan (change for other scan types!)

         save_path = os.path.join(save_folder, fname)

@@ -365,6 +380,7 @@ class DSSC:
                 warnings.warn(f'Overwriting file: {save_path}')
                 os.remove(save_path)
             self.module_data.to_netcdf(save_path, group='data')
+            self.module_data.close()
             os.chmod(save_path, 0o664)
             print('saving: ', save_path)
         else:
@@ -385,8 +401,10 @@ class DSSC:

         self.plot_title = f'{self.proposal} run: {runNB} dark: {dark_runNB}'

-        dark = xr.open_dataset(os.path.join(save_folder, f'run{dark_runNB}_dark.h5'), group='data')
-        binned = xr.open_dataset(os.path.join(save_folder, f'run{runNB}.h5'), group='data')
+        dark = xr.load_dataset(os.path.join(save_folder, f'run{dark_runNB}_dark.nc'), group='data',
+                               engine='netcdf4')
+        binned = xr.load_dataset(os.path.join(save_folder, f'run{runNB}.nc'), group='data',
+                                 engine='netcdf4')

         binned['pumped'] = (binned['pumped'] - dark['pumped'].values)
         binned['unpumped'] = (binned['unpumped'] - dark['unpumped'].values)
@@ -592,22 +610,19 @@ class DSSC:
 def process_one_module(job):
     module = job['module']
     fpt = job['fpt']
-    data_vdf = job['vdf_module']
-    scan_vdf = job['vdf_scan']
+    vds = job['vds']
+    scan = job['scan']
     chunksize = job['chunksize']
     nbunches = job['nbunches']
     do_pulse_mean = job['do_pulse_mean']

     image_path = f'INSTRUMENT/SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf/image/data'
     npulse_path = f'INDEX/SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf/image/count'
-    with h5py.File(data_vdf, 'r') as m:
+    with h5py.File(vds, 'r') as m:
         all_trainIds = m['INDEX/trainId'][()]
-    n_trains = len(all_trainIds)
-    chunk_start = np.arange(n_trains, step=chunksize, dtype=int)
+        frames_per_train = m[npulse_path][()]
+    trains_with_data = all_trainIds[frames_per_train == fpt]

-    # load scan variable
-    scan = xr.open_dataset(scan_vdf, group='data')['scan_variable']
-    scan.name = 'scan'
     len_scan = len(scan.groupby(scan))

     if do_pulse_mean:
@@ -631,14 +646,7 @@ def process_one_module(job):
     module_data['module'] = module

     # crunching
-    with h5py.File(data_vdf, 'r') as m:
-        #fpt_calc = int(len(m[image_path]) / n_trains)
-        #assert fpt_calc == fpt, f'data length does not match expected value (module {module})'
-        all_trainIds = m['INDEX/trainId'][()]
-        frames_per_train = m[npulse_path][()]
-        trains_with_data = all_trainIds[frames_per_train == fpt]
-        #print(np.unique(pulses_per_train), '/', fpt)
-        #print(len(trains_with_data))
+    with h5py.File(vds, 'r') as m:
         chunk_start = np.arange(len(all_trainIds), step=chunksize, dtype=int)

         trains_start = 0
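
Note on the binning() change above: the per-module jobs are now dispatched through joblib's
'loky' backend instead of multiprocessing.Pool. A minimal, self-contained sketch of the same
dispatch pattern (the job list and worker below are placeholders, not the real ToolBox code;
Nworker defaults to 16 in the class):

    from joblib import Parallel, delayed, parallel_backend

    def process_one_module(job):
        # stand-in for the real per-module crunching
        return job['module']

    jobs = [{'module': m} for m in range(16)]

    # equivalent of the old multiprocessing.Pool(16).map(process_one_module, jobs)
    with parallel_backend('loky', n_jobs=16):
        module_data = Parallel(verbose=20)(
            delayed(process_one_module)(job) for job in jobs
        )

loky starts separate worker processes, so each job still opens its own per-module virtual
dataset file, as process_one_module does in the patch.
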
diff --git a/DSSC1module.py b/DSSC1module.py
index b053da1b2efd3e29135648eb3ef79480c092c4e9..eea4232590bfded8f7c13a1c9ac7b9baeebd0147 100644
--- a/DSSC1module.py
+++ b/DSSC1module.py
@@ -5,8 +5,8 @@
 import os
 import warnings
 import psutil
-import karabo_data as kd
-from karabo_data.read_machinery import find_proposal
+import extra_data as ed
+from extra_data.read_machinery import find_proposal
 import ToolBox as tb
 import matplotlib.pyplot as plt
 from mpl_toolkits.axes_grid1 import ImageGrid
@@ -58,18 +58,18 @@ class DSSC1module:
         self.maxSaturatedPixel = 1

     def open_run(self, run_nr, t0=0.0):
-        """ Open a run with karabo-data and prepare the virtual dataset for multiprocessing
+        """ Open a run with extra-data and prepare the virtual dataset for multiprocessing

             inputs:
                 run_nr: the run number
                 t0: optional t0 in mm

         """
-        print('Opening run data with karabo-data')
+        print('Opening run data with extra-data')
         self.run_nr = run_nr
         self.xgm = None

-        self.run = kd.open_run(self.proposal, self.run_nr)
+        self.run = ed.open_run(self.proposal, self.run_nr)
         self.plot_title = f'{self.proposal} run: {self.run_nr}'

         self.fpt = self.run.detector_info(f'SCS_DET_DSSC1M-1/DET/{self.module}CH0:xtdf')['frames_per_train']
@@ -92,7 +92,7 @@ class DSSC1module:
             print(f'Loading XGM data')
             self.xgm = self.run.get_array(tb.mnemonics['SCS_SA3']['source'],
                                           tb.mnemonics['SCS_SA3']['key'],
-                                          roi=kd.by_index[:self.nbunches])
+                                          roi=ed.by_index[:self.nbunches])
             self.xgm = self.xgm.rename({'dim_0':'pulseId'})
             self.xgm['pulseId'] = np.arange(0, 2*self.nbunches, 2)

@@ -290,6 +290,7 @@ class DSSC1module:
                 warnings.warn(f'Overwriting file: {save_path}')
                 os.remove(save_path)
             data.to_netcdf(save_path, group='data')
+            data.close()
             os.chmod(save_path, 0o664)
             print('saving: ', save_path)
         else:
@@ -306,7 +307,7 @@ class DSSC1module:
             save_folder = self.save_folder

         self.run_nr = dark_runNB
-        self.dark_data = xr.open_dataset(os.path.join(save_folder, f'run{dark_runNB}_dark.h5'), group='data')
+        self.dark_data = xr.load_dataset(os.path.join(save_folder, f'run{dark_runNB}_dark.h5'), group='data')
         self.plot_title = f"{self.proposal} dark: {self.dark_data['run'].values}"

     def show_rois(self):
@@ -422,4 +423,4 @@ def process_one_module(job):
         module_data['std_data'] += (temp**2).sum(dim='trainId')
         module_data['counts'] += n_trains

-    return module_data
\ No newline at end of file
+    return module_data
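
Note on the xr.open_dataset -> xr.load_dataset switches and the .close() calls added after
to_netcdf in this patch: load_dataset reads the requested group eagerly and releases the file
handle, whereas open_dataset keeps a lazy handle open; keeping the files closed presumably
avoids locked HDF5/netCDF files when the same path is rewritten later. Illustrative sketch
only, with a placeholder file name:

    import xarray as xr

    # eager read: data ends up in memory and the file handle is released immediately
    dark = xr.load_dataset('run100_dark.h5', group='data', engine='netcdf4')

    # lazy read: the file stays open until the dataset is closed
    with xr.open_dataset('run100_dark.h5', group='data', engine='netcdf4') as ds:
        dark_lazy = ds.load()
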
diff --git a/FastCCD.py b/FastCCD.py
index 1f4124c5081a6f57cd077dbdbbfebb19da6a1e50..e919dc131aaaba49c0a8f80487e55cb22302b539 100644
--- a/FastCCD.py
+++ b/FastCCD.py
@@ -7,8 +7,8 @@
 import os
 import warnings
 import psutil
-import karabo_data as kd
-from karabo_data.read_machinery import find_proposal
+import extra_data as ed
+from extra_data.read_machinery import find_proposal
 import ToolBox as tb
 import matplotlib.pyplot as plt
 from mpl_toolkits.axes_grid1 import ImageGrid
@@ -72,7 +72,7 @@ class FastCCD:
             shutil.rmtree(self.tempdir)

     def open_run(self, run_nr, isDark=False, t0=0.0):
-        """ Open a run with karabo-data and prepare the virtual dataset for multiprocessing
+        """ Open a run with extra-data and prepare the virtual dataset for multiprocessing

             inputs:
                 run_nr: the run number
@@ -80,11 +80,11 @@ class FastCCD:
                 t0: optional t0 in mm

         """
-        print('Opening run data with karabo-data')
+        print('Opening run data with extra-data')
         self.run_nr = run_nr
         self.xgm = None

-        self.run = kd.open_run(self.proposal, self.run_nr)
+        self.run = ed.open_run(self.proposal, self.run_nr)
         self.plot_title = f'{self.proposal} run: {self.run_nr}'
         self.isDark = isDark
         self.fpt = 1
@@ -108,7 +108,7 @@ class FastCCD:
         try:
             self.xgm = self.run.get_array(tb.mnemonics['SCS_SA3']['source'],
                                           tb.mnemonics['SCS_SA3']['key'],
-                                          roi=kd.by_index[:self.nbunches])
+                                          roi=ed.by_index[:self.nbunches])
             self.xgm = self.xgm.squeeze()  # remove the pulseId dimension since XGM should have only 1 value per train
         except:
             self.xgm = xr.DataArray(np.zeros_like(self.run.train_ids),dims = 'trainId', coords = {"trainId":self.run.train_ids})
@@ -369,6 +369,7 @@ class FastCCD:
                 warnings.warn(f'Overwriting file: {save_path}')
                 os.remove(save_path)
             self.module_data.to_netcdf(save_path, group='data')
+            self.module_data.close()
             os.chmod(save_path, 0o664)
             print('saving: ', save_path)
         else:
@@ -389,10 +390,10 @@ class FastCCD:

         self.plot_title = f'{self.proposal} run: {runNB} dark: {dark_runNB}'

-        binned = xr.open_dataset(os.path.join(save_folder, f'run{runNB}.h5'), group='data', cache=False)
+        binned = xr.load_dataset(os.path.join(save_folder, f'run{runNB}.h5'), group='data')

         if dark_runNB is not None:
-            dark = xr.open_dataset(os.path.join(save_folder, f'run{dark_runNB}_dark.h5'), group='data', cache=False)
+            dark = xr.load_dataset(os.path.join(save_folder, f'run{dark_runNB}_dark.h5'), group='data')
             binned['pumped'] = self.gain*(binned['pumped'] - dark['pumped'].squeeze(drop=True))
             binned['unpumped'] = self.gain*(binned['unpumped'] - dark['unpumped'].squeeze(drop=True))
@@ -630,7 +631,7 @@ def process_one_module(job):
     data_pumped = ds.where(ds['FastADC5'] > 0, drop=True).groupby('scan_variable').sum('trainId')
     data_unpumped = ds.where(ds['FastADC5'] < 1, drop=True).groupby('scan_variable').sum('trainId')

-    module_data = data_pumped['fastccd'].to_dataset('pumped')
+    module_data = data_pumped['fastccd'].to_dataset(name='pumped')
     module_data['unpumped'] = data_unpumped['fastccd']
     module_data['sum_count_pumped'] = data_pumped['sum_count']
     module_data['sum_count_unpumped'] = data_unpumped['sum_count']
@@ -638,4 +639,4 @@ def process_one_module(job):
     module_data['xgm_unpumped'] = data_unpumped['xgm']
     module_data['workerId'] = workerId

-    return module_data
\ No newline at end of file
+    return module_data
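
The to_dataset('pumped') -> to_dataset(name='pumped') change above is a genuine fix rather
than a rename: DataArray.to_dataset takes the dimension to split along as its first
positional argument, so the variable name has to be passed as the name keyword. A small
sketch (array shape and names are made up):

    import numpy as np
    import xarray as xr

    da = xr.DataArray(np.zeros((2, 3)), dims=['scan_variable', 'x'])
    ds = da.to_dataset(name='pumped')  # one-variable Dataset, variable called 'pumped'
    ds['unpumped'] = da                # further variables are then added, as in process_one_module
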
diff --git a/Load.py b/Load.py
index 70b0c78710685e14bd9c763f56bc9d38a7d73de1..3b001795afa82e6220dcd8c7b10e8a91c96d770e 100644
--- a/Load.py
+++ b/Load.py
@@ -6,8 +6,8 @@ Copyright (2019) SCS Team.
 """

 import numpy as np
-from karabo_data import by_index, RunDirectory
-from karabo_data.read_machinery import find_proposal
+from extra_data import by_index, RunDirectory
+from extra_data.read_machinery import find_proposal
 import xarray as xr
 import os
 from ToolBox.bunch_pattern import extractBunchPattern
@@ -421,7 +421,7 @@ def load(fields, runNB, proposalNB, subFolder='raw', display=False, validate=Fal
         subFolder: (str) sub-folder from which to load the data. Use 'raw' for raw data or
             'proc' for processed data.
         display: (bool) whether to show the run.info or not
-        validate: (bool) whether to run karabo-data-validate or not
+        validate: (bool) whether to run extra-data-validate or not
         subset: a subset of train that can be load with by_index[:5] for the first 5 trains
         rois: a dictionnary of mnemonics with a list of rois definition and the desired
@@ -444,7 +444,7 @@ def load(fields, runNB, proposalNB, subFolder='raw', display=False, validate=Fal
     run = RunDirectory(runFolder).select_trains(subset)

     if validate:
-        get_ipython().system('karabo-data-validate ' + runFolder)
+        get_ipython().system('extra-data-validate ' + runFolder)
     if display:
         print('Loading data from {}'.format(runFolder))
         run.info()
diff --git a/bunch_pattern.py b/bunch_pattern.py
index d2df8f5fe041f335bc79ac3905f468161729f1b2..0ff19613a89ff589d24c3a4050e4d75d84f6361c 100644
--- a/bunch_pattern.py
+++ b/bunch_pattern.py
@@ -22,7 +22,7 @@ def extractBunchPattern(bp_table=None, key='sase3', runDir=None):
         bp_table: DataArray corresponding to the mnemonics "bunchPatternTable".
             If None, the bunch pattern table is loaded using runDir.
         key: str, ['sase1', 'sase2', 'sase3', 'scs_ppl']
-        runDir: karabo_data run directory. Required only if bp_table is None.
+        runDir: extra-data DataCollection. Required only if bp_table is None.

     Outputs:
         bunchPattern: DataArray containing indices of the sase/laser pulses for
@@ -255,4 +255,4 @@ def sortBAMdata(data, key='scs_ppl', sa3Offset=0):
     ndata = xr.merge(mergeList, join='inner')
     for k in data.attrs.keys():
         ndata.attrs[k] = data.attrs[k]
-    return ndata
\ No newline at end of file
+    return ndata
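
All five files follow the same upstream rename: karabo_data was split into extra_data
(run/file access) and extra_geom (detector geometry), and the karabo-data-validate CLI became
extra-data-validate. A minimal sketch of the equivalent imports used throughout the patch
(the proposal and run numbers are placeholders):

    import extra_data as ed                              # was: import karabo_data as kd
    from extra_data.read_machinery import find_proposal  # only the package name changed
    from extra_geom import DSSC_1MGeometry               # was: from karabo_data.geometry2 import DSSC_1MGeometry

    run = ed.open_run(proposal=2212, run=89)             # same call signature as kd.open_run
    sel = run.select_trains(ed.by_index[:5])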