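"""Processing tools for DSSC detector data at the SCS instrument.

The DSSC class opens a run with karabo-data, optionally bins the data along a
slow scan variable, filters trains on the SASE3 XGM signal, and crunches the
16 detector modules in parallel with multiprocessing.
"""
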
import multiprocessing
from time import strftime
import tempfile
import shutil
from tqdm.auto import tqdm
import os
import warnings

import karabo_data as kd
from karabo_data.geometry2 import DSSC_1MGeometry
import ToolBox as tb
import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
import h5py


class DSSC:
    
    def __init__(self, semester, proposal, topic='SCS'):
        """ Create a DSSC object to process DSSC data.
        
            inputs:
                semester: semester string
                proposal: proposal number string
                topic: instrument topic, 'SCS' by default
            
        """
        self.semester = semester
        self.proposal = proposal
        self.topic = topic
        self.tempdir = None
        self.save_folder = f'/gpfs/exfel/exp/{self.topic}/{self.semester}/{self.proposal}/usr/condensed_runs/'
        
        print('DSSC configuration')
        print(f'Topic: {self.topic}')
        print(f'Semester: {self.semester}')
        print(f'Proposal: {self.proposal}')
        print(f'Default save folder: {self.save_folder}')
        
        if not os.path.exists(self.save_folder):
            warnings.warn(f'Default save folder does not exist: {self.save_folder}')
        
    def __del__(self):
        # delete the temporary folder
        if self.tempdir:
            shutil.rmtree(self.tempdir)
    
    def open_run(self, run_nr, isDark=False):
        """ Open a run with karabo-data and prepare the virtual dataset for multiprocessing
        
            inputs:
                run_nr: the run number
                isDark: a boolean to specify if the run is a dark run or not
        
        """
        
        print('Opening run data with karabo-data')
        self.run_nr = run_nr
        self.xgm = None
        self.scan_vname = None
        
        self.run = kd.open_run(self.proposal, self.run_nr)
        self.isDark = isDark
        self.plot_title = f'{self.semester} run: {self.run_nr}'
        
        self.fpt = self.run.detector_info('SCS_DET_DSSC1M-1/DET/0CH0:xtdf')['frames_per_train']
        self.nbunches = self.run.get_array('SCS_RR_UTC/MDL/BUNCH_DECODER', 'sase3.nPulses.value')
        self.nbunches = np.unique(self.nbunches)
        if len(self.nbunches) == 1:
            self.nbunches = self.nbunches[0]
        else:
            warnings.warn('Not all trains have the same number of SASE3 pulses')
            print(f'nbunches: {self.nbunches}')
            self.nbunches = self.nbunches[-1]
            
        print(f'DSSC frames per train: {self.fpt}')
        print(f'SA3 bunches per train: {self.nbunches}')
        
        if self.tempdir is not None:
            shutil.rmtree(self.tempdir)
        
        self.tempdir = tempfile.mkdtemp()
        print(f'Temporary directory: {self.tempdir}')

        print('Creating virtual dataset')
        self.vdslist = self.create_virtual_dssc_datasets(self.run, path=self.tempdir)
        
        # create a dummy scan variable for dark runs
        # for other types of runs, use DSSC.define_scan to overwrite it
        self.scan = xr.DataArray(np.ones_like(self.run.train_ids), dims=['trainId'],
                                 coords={'trainId': self.run.train_ids})
        self.scan_vname = 'dummy'
        self.vds_scan = None
        
    def define_scan(self, vname, bins):
        """
            Prepare the binning of the DSSC data.
            
            inputs:
                vname: variable name for the scan; can be a mnemonic string from ToolBox
                    or a dictionary with 'source' and 'key' fields
                bins: bin step size (binning by bin edges is not yet implemented)
        """

        if type(vname) is dict:
            self.scan = self.run.get_array(vname['source'], vname['key'])
        elif type(vname) is str:
            if vname not in tb.mnemonics:
                raise ValueError(f'{vname} not found in the ToolBox mnemonics table')
            self.scan = self.run.get_array(tb.mnemonics[vname]['source'], tb.mnemonics[vname]['key'])
        else:
            raise ValueError(f'vname should be a string or a dict. We got {type(vname)}')
            
        if (type(bins) is int) or (type(bins) is float):
            self.scan = bins * np.round(self.scan / bins)
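            # e.g. with bins=0.1, a raw scan value of 0.234 is binned to 0.2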
        else:
            # TODO: digitize the data
            raise ValueError('Binning by bin edges is not yet implemented')
        self.scan_vname = vname
        
        self.vds_scan = os.path.join(self.tempdir, 'scan_variable.h5')
        if os.path.isfile(self.vds_scan):
            os.remove(self.vds_scan)
            
        self.scan = self.scan.to_dataset(name='scan_variable')
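        # the two lines below assume alternating pumped/unpumped SASE3 pulses:
        # even pulse indices are treated as pumped, odd indices as unpumped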
        self.scan['xgm_pumped'] = self.xgm[:, :self.nbunches:2].mean('dim_0')
        self.scan['xgm_unpumped'] = self.xgm[:, 1:self.nbunches:2].mean('dim_0')
        self.scan.to_netcdf(self.vds_scan, group='data')

        self.scan_counts = xr.DataArray(np.ones(len(self.scan['scan_variable'])),
                                        dims=['scan_variable'],
                                        coords={'scan_variable': self.scan['scan_variable'].values},
                                        name='counts')
        
        fig, (ax1, ax2) = plt.subplots(nrows=2, figsize=[5, 5])
        ax1.plot(self.scan.groupby('scan_variable').mean('trainId').coords['scan_variable'].values,
                 self.scan_counts.groupby('scan_variable').sum(),
                 'o-', ms=2)
        ax1.set_xlabel('scan variable')
        ax1.set_ylabel('# trains')
        ax1.set_title(self.plot_title)
        
        ax2.plot(self.scan['scan_variable'])
        ax2.set_xlabel('train #')
        ax2.set_ylabel(f'{vname}')

        plt.tight_layout()
            
    def load_xgm(self):
        """ Loads pulse resolved dedicated SAS3 data from the SCS XGM.
        
        """
        if self.xgm is None:
            self.xgm = self.run.get_array(tb.mnemonics['SCS_SA3']['source'],
                                          tb.mnemonics['SCS_SA3']['key'], roi=kd.by_index[:self.nbunches])
            
    def plot_xgm_hist(self, nbins=100):
        """ Plots an histogram of the SCS XGM dedicated SAS3 data.
        
            inputs:
                nbins: number of bins for the histogram.
        """
        if self.xgm is None:
            self.load_xgm()
            
        hist, bins_edges = np.histogram(self.xgm, nbins, density=True)
        width = 1.0 * (bins_edges[1] - bins_edges[0])
        bins_center = 0.5*(bins_edges[:-1] + bins_edges[1:])
        
        plt.figure(figsize=(5,3))
        plt.bar(bins_center, hist, align='center', width=width)
        plt.xlabel(f"{tb.mnemonics['SCS_SA3']['source']}{tb.mnemonics['SCS_SA3']['key']}")
        plt.ylabel('density')
        plt.title(self.plot_title)
        plt.tight_layout()
        
    def xgm_filter(self, xgm_low=-np.inf, xgm_high=np.inf):
        """ Filters the data by train. If one pulse within a train has an SASE3 SCS XGM value below
            xgm_low or above xgm_high, that train will be dropped from the dataset.
            
            inputs:
                xgm_low: low threshold value
                xgm_high: high threshold value
        """
                   
        if self.xgm is None:
            self.load_xgm()
        
        if self.isDark:
            warnings.warn('This run was loaded as a dark run; filtering on the XGM makes no sense. Aborting.')
            return
        
        self.xgm_low = xgm_low
        self.xgm_high = xgm_high
        
        # a train is valid only if every pulse lies between xgm_low and xgm_high;
        # the product over the pulse dimension ('dim_0') acts as a logical AND
        valid = ((self.xgm > self.xgm_low) * (self.xgm < self.xgm_high)).prod('dim_0').astype(bool)
        xgm_valid = self.xgm.where(valid)
        xgm_valid = xgm_valid.dropna('trainId')
        self.scan = self.scan.sel({'trainId': xgm_valid.trainId})
        nrejected = len(self.run.train_ids) - len(xgm_valid.trainId)
        print((f'Rejecting {nrejected} out of {len(self.run.train_ids)} trains due to xgm '
               f'thresholds: [{self.xgm_low}, {self.xgm_high}]'))

    def load_geom(self):
        """ Loads and return the DSSC geometry.
        """
        quad_pos = [
            (-124.100,    3.112),  # TR
            (-133.068, -110.604),  # BR
            (   0.988, -125.236),  # BL
            (   4.528,   -4.912)   # TL
            ]
        path = '/gpfs/exfel/sw/software/exfel_environments/misc/git/karabo_data/docs/dssc_geo_june19.h5'
        geom = DSSC_1MGeometry.from_h5_file_and_quad_positions(path, quad_pos)
        return geom

    def create_virtual_dssc_datasets(self, run, path=''):
        """ Create virtual datasets for each 16 DSSC modules used for the multiprocessing.
            
            input:
                run: karabo-data run
                path: string where the virtual files are created
        """
        
        vds_list = []
        for m in tqdm(range(16)):
            vds_filename = os.path.join(path, f'dssc{m}_vds.h5')
            if os.path.isfile(vds_filename):
                os.remove(vds_filename)
            module_vds = run.get_virtual_dataset(f'SCS_DET_DSSC1M-1/DET/{m}CH0:xtdf',
                                                 'image.data', filename=vds_filename)
            vds_list.append([vds_filename, module_vds])
        return vds_list
    
    def crunch(self):
        """ Crunch through the DSSC data using multiprocessing.
        
        """
        if self.vds_scan is None:
            # probably a dark run with a dummy scan variable
            self.vds_scan = os.path.join(self.tempdir, 'scan_variable.h5')
            if os.path.isfile(self.vds_scan):
                os.remove(self.vds_scan)
            
            self.scan = self.scan.to_dataset(name='scan_variable')
            self.scan.to_netcdf(self.vds_scan, group='data')

        max_GB = 400
        # max_GB / (8byte * 16modules * 128px * 512px * N_pulses)
        self.chunksize = int(max_GB * 1024**3 // (8 * 16 * 128 * 512 * self.fpt))
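        # e.g. with fpt = 75: 400 GiB / (8 B * 16 modules * 128 * 512 px * 75 frames) ~ 682 trains per chunk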
        
        print('processing', self.chunksize, 'trains per chunk')
                   
        jobs = []
        for m in range(16):
            jobs.append(dict(
                module=m,
                fpt=self.fpt,
                vdf_module=os.path.join(self.tempdir, f'dssc{m}_vds.h5'),
                chunksize=self.chunksize,
                vdf_scan=self.vds_scan,
                nbunches=self.nbunches,
                run_nr=self.run_nr,
            ))
            
        timestamp = strftime('%X')
        print(f'start time: {timestamp}')

        with multiprocessing.Pool(16) as pool:
            module_data = pool.map(process_one_module, jobs)
    
        print('finished:', strftime('%X'))
    
        # rearrange the multiprocessed data
        self.module_data = xr.concat(module_data, dim='module')
        self.module_data['run'] = self.run_nr
        self.module_data = self.module_data.transpose('scan_variable', 'module', 'x', 'y')
                   
        self.module_data = xr.merge([self.module_data, self.scan.groupby('scan_variable').mean('trainId')])
        self.module_data = self.module_data.squeeze()

    def save(self, save_folder=None, overwrite=False):
        """ Save the crunched data.
        
            inputs:
                save_folder: path of the folder where to save the data.
                overwrite: boolean whether or not to overwrite existing files.
        """
        if save_folder is None:
            save_folder = self.save_folder

        if self.isDark:
            fname = f'run{self.run_nr}_el.h5'  # no scan
        else:
            fname = f'run{self.run_nr}_by-delay_el.h5'  # run with delay scan (change for other scan types!)


        save_path = os.path.join(save_folder, fname)
        file_exists = os.path.isfile(save_path)

        if not file_exists or (file_exists and overwrite):
            if file_exists:
                warnings.warn(f'Overwriting file: {save_path}')
                os.remove(save_path)
            self.module_data.to_netcdf(save_path, group='data')
            os.chmod(save_path, 0o664)
            print('saving: ', save_path)
        else:
            print('file', save_path, 'exists and overwrite is False')

# Since 'self' is not picklable, this function has to be defined outside the DSSC class
# so that it can be used by the multiprocessing pool.map function.
def process_one_module(job):
    module = job['module']
    fpt = job['fpt']
    data_vdf = job['vdf_module']
    scan_vdf = job['vdf_scan']
    chunksize = job['chunksize']
    nbunches = job['nbunches']

    image_path = f'INSTRUMENT/SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf/image/data'
    npulse_path = f'INDEX/SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf/image/count'
    with h5py.File(data_vdf, 'r') as m:
        all_trainIds = m['INDEX/trainId'][()]
    n_trains = len(all_trainIds)
    chunk_start = np.arange(n_trains, step=chunksize, dtype=int)

    # load scan variable
    scan = xr.open_dataset(scan_vdf, group='data')['scan_variable']
    scan.name = 'scan'
    len_scan = len(scan.groupby(scan))

    # create a zero-initialised dataset to accumulate the data into
    module_data = xr.DataArray(np.zeros([len_scan, 128, 512], dtype=np.float64),
                               dims=['scan_variable', 'x', 'y'],
                               coords={'scan_variable': np.unique(scan)})
    module_data = module_data.to_dataset(name='pumped')
    module_data['unpumped'] = xr.full_like(module_data['pumped'], 0)
    module_data['sum_count'] = xr.DataArray(np.zeros_like(np.unique(scan)), dims=['scan_variable'])
    module_data['module'] = module
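    # 'pumped' and 'unpumped' accumulate per-bin sums of pulse-averaged images;
    # they are normalised by 'sum_count' once all chunks have been processed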

    # crunching
    with h5py.File(data_vdf, 'r') as m:
        #fpt_calc = int(len(m[image_path]) / n_trains)
        #assert fpt_calc == fpt, f'data length does not match expected value (module {module})'
        all_trainIds = m['INDEX/trainId'][()]
        frames_per_train = m[npulse_path][()]
        trains_with_data = all_trainIds[frames_per_train == fpt]
        #print(np.unique(pulses_per_train), '/', fpt)
        #print(len(trains_with_data))
        chunk_start = np.arange(len(all_trainIds), step=chunksize, dtype=int)
        trains_start = 0
                   
        # This line is the strange hack from https://github.com/tqdm/tqdm/issues/485
        print(' ', end='', flush=True)
        for c0 in tqdm(chunk_start, desc=f'pool.map#{module:02d}', position=module):
            chunk_dssc = np.s_[int(c0 * fpt):int((c0 + chunksize) * fpt)]  # DSSC frame slice covering one chunk of trains
            data = m[image_path][chunk_dssc].squeeze()
            data = data.astype(np.float64)
            n_trains = int(data.shape[0] // fpt)
            trainIds_chunk = np.unique(trains_with_data[trains_start:trains_start + n_trains])
            trains_start += n_trains
            n_trains_actual = len(trainIds_chunk)
            coords = {'trainId': trainIds_chunk}
            data = np.reshape(data, [n_trains_actual, fpt, 128, 512])[:, :int(2 * nbunches)]
            data = xr.DataArray(data, dims=['trainId', 'pulse', 'x', 'y'], coords=coords)
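            # the selection below assumes 2 DSSC frames per X-ray pulse and alternating pumped/unpumped
            # pulses: pumped data on frames 0, 4, 8, ... and unpumped data on frames 2, 6, 10, ...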
            data_pumped = (data[:, ::4]).mean('pulse')
            data_unpumped = (data[:, 2::4]).mean('pulse')
            data = data_pumped.to_dataset(name='pumped')
            data['unpumped'] = data_unpumped
            data['sum_count'] = xr.DataArray(np.ones(n_trains_actual), dims=['trainId'], coords=coords)
            # grouping and summing
            data['scan_variable'] = scan  # this only adds scan data for matching trainIds
            data = data.dropna('trainId')
            data = data.groupby('scan_variable').sum('trainId')
            where = {'scan_variable': data.scan_variable}
            for var in ['pumped', 'unpumped', 'sum_count']:
                module_data[var].loc[where] = module_data[var].loc[where] + data[var]
    for var in ['pumped', 'unpumped']:
        module_data[var] = module_data[var] / module_data.sum_count
    #module_data = module_data.drop('sum_count')
    return module_data
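

# Minimal usage sketch (not part of the processing code above). All values below
# (semester, proposal, run number, scan mnemonic and bin size) are placeholders
# and must be adapted to the actual experiment.
if __name__ == '__main__':
    data = DSSC('201901', '002212')                  # placeholder semester and proposal
    data.open_run(50)                                # placeholder run number
    data.load_xgm()                                  # XGM data is needed before define_scan
    data.define_scan('PP800_DelayLine', bins=0.05)   # placeholder ToolBox mnemonic and bin step size
    data.crunch()
    data.save()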