Skip to content
Snippets Groups Projects
Commit f3115a6a authored by Rafael Gort's avatar Rafael Gort
Browse files

Updated dssc test suites to new structure

parent 10fe1725
Branches DevelopmentRG
No related tags found
No related merge requests found
......@@ -19,8 +19,8 @@ def _to_netcdf(fname, data, group, mode):
try:
data.to_netcdf(fname, group=group, mode='a', engine='h5netcdf')
log.info(f"Created group {group} in file {fname}")
except ValueError:
msg = "Group exists and has incompatible dimensions."
except (ValueError, TypeError):
msg = f"Group {group} exists and has incompatible dimensions."
log.warning(f"Could not store data: {msg}")
raise ToolBoxFileError(msg, fname)
else:
......@@ -90,9 +90,11 @@ def load_xarray(fname, group='data', form='dataset'):
f_exists = os.path.isfile(fname)
if f_exists:
if form == 'dataset':
return xr.open_dataset(fname, group, engine='h5netcdf')
log.debug(f'open xarray dataset {fname}')
return xr.load_dataset(fname, group=group, engine='h5netcdf')
elif form == 'array':
return xr.open_dataarray(fname, group, engine='h5netcdf')
log.debug(f'open xarray dataarray {fname}')
return xr.load_dataarray(fname, group=group, engine='h5netcdf')
else:
msg = "File does not exists."
raise ToolBoxFileError(msg, fname)
......
......@@ -3,7 +3,6 @@ import logging
import os
import argparse
import shutil
import multiprocessing
from time import strftime
import numpy as np
......@@ -22,12 +21,6 @@ suites = {"no-processing": (
"processing-quick": (
"test_binning_quick",
),
"processing-full-multiprocessing": (
"test_binning_multiprocessing",
),
"processing-full-joblib": (
"test_binning_joblib",
),
"processing-full": (
"test_binning_xgm",
)
......@@ -126,57 +119,14 @@ class TestDSSC(unittest.TestCase):
bins_pulse)
binners = {'trainId': binner1, 'pulse': binner2}
run232 = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
mod_list = [2,8]
data = run232.bin_data(use_joblib=False, modules=mod_list)
mod_list = [2,15]
data = run232.bin_data(modules=mod_list)
self.assertIsNotNone(data.data)
data = run232.bin_data(use_joblib=True, modules=mod_list)
data = run232.bin_data(backend='multiprocessing', modules=mod_list)
self.assertIsNotNone(data.data)
tbdet.save_to_file(data, './tmp/run232.h5')
data = tbdet.load_from_file('./tmp/run232.h5')
self.assertIsNotNone(data.data)
def test_binning_multiprocessing(self):
proposal_nb = 2212
run_nb = 235
run = tb.load_run(proposal_nb, run_nb, include='*DA*')
run_info = tbdet.load_dssc_info(proposal_nb, run_nb)
bins_trainId = tb.get_array(run,
'PP800_PhaseShifter',
0.03)
bins_pulse = ['pumped', 'unpumped'] * 10
binner1 = tbdet.create_dssc_bins("trainId",
run_info['trainIds'],
bins_trainId.values)
binner2 = tbdet.create_dssc_bins("pulse",
np.linspace(0,19,20, dtype=int),
bins_pulse)
binners = {'trainId': binner1, 'pulse': binner2}
run235 = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
data = run235.bin_data(use_joblib=False)
self.assertIsNotNone(data.data)
def test_binning_joblib(self):
proposal_nb = 2212
run_nb = 235
run = tb.load_run(proposal_nb, run_nb, include='*DA*')
run_info = tbdet.load_dssc_info(proposal_nb, run_nb)
bins_trainId = tb.get_array(run,
'PP800_PhaseShifter',
0.03)
bins_pulse = ['pumped', 'unpumped'] * 10
binner1 = tbdet.create_dssc_bins("trainId",
run_info['trainIds'],
bins_trainId.values)
binner2 = tbdet.create_dssc_bins("pulse",
np.linspace(0,19,20, dtype=int),
bins_pulse)
binners = {'trainId': binner1, 'pulse': binner2}
run235 = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
data = run235.bin_data(use_joblib=True)
tbdet.save_xarray(cls._module_data, './tmp/run235.h5')
data = tbdet.load_xarray('./tmp/run232.h5')
self.assertIsNotNone(data.data)
......@@ -207,7 +157,7 @@ class TestDSSC(unittest.TestCase):
}
run235 = tbdet.DSSCBinner(proposal_nb, run_nb, **params)
data = run235.bin_data(use_joblib=True, modules=[3])
data = run235.bin_data(backend='multiprocessing', modules=[3])
self.assertIsNotNone(data.data)
xgm_binned = run235.get_xgm_binned()
......
......@@ -3,7 +3,7 @@ import logging
import os
import argparse
import shutil
import multiprocessing
import joblib
from time import strftime
import numpy as np
......@@ -238,11 +238,15 @@ class TestDSSC(unittest.TestCase):
cls = self.__class__
chunksize = 512
print('processing', chunksize, 'trains per chunk')
jobs = []
for m in range(2):
jobs.append(dict(
backend = 'multiprocessing'
mod_list = [1, 15]
n_jobs = len(mod_list)
log_root.info(f'processing {chunksize} trains per chunk')
log_root.info(f'using parallelization backend {backend}')
module_jobs = []
for m in mod_list:
module_jobs.append(dict(
proposal=2212,
run_nr=235,
module=m,
......@@ -252,31 +256,38 @@ class TestDSSC(unittest.TestCase):
pulsemask=cls._pulsemask
))
print(f'start processing modules:', strftime('%X'))
print('start processing modules:', strftime('%X'))
with multiprocessing.Pool(16) as pool:
module_data = pool.map(tbdet.bin_data_multipr, jobs)
data = joblib.Parallel(n_jobs=n_jobs, backend=backend) \
(joblib.delayed(tbdet.bin_data)(**module_jobs[i]) \
for i in range(len(mod_list)))
print('finished processing modules:', strftime('%X'))
module_data = xr.concat(module_data, dim='module')
print(module_data)
self.assertIsNotNone(module_data)
data = xr.concat(data, dim='module')
print(data)
self.assertIsNotNone(data)
def test_processmodule2(self):
cls = self.__class__
chunksize = 512
backend = 'multiprocessing'
mod_list = [15, 3]
n_jobs = len(mod_list)
log_root.info(f'processing {chunksize} trains per chunk')
log_root.info(f'using parallelization backend {backend}')
info = tbdet.load_dssc_info(2212, 89)
bin1 = tbdet.create_dssc_bins("trainId",
info['trainIds'],
np.ones(1691))
binners = {'trainId': bin1}
jobs = []
for m in range(2):
jobs.append(dict(
proposal=cls._proposal,
module_jobs = []
for m in mod_list:
module_jobs.append(dict(
proposal=2212,
run_nr=89,
module=m,
chunksize=chunksize,
......@@ -284,29 +295,33 @@ class TestDSSC(unittest.TestCase):
dssc_binners=binners,
))
with multiprocessing.Pool(16) as pool:
module_data = pool.map(tbdet.bin_data_multipr, jobs)
print('start processing modules:', strftime('%X'))
data = joblib.Parallel(n_jobs=n_jobs, backend=backend) \
(joblib.delayed(tbdet.bin_data)(**module_jobs[i]) \
for i in range(len(mod_list)))
print('finished processing modules:', strftime('%X'))
module_data = xr.concat(module_data, dim='module')
module_data = module_data.squeeze()
print(module_data)
self.assertIsNotNone(module_data)
data = xr.concat(data, dim='module')
data = data.squeeze()
print(data)
self.assertIsNotNone(data)
def test_storage(self):
cls = self.__class__
tbdet.save_to_file(cls._module_data, './tmp/run235.h5')
tbdet.save_to_file(cls._module_data, './tmp/run235.h5',
overwrite = True)
run235 = tbdet.load_from_file('./tmp/run235.h5')
tbdet.save_xarray('./tmp/run235.h5', cls._module_data)
tbdet.save_xarray('./tmp/run235.h5', cls._module_data,
mode = 'w')
run235 = tbdet.load_xarray('./tmp/run235.h5')
self.assertIsNotNone(run235)
#run235.close()
with self.assertRaises(ToolBoxFileError) as cm:
tbdet.save_to_file(cls._module_data, './tmp/run235.h5',
overwrite = False)
tbdet.save_xarray('./tmp/run235.h5',
cls._module_data.isel(pulse=0),
mode = 'a')
def list_suites():
print("\nPossible test suites:\n" + "-" * 79)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment