Skip to content
Snippets Groups Projects
DSSC.py 13.7 KiB
Newer Older

import multiprocessing
from time import strftime
import tempfile
import shutil
from tqdm.auto import tqdm
import os
import warnings

import karabo_data as kd
from karabo_data.geometry2 import DSSC_1MGeometry
import ToolBox as tb
import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
import h5py


class DSSC:
    
    def __init__(self, semester, proposal, topic='SCS'):
        self.semester = semester
        self.proposal = proposal
        self.topic = topic
        self.tempdir = None
        self.save_folder = f'/gpfs/exfel/exp/{self.topic}/{self.semester}/{self.proposal}/usr/condensed_runs/'
        
        print('DSSC configuration')
        print(f'Topic: {self.topic}')
        print(f'Semester: {self.semester}')
        print(f'Proposal: {self.proposal}')
        print(f'Default save folder: {self.save_folder}')
        
        if not os.path.exists(self.save_folder):
            warnings.warn(f'Default save folder does not exist: {self.save_folder}')
        
    def __del__(self):
        # deleting temporay folder
        if self.tempdir:
            shutil.rmtree(self.tempdir)
    
    def open_run(self, run_nr, isDark=False):
        print('Opening run data with karabo-data')
        self.run_nr = run_nr
        self.xgm = None
        self.scan_vname = None
        
        self.run = kd.open_run(self.proposal, self.run_nr)
        self.isDark = isDark
        self.plot_title = f'{self.semester} run: {self.run_nr}'
        
        self.fpt = self.run.detector_info('SCS_DET_DSSC1M-1/DET/0CH0:xtdf')['frames_per_train']
        self.nbunches = self.run.get_array('SCS_RR_UTC/MDL/BUNCH_DECODER', 'sase3.nPulses.value')
        self.nbunches = np.unique(self.nbunches)
        if len(self.nbunches) == 1:
            self.nbunches = self.nbunches[0]
        else:
            warnings.warn('not all trains have same length DSSC data')
            print(f'nbunches: {self.nbunches}')
            self.nbunches = self.nbunches[-1]
            
        print(f'DSSC frames per train: {self.fpt}')
        print(f'SA3 bunches per train: {self.nbunches}')
        
        if self.tempdir is not None:
            shutil.rmtree(self.tempdir)
        
        self.tempdir = tempfile.mkdtemp()
        print(f'Temporary directory: {self.tempdir}')

        print('Creating virtual dataset')
        self.vdslist = self.create_virtual_dssc_datasets(self.run, path=self.tempdir)
        
        # create a dummy scan variable for dark run
        # for other type or run, use DSSC.define_run function to overwrite it
        self.scan = xr.DataArray(np.ones_like(self.run.train_ids), dims=['trainId'],
                                 coords={'trainId': self.run.train_ids})
        self.scan_vname = 'dummy'
        self.vds_scan = None
        
    def define_scan(self, vname, bins):
        """
            vname: variable name for the scan, can be a mnemonic string from ToolBox
                or a dictionnary with ['source', 'key'] fields
            bins: step size or bins
        """

        if type(vname) is dict:
            self.scan = self.run.get_array(vname['source'], vname['key'])
        elif type(vname) is str:
            if vname not in tb.mnemonics:
                raise ValueError(f'{vname} not found in the ToolBox mnemonics table')
            self.scan = self.run.get_array(tb.mnemonics[vname]['source'], tb.mnemonics[vname]['key'])
        else:
            raise ValueError(f'vname should be a string or a dict. We got {type(vname)}')
            
        if (type(bins) is int) or (type(bins) is float):
            self.scan = bins * np.round(self.scan / bins)
        else:
            # TODO: digitize the data
            raise ValueError(f'To be implemented')
        self.scan_vname = vname
        
        self.vds_scan = os.path.join(self.tempdir, 'scan_variable.h5')
        if os.path.isfile(self.vds_scan):
            os.remove(self.vds_scan)
            
        self.scan = self.scan.to_dataset(name='scan_variable')
        self.scan['xgm_pumped'] = self.xgm[:, :self.nbunches:2].mean('dim_0')
        self.scan['xgm_unpumped'] = self.xgm[:, 1:self.nbunches:2].mean('dim_0')
        self.scan.to_netcdf(self.vds_scan, group='data')

        self.scan_counts = xr.DataArray(np.ones(len(self.scan['scan_variable'])),
                                        dims=['scan_variable'],
                                        coords={'scan_variable': self.scan['scan_variable'].values},
                                        name='counts')
        
        fig, (ax1, ax2) = plt.subplots(nrows=2, figsize=[5, 5])
        ax1.plot(self.scan.groupby('scan_variable').mean('trainId').coords['scan_variable'].values,
                 self.scan_counts.groupby('scan_variable').sum(),
                 'o-', ms=2)
        ax1.set_xlabel('scan variable')
        ax1.set_ylabel('# trains')
        ax1.set_title(self.plot_title)
        
        ax2.plot(self.scan['scan_variable'])
        ax2.set_xlabel('train #')
        ax2.set_ylabel(f'{vname}')

        plt.tight_layout()
            
    def load_xgm(self):
        if self.xgm is None:
            self.xgm = self.run.get_array(tb.mnemonics['SCS_SA3']['source'],
                                          tb.mnemonics['SCS_SA3']['key'], roi=kd.by_index[:self.nbunches])
            
    def plot_xgm_hist(self, nbins):
        if self.xgm is None:
            self.load_xgm()
            
        hist, bins_edges = np.histogram(self.xgm, nbins, density=True)
        width = 1.0 * (bins_edges[1] - bins_edges[0])
        bins_center = 0.5*(bins_edges[:-1] + bins_edges[1:])
        plt.figure()
        plt.bar(bins_center, hist, align='center', width=width)
        plt.xlabel(f"{tb.mnemonics['SCS_SA3']['source']}{tb.mnemonics['SCS_SA3']['key']}")
        plt.ylabel('density')
        plt.title(self.plot_title)
        plt.tight_layout()
        
    def xgm_filter(self, xgm_low=-np.inf, xgm_high=np.inf):
        if self.xgm is None:
            self.load_xgm()
        
        if self.isDark:
            warnings.warn(f'This run was loaded as dark. Filtering on xgm makes no sense. Aborting')
            return
        
        self.xgm_low = xgm_low
        self.xgm_high = xgm_high
        
        valid = ((self.xgm > self.xgm_low) * (self.xgm < self.xgm_high)).prod('dim_0').astype(bool)
        xgm_valid = self.xgm.where(valid)
        xgm_valid = xgm_valid.dropna('trainId')
        self.scan = self.scan.sel({'trainId': xgm_valid.trainId})
        nrejected = len(self.run.train_ids) - len(self.scan)
        print((f'Rejecting {nrejected} out of {len(self.run.train_ids)} trains due to xgm '
               f'thresholds: [{self.xgm_low}, {self.xgm_high}]'))

    def load_geom(self):
        quad_pos = [
            (-124.100,    3.112),  # TR
            (-133.068, -110.604),  # BR
            (   0.988, -125.236),  # BL
            (   4.528,   -4.912)   # TL
            ]
        path = '/gpfs/exfel/sw/software/exfel_environments/misc/git/karabo_data/docs/dssc_geo_june19.h5'
        geom = DSSC_1MGeometry.from_h5_file_and_quad_positions(path, quad_pos)
        return geom

    def create_virtual_dssc_datasets(self, run, path=''):
        vds_list = []
        for m in tqdm(range(16)):
            vds_filename = os.path.join(path, f'dssc{m}_vds.h5')
            if os.path.isfile(vds_filename):
                os.remove(vds_filename)
            module_vds = run.get_virtual_dataset(f'SCS_DET_DSSC1M-1/DET/{m}CH0:xtdf',
                                                 'image.data', filename=vds_filename)
            vds_list.append([vds_filename, module_vds])
        return vds_list
    
    def crunch(self):
                   
        if self.vds_scan is None:
            # probably a dark run with a dummy scan variable
            self.vds_scan = os.path.join(self.tempdir, 'scan_variable.h5')
            if os.path.isfile(self.vds_scan):
                os.remove(self.vds_scan)
            
            self.scan = self.scan.to_dataset(name='scan_variable')
            self.scan.to_netcdf(self.vds_scan, group='data')

        max_GB = 400
        # max_GB / (8byte * 16modules * 128px * 512px * N_pulses)
        self.chunksize = int(max_GB * 1024**3 // (8 * 16 * 128 * 512 * self.fpt))
        
        print('processing', self.chunksize, 'trains per chunk')
                   
        jobs = []
        for m in range(16):
            jobs.append(dict(
            module=m,
            fpt=self.fpt,
            vdf_module=os.path.join(self.tempdir, f'dssc{m}_vds.h5'),
            chunksize=self.chunksize,
            vdf_scan=self.vds_scan,
            nbunches=self.nbunches,
            run_nr=self.run_nr,
            ))
            
        timestamp = strftime('%X')
        print(f'start time: {timestamp}')

        with multiprocessing.Pool(16) as pool:
            module_data = pool.map(process_one_module, jobs)
    
        print('finished:', strftime('%X'))
    
        # rearange the multiprocessed data
        self.module_data = xr.concat(module_data, dim='module')
        self.module_data['run'] = self.run_nr
        self.module_data = self.module_data.transpose('scan_variable', 'module', 'x', 'y')
                   
        self.module_data = xr.merge([self.module_data, self.scan.groupby('scan_variable').mean('trainId')])
        self.module_data = self.module_data.squeeze()

    def save(self, save_folder=None, overwrite=False):
        if save_folder is None:
            save_folder = this.save_folder

        if self.isDark:
            fname = f'run{self.run_nr}_el.h5'  # no scan
        else:
            fname = f'run{self.run_nr}_by-delay_el.h5'  # run with delay scan (change for other scan types!)


        save_path = os.path.join(save_folder, fname)
        file_exists = os.path.isfile(save_path)

        if not file_exists or (file_exists and overwrite):
            if file_exists:
                warnings.warn(f'Overwriting file: {save_path}')
                os.remove(save_path)
            self.module_data.to_netcdf(save_path, group='data')
            os.chmod(save_path, 0o664)
            print('saving: ', save_path)
        else:
            print('file', save_path, 'exists and overwrite is False')

# since 'self' is not pickable, this function has to be outside the DSSC class so that it can be used
# by the multiprocessing pool.map function
def process_one_module(job):
    module = job['module']
    fpt = job['fpt']
    data_vdf = job['vdf_module']
    scan_vdf = job['vdf_scan']
    chunksize = job['chunksize']
    nbunches = job['nbunches']

    image_path = f'INSTRUMENT/SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf/image/data'
    npulse_path = f'INDEX/SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf/image/count'
    with h5py.File(data_vdf, 'r') as m:
        all_trainIds = m['INDEX/trainId'][()]
    n_trains = len(all_trainIds)
    chunk_start = np.arange(n_trains, step=chunksize, dtype=int)

    # load scan variable
    scan = xr.open_dataset(scan_vdf, group='data')['scan_variable']
    scan.name = 'scan'
    len_scan = len(scan.groupby(scan))

    # create empty dataset to add actual data to
    module_data = xr.DataArray(np.empty([len_scan, 128, 512], dtype=np.float64),
                               dims=['scan_variable', 'x', 'y'],
                               coords={'scan_variable': np.unique(scan)})
    module_data = module_data.to_dataset(name='pumped')
    module_data['unpumped'] = xr.full_like(module_data['pumped'], 0)
    module_data['sum_count'] = xr.DataArray(np.zeros_like(np.unique(scan)), dims=['scan_variable'])
    module_data['module'] = module

    # crunching
    with h5py.File(data_vdf, 'r') as m:
        #fpt_calc = int(len(m[image_path]) / n_trains)
        #assert fpt_calc == fpt, f'data length does not match expected value (module {module})'
        all_trainIds = m['INDEX/trainId'][()]
        frames_per_train = m[npulse_path][()]
        trains_with_data = all_trainIds[frames_per_train == fpt]
        #print(np.unique(pulses_per_train), '/', fpt)
        #print(len(trains_with_data))
        chunk_start = np.arange(len(all_trainIds), step=chunksize, dtype=int)
        trains_start = 0
                   
        # This line is the strange hack from https://github.com/tqdm/tqdm/issues/485
        print(' ', end='', flush=True)
        for c0 in tqdm(chunk_start, desc=f'pool.map#{module:02d}', position=module):
            chunk_dssc = np.s_[int(c0 * fpt):int((c0 + chunksize) * fpt)]  # for dssc data
            data = m[image_path][chunk_dssc].squeeze()
            data = data.astype(np.float64)
            n_trains = int(data.shape[0] // fpt)
            trainIds_chunk = np.unique(trains_with_data[trains_start:trains_start + n_trains])
            trains_start += n_trains
            n_trains_actual = len(trainIds_chunk)
            coords = {'trainId': trainIds_chunk}
            data = np.reshape(data, [n_trains_actual, fpt, 128, 512])[:, :int(2 * nbunches)]
            data = xr.DataArray(data, dims=['trainId', 'pulse', 'x', 'y'], coords=coords)
            data_pumped = (data[:, ::4]).mean('pulse')
            data_unpumped = (data[:, 2::4]).mean('pulse')
            data = data_pumped.to_dataset(name='pumped')
            data['unpumped'] = data_unpumped
            data['sum_count'] = xr.DataArray(np.ones(n_trains_actual), dims=['trainId'], coords=coords)
            # grouping and summing
            data['scan_variable'] = scan  # this only adds scan data for matching trainIds
            data = data.dropna('trainId')
            data = data.groupby('scan_variable').sum('trainId')
            where = {'scan_variable': data.scan_variable}
            for var in ['pumped', 'unpumped', 'sum_count']:
                module_data[var].loc[where] = module_data[var].loc[where] + data[var]
    for var in ['pumped', 'unpumped']:
        module_data[var] = module_data[var] / module_data.sum_count
    #module_data = module_data.drop('sum_count')
    return module_data