diff --git a/src/toolbox_scs/detectors/dssc_data.py b/src/toolbox_scs/detectors/dssc_data.py
index baa16d9d808cae857d526ef80e991f590e3a94f6..f38afe6148c2ff7316cbcff5add0023cb593b1f7 100644
--- a/src/toolbox_scs/detectors/dssc_data.py
+++ b/src/toolbox_scs/detectors/dssc_data.py
@@ -19,8 +19,8 @@ def _to_netcdf(fname, data, group, mode):
     try:
         data.to_netcdf(fname, group=group, mode='a', engine='h5netcdf')
         log.info(f"Created group {group} in file {fname}")
-    except ValueError:
-        msg = "Group exists and has incompatible dimensions."
+    except (ValueError, TypeError):
+        msg = f"Group {group} exists and has incompatible dimensions."
         log.warning(f"Could not store data: {msg}")
         raise ToolBoxFileError(msg, fname)
     else:
@@ -90,9 +90,11 @@ def load_xarray(fname, group='data', form='dataset'):
     f_exists = os.path.isfile(fname)
     if f_exists:
         if form == 'dataset':
-            return xr.open_dataset(fname, group, engine='h5netcdf')
+            log.debug(f'open xarray dataset {fname}')
+            return xr.load_dataset(fname, group=group, engine='h5netcdf')
         elif form == 'array':
-            return xr.open_dataarray(fname, group, engine='h5netcdf')
+            log.debug(f'open xarray dataarray {fname}')
+            return xr.load_dataarray(fname, group=group, engine='h5netcdf')
     else:
         msg = "File does not exists."
         raise ToolBoxFileError(msg, fname)
diff --git a/src/toolbox_scs/test/test_dssc_cls.py b/src/toolbox_scs/test/test_dssc_cls.py
index a2557ba7e8f1a4668bd781fe4c354cf4c1d9f407..746529fa7b6a19dafbe692f09dde0d175386e192 100644
--- a/src/toolbox_scs/test/test_dssc_cls.py
+++ b/src/toolbox_scs/test/test_dssc_cls.py
@@ -3,7 +3,6 @@ import logging
 import os
 import argparse
 import shutil
-import multiprocessing
 from time import strftime

 import numpy as np
@@ -22,12 +21,6 @@ suites = {"no-processing": (
           "processing-quick": (
                 "test_binning_quick",
                 ),
-          "processing-full-multiprocessing": (
-                "test_binning_multiprocessing",
-                ),
-          "processing-full-joblib": (
-                "test_binning_joblib",
-                ),
           "processing-full": (
                 "test_binning_xgm",
                 )
           }
@@ -126,57 +119,14 @@ class TestDSSC(unittest.TestCase):
                                          bins_pulse)
         binners = {'trainId': binner1, 'pulse': binner2}
         run232 = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
-        mod_list = [2,8]
-        data = run232.bin_data(use_joblib=False, modules=mod_list)
+        mod_list = [2,15]
+        data = run232.bin_data(modules=mod_list)
         self.assertIsNotNone(data.data)
-        data = run232.bin_data(use_joblib=True, modules=mod_list)
+        data = run232.bin_data(backend='multiprocessing', modules=mod_list)
         self.assertIsNotNone(data.data)
-        tbdet.save_to_file(data, './tmp/run232.h5')
-        data = tbdet.load_from_file('./tmp/run232.h5')
-        self.assertIsNotNone(data.data)
-
-
-    def test_binning_multiprocessing(self):
-        proposal_nb = 2212
-        run_nb = 235
-        run = tb.load_run(proposal_nb, run_nb, include='*DA*')
-        run_info = tbdet.load_dssc_info(proposal_nb, run_nb)
-        bins_trainId = tb.get_array(run,
-                                    'PP800_PhaseShifter',
-                                    0.03)
-        bins_pulse = ['pumped', 'unpumped'] * 10
-
-        binner1 = tbdet.create_dssc_bins("trainId",
-                                         run_info['trainIds'],
-                                         bins_trainId.values)
-        binner2 = tbdet.create_dssc_bins("pulse",
-                                         np.linspace(0,19,20, dtype=int),
-                                         bins_pulse)
-        binners = {'trainId': binner1, 'pulse': binner2}
-        run235 = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
-        data = run235.bin_data(use_joblib=False)
-        self.assertIsNotNone(data.data)
-
-    def test_binning_joblib(self):
-        proposal_nb = 2212
-        run_nb = 235
-        run = tb.load_run(proposal_nb, run_nb, include='*DA*')
-        run_info = tbdet.load_dssc_info(proposal_nb, run_nb)
-        bins_trainId = tb.get_array(run,
-                                    'PP800_PhaseShifter',
-                                    0.03)
-        bins_pulse = ['pumped', 'unpumped'] * 10
-
-        binner1 = tbdet.create_dssc_bins("trainId",
-                                         run_info['trainIds'],
-                                         bins_trainId.values)
-        binner2 = tbdet.create_dssc_bins("pulse",
-                                         np.linspace(0,19,20, dtype=int),
-                                         bins_pulse)
-        binners = {'trainId': binner1, 'pulse': binner2}
-        run235 = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
-        data = run235.bin_data(use_joblib=True)
+        tbdet.save_xarray('./tmp/run232.h5', data)
+        data = tbdet.load_xarray('./tmp/run232.h5')
         self.assertIsNotNone(data.data)
@@ -207,7 +157,7 @@ class TestDSSC(unittest.TestCase):
         }

         run235 = tbdet.DSSCBinner(proposal_nb, run_nb, **params)
-        data = run235.bin_data(use_joblib=True, modules=[3])
+        data = run235.bin_data(backend='multiprocessing', modules=[3])
         self.assertIsNotNone(data.data)

         xgm_binned = run235.get_xgm_binned()
diff --git a/src/toolbox_scs/test/test_dssc_methods.py b/src/toolbox_scs/test/test_dssc_methods.py
index 4bd9061e08b1cad35bf3ba534900c5c750d2f3eb..d5c804a27799556f509cd07b3d901ffeb9c2392f 100644
--- a/src/toolbox_scs/test/test_dssc_methods.py
+++ b/src/toolbox_scs/test/test_dssc_methods.py
@@ -3,7 +3,7 @@ import logging
 import os
 import argparse
 import shutil
-import multiprocessing
+import joblib
 from time import strftime

 import numpy as np
@@ -238,11 +238,15 @@ class TestDSSC(unittest.TestCase):
         cls = self.__class__
         chunksize = 512
-        print('processing', chunksize, 'trains per chunk')
-
-        jobs = []
-        for m in range(2):
-            jobs.append(dict(
+        backend = 'multiprocessing'
+        mod_list = [1, 15]
+        n_jobs = len(mod_list)
+        log_root.info(f'processing {chunksize} trains per chunk')
+        log_root.info(f'using parallelization backend {backend}')
+
+        module_jobs = []
+        for m in mod_list:
+            module_jobs.append(dict(
                 proposal=2212,
                 run_nr=235,
                 module=m,
@@ -252,31 +256,38 @@ class TestDSSC(unittest.TestCase):
                 pulsemask=cls._pulsemask
             ))

-        print(f'start processing modules:', strftime('%X'))
+        print('start processing modules:', strftime('%X'))

-        with multiprocessing.Pool(16) as pool:
-            module_data = pool.map(tbdet.bin_data_multipr, jobs)
+        data = joblib.Parallel(n_jobs=n_jobs, backend=backend) \
+            (joblib.delayed(tbdet.bin_data)(**module_jobs[i]) \
+                for i in range(len(mod_list)))

         print('finished processing modules:', strftime('%X'))

-        module_data = xr.concat(module_data, dim='module')
-        print(module_data)
-        self.assertIsNotNone(module_data)
+        data = xr.concat(data, dim='module')
+        print(data)
+        self.assertIsNotNone(data)

     def test_processmodule2(self):
         cls = self.__class__
         chunksize = 512
+        backend = 'multiprocessing'
+        mod_list = [15, 3]
+        n_jobs = len(mod_list)
+        log_root.info(f'processing {chunksize} trains per chunk')
+        log_root.info(f'using parallelization backend {backend}')
+
         info = tbdet.load_dssc_info(2212, 89)
         bin1 = tbdet.create_dssc_bins("trainId",
                                       info['trainIds'],
                                       np.ones(1691))
         binners = {'trainId': bin1}
-        jobs = []
-        for m in range(2):
-            jobs.append(dict(
-                proposal=cls._proposal,
+        module_jobs = []
+        for m in mod_list:
+            module_jobs.append(dict(
+                proposal=2212,
                 run_nr=89,
                 module=m,
                 chunksize=chunksize,
@@ -284,29 +295,33 @@ class TestDSSC(unittest.TestCase):
                 dssc_binners=binners,
             ))

-        with multiprocessing.Pool(16) as pool:
-            module_data = pool.map(tbdet.bin_data_multipr, jobs)
-
+        print('start processing modules:', strftime('%X'))
+        data = joblib.Parallel(n_jobs=n_jobs, backend=backend) \
+            (joblib.delayed(tbdet.bin_data)(**module_jobs[i]) \
+                for i in range(len(mod_list)))
         print('finished processing modules:', strftime('%X'))
-        module_data = xr.concat(module_data, dim='module')
-        module_data = module_data.squeeze()
-        print(module_data)
-        self.assertIsNotNone(module_data)
+
+        data = xr.concat(data, dim='module')
+        data = data.squeeze()
+        print(data)
+        self.assertIsNotNone(data)

     def test_storage(self):
         cls = self.__class__
-        tbdet.save_to_file(cls._module_data, './tmp/run235.h5')
-        tbdet.save_to_file(cls._module_data, './tmp/run235.h5',
-                           overwrite = True)
-        run235 = tbdet.load_from_file('./tmp/run235.h5')
+        tbdet.save_xarray('./tmp/run235.h5', cls._module_data)
+        tbdet.save_xarray('./tmp/run235.h5', cls._module_data,
+                          mode='w')
+        run235 = tbdet.load_xarray('./tmp/run235.h5')
         self.assertIsNotNone(run235)
+        #run235.close()

         with self.assertRaises(ToolBoxFileError) as cm:
-            tbdet.save_to_file(cls._module_data, './tmp/run235.h5',
-                               overwrite = False)
+            tbdet.save_xarray('./tmp/run235.h5',
+                              cls._module_data.isel(pulse=0),
+                              mode='a')


 def list_suites():
     print("\nPossible test suites:\n" + "-" * 79)
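
For reference, a minimal sketch of the per-module binning and storage round
trip the updated tests exercise. This assumes the tbdet.bin_data keywords and
the save_xarray/load_xarray signatures visible in this diff; optional keywords
such as pulsemask are omitted, and the output path ./tmp/run89.h5 is
illustrative only:

    import os

    import joblib
    import numpy as np
    import xarray as xr

    import toolbox_scs.detectors as tbdet

    # One job description per DSSC module, mirroring test_processmodule2.
    proposal, run_nr, chunksize = 2212, 89, 512
    mod_list = [15, 3]

    info = tbdet.load_dssc_info(proposal, run_nr)
    binners = {'trainId': tbdet.create_dssc_bins("trainId",
                                                 info['trainIds'],
                                                 np.ones(1691))}
    module_jobs = [dict(proposal=proposal, run_nr=run_nr, module=m,
                        chunksize=chunksize, dssc_binners=binners)
                   for m in mod_list]

    # One worker per module; the 'multiprocessing' backend runs the
    # CPU-bound binning in separate processes.
    data = joblib.Parallel(n_jobs=len(mod_list), backend='multiprocessing')(
        joblib.delayed(tbdet.bin_data)(**job) for job in module_jobs)
    data = xr.concat(data, dim='module').squeeze()

    # Round trip through the renamed storage helpers: mode='w' replaces the
    # file, while mode='a' appends a group and raises ToolBoxFileError when
    # the group already exists with incompatible dimensions (see _to_netcdf).
    os.makedirs('./tmp', exist_ok=True)
    tbdet.save_xarray('./tmp/run89.h5', data, mode='w')
    restored = tbdet.load_xarray('./tmp/run89.h5')

Sizing n_jobs to the module list rather than a fixed Pool(16) avoids idle
workers, and joblib's backend keyword leaves room to fall back to threads
where process start-up or pickling costs dominate.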