Skip to content
Snippets Groups Projects
test_dssc_cls.py 6.81 KiB
Newer Older
import unittest
import logging
import os
import argparse
import shutil
from time import strftime

import numpy as np
import xarray as xr

import toolbox_scs as tb
import toolbox_scs.detectors as tbdet

logging.basicConfig(level=logging.DEBUG)
log_root = logging.getLogger(__name__)


suites = {"no-processing": (
                "test_create",
                ),
          "processing-quick": (
                "test_binning_quick",
                ),
          "processing-full": (
                "test_binning_xgm",
def setup_tmp_dir():
    for d in _temp_dirs:
        if not os.path.isdir(d):
            os.mkdir(d)

def cleanup_tmp_dir():
    for d in _temp_dirs:
        shutil.rmtree(d, ignore_errors=True)
        log_root.info(f'remove {d}')


class TestDSSC(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        log_root.info("Start global setup")
        # ---------------------------------------------------------------------
        # global test settings
        # ---------------------------------------------------------------------

        setup_tmp_dir()

        # ---------------------------------------------------------------------
        log_root.info("Finished global setup, start tests")

    @classmethod
    def tearDownClass(cls):
        log_root.info("Clean up test environment....")
        cleanup_tmp_dir()

    def test_create(self):
        proposal_nb = 2212
        run_nb = 235
        run = tb.load_run(proposal_nb, run_nb, include='*DA*')
        run_info = tbdet.load_dssc_info(proposal_nb, run_nb)
        bins_trainId = tb.get_array(run,
                                    'PP800_PhaseShifter',
                                    0.04)
        bins_pulse = ['pumped', 'unpumped'] * 10

        binner1 = tbdet.create_dssc_bins("trainId",
                                      run_info['trainIds'],
                                      bins_trainId.values)
        binner2 = tbdet.create_dssc_bins("pulse",
                                      np.linspace(0,19,20, dtype=int),
                                      bins_pulse)
        binners = {'trainId': binner1, 'pulse': binner2}
        params = {'binners': binners,
                  'use_xgm': True,
                  'xgm_threshold' : (0, np.inf),
                  'xgm_bins': bins_pulse}
        run235 = tbdet.DSSCBinner(proposal_nb, run_nb)
        run235 = tbdet.DSSCBinner(2212, 235, use_xgm=True)
        run235 = tbdet.DSSCBinner(2212, 235,
                                  use_xgm=True,
                                  xgm_bins=bins_pulse)
        run235 = tbdet.DSSCBinner(proposal_nb, run_nb, **params)
        self.assertEqual(run235.binners['trainId'].values[0],
                         np.float32(7585.52))

        # expected fails
        with self.assertRaises(FileNotFoundError) as cm:
Rafael Gort's avatar
Rafael Gort committed
            run235 = tbdet.DSSCBinner(2212, 2354)
        err_msg = "[Errno 2] No such file or directory: " \
                  "'/gpfs/exfel/exp/SCS/201901/p002212/raw/r2354'"
        self.assertEqual(str(cm.exception), err_msg)


        # dark
        proposal_nb = 2212
        run_nb = 232
        run = tb.load_run(proposal_nb, run_nb, include='*DA*')
        run_info = tbdet.load_dssc_info(proposal_nb, run_nb)
        bins_trainId = tb.get_array(run,
                                    'PP800_PhaseShifter',
                                    0.03)
        bins_pulse = ['pumped', 'unpumped'] * 10

        binner1 = tbdet.create_dssc_bins("trainId",
                                      run_info['trainIds'],
                                      bins_trainId.values)
        binner2 = tbdet.create_dssc_bins("pulse",
                                      np.linspace(0,19,20, dtype=int),
                                      bins_pulse)
        binners = {'trainId': binner1, 'pulse': binner2}
        run232 = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
        mod_list = [2,15]
        data = run232.bin_data(modules=mod_list)
        data = run232.bin_data(backend='multiprocessing', modules=mod_list)
        tbdet.save_xarray(cls._module_data, './tmp/run235.h5')
        data = tbdet.load_xarray('./tmp/run232.h5')
        self.assertIsNotNone(data.data)


    def test_binning_xgm(self):
        proposal_nb = 2212
        run_nb = 235

        run = tb.load_run(proposal_nb, run_nb, include='*DA*')
        run_info = tbdet.load_dssc_info(proposal_nb, run_nb)

        bins_trainId = tb.get_array(run,
                                    'PP800_PhaseShifter',
                                    0.03)
        bins_pulse = ['pumped', 'unpumped'] * 10

        binner1 = tbdet.create_dssc_bins("trainId",
                                      run_info['trainIds'],
                                      bins_trainId.values)
        binner2 = tbdet.create_dssc_bins("pulse",
                                      np.linspace(0,19,20, dtype=int),
                                      bins_pulse)
        binners = {'trainId': binner1, 'pulse': binner2}

        params = {'binners':binners,
                  'use_xgm':True,
                  'xgm_threshold':(0, np.inf), 
                  'xgm_bins':bins_pulse
                 }

        run235 = tbdet.DSSCBinner(proposal_nb, run_nb, **params)
        data = run235.bin_data(backend='multiprocessing', modules=[3])
        self.assertIsNotNone(data.data)

        xgm_binned = run235.get_xgm_binned()
        self.assertIsNotNone(xgm_binned)


def list_suites():
    print("\nPossible test suites:\n" + "-" * 79)
    for key in suites:
        print(key)
    print("-" * 79 + "\n")


def suite(*tests):
    suite = unittest.TestSuite()
    for test in tests:
        suite.addTest(TestDSSC(test))
    return suite


def main(*cliargs):
    try:
        for test_suite in cliargs:
            if test_suite in suites:
                runner = unittest.TextTestRunner(verbosity=2)
                runner.run(suite(*suites[test_suite]))
            else:
                log_root.warning(
                    "Unknown suite: '{}'".format(test_suite))
                pass
    except Exception as err:
        log_root.error("Unecpected error: {}".format(err),
                       exc_info=True)
        pass


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--list-suites',
                        action='store_true',
                        help='list possible test suites')
    parser.add_argument('--run-suites', metavar='S',
                        nargs='+', action='store',
                        help='a list of valid test suites')
    args = parser.parse_args()

    if args.list_suites:
        list_suites()

    if args.run_suites:
        main(*args.run_suites)