Loading run data
================
.. toctree::

   Loading_data_in_memory
Short version
-------------
Loading data in memory is performed as follows:

**Option 1**:

.. code:: python3

    import toolbox_scs as tb

    # optional, check available mnemonics
    # print(tb.mnemonics)

    # fields is a list of available mnemonics, representing the data
    # to be loaded
    fields = ["FastADC4raw", "scannerX"]
    proposalNr = 2565
    runNr = 19

    run, data = tb.load(proposalNr, runNr, fields)
``run`` is an extra_data ``DataCollection`` and ``data`` is an xarray ``Dataset`` containing all variables listed in ``fields``. All variables are aligned by train ID.
**Option 2**:

.. code:: python3

    import toolbox_scs as tb

    # get the entry for a single data source defined in the mnemonics
    proposalNr = 2565
    runNr = 19

    run = tb.open_run(proposalNr, runNr)
    data = tb.get_array(run, "scannerX")
``run`` is an extra_data ``DataCollection`` and ``data`` is an xarray ``DataArray`` for a single data source.
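For a quick check of what was loaded, both objects can be inspected directly (a minimal sketch following the example above):

.. code:: python3

    # train IDs covered by the run
    print(run.train_ids[:5])

    # quick look at the motor position across the run
    data.plot()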
.. _maintainers:

Maintainers
~~~~~~~~~~~
Creating a Toolbox environment in the proposal folder
-----------------------------------------------------
A Toolbox environment can be created by running the following commands
on Maxwell:

.. code:: shell

    module load exfel exfel-python
    scs-create-toolbox-env --proposal <PROPOSAL>

where ``<PROPOSAL>`` is the desired proposal number. This will create a Python
environment and will download and install the Toolbox source code. It results
in the creation of the following folders in the path
``<PROPOSAL_PATH>/scratch``:
.. code-block::

    <PROPOSAL_PATH>/scratch/
    ├─ checkouts/
    │  ├─ toolbox_<PROPOSAL>/
    │  │  ├─ <source code>
    ├─ envs/
    │  ├─ toolbox_<PROPOSAL>/
    │  │  ├─ <Python files>
The ``checkouts`` folder contains the Toolbox source code, labeled according
to the environment identifier, which by default is the proposal number. The
downloaded code defaults to the **master** branch at the time the environment
is created.

The ``envs`` folder contains the Python environment with the packages
necessary to run the Toolbox. It is labeled with the same environment
identifier.
.. note::

    One can find the proposal path by running ``findxfel <PROPOSAL>``.
It is good practice to tag the Toolbox version at a given milestone.
This version can then be supplied to the script as:

.. code:: shell

    scs-create-toolbox-env --proposal <PROPOSAL> --version <VERSION>
It might also be helpful to supply an identifier to distinguish environments
from each other. This can be done by running:

.. code:: shell

    scs-create-toolbox-env --proposal <PROPOSAL> --identifier <IDENTIFIER>
The environment would then be identified as ``toolbox_<IDENTIFIER>`` instead
of ``toolbox_<PROPOSAL>``.
Installing additional packages
------------------------------
In order to install additional packages in a Toolbox environment, one should
run the following commands:

.. code:: shell

    cd <PROPOSAL_PATH>/scratch/
    source envs/toolbox_<IDENTIFIER>/bin/activate
    pip install ...
There's no need to load the ``exfel`` module.
Updating the source code
------------------------

Should changes to the Toolbox code be needed during beamtime, be it bug fixes
or additional features, one can freely modify the source code in the following
path: ``<PROPOSAL_PATH>/scratch/checkouts/toolbox_<IDENTIFIER>``.

The contents of this folder form a normal git repository. Changes can be made
easily (e.g., editing a line of code, checking out a different branch, etc.)
and are immediately reflected in the environment.
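For example, to pull the latest changes or switch to a different branch
(``<BRANCH>`` is a placeholder):

.. code:: shell

    cd <PROPOSAL_PATH>/scratch/checkouts/toolbox_<IDENTIFIER>
    git fetch origin
    git checkout <BRANCH>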
doc/metadata.png (binary image, 136 KiB)
sphinx
sphinx_rtd_theme
autoapi
sphinx-autoapi
nbsphinx
urllib3<2.0.0
#!/bin/bash
#SBATCH -N 1
#SBATCH --partition=exfel
#SBATCH --time=12:00:00
#SBATCH --mail-type=END,FAIL
#SBATCH --output=logs/%j-%x.out

while getopts ":p:d:r:k:m:x:b:" option
do
    case $option in
        p) PROPOSAL="$OPTARG";;
        d) DARK="$OPTARG";;
        r) RUN="$OPTARG";;
        k) KERNEL="$OPTARG";;
        m) MODULE_GROUP="$OPTARG";;
        x) XAXIS="$OPTARG";;
        b) BINWIDTH="$OPTARG";;
        \?) echo "Unknown option"
            exit 1;;
        :) echo "Missing option for input flag"
           exit 1;;
    esac
done

# Load xfel environment
source /etc/profile.d/modules.sh
module load exfel exfel-python

echo processing run $RUN

PDIR=$(findxfel $PROPOSAL)
PPROPOSAL="p$(printf '%06d' $PROPOSAL)"
RDIR="$PDIR/usr/processed_runs/r$(printf '%04d' $RUN)"
mkdir -p $RDIR   # -p: do not fail if the folder already exists

NB='Dask DSSC module binning.ipynb'

# kernel list can be seen from 'jupyter kernelspec list'
if [ -z "${KERNEL}" ]; then
    KERNEL="toolbox_$PPROPOSAL"
fi

python -c "import papermill as pm; pm.execute_notebook(\
    '$NB', \
    '$RDIR/output$MODULE_GROUP.ipynb', \
    kernel_name='$KERNEL', \
    parameters=dict(proposalNB=int('$PROPOSAL'), \
                    dark_runNB=int('$DARK'), \
                    runNB=int('$RUN'), \
                    module_group=int('$MODULE_GROUP'), \
                    path='$RDIR/', \
                    xaxis='$XAXIS', \
                    bin_width=float('$BINWIDTH')))"
#!/bin/bash
#SBATCH -N 1
#SBATCH --partition=allgpu
#SBATCH --constraint=V100
#SBATCH --time=2:00:00
#SBATCH --mail-type=END,FAIL
#SBATCH --output=logs/%j-%x.out

# default parameters, can be overridden by the corresponding flags
ROISTH='1'
SATLEVEL='500'
MODULE='15'

while getopts ":p:d:r:k:g:t:s:m:" option
do
    case $option in
        p) PROPOSAL="$OPTARG";;
        d) DARK="$OPTARG";;
        r) RUN="$OPTARG";;
        k) KERNEL="$OPTARG";;
        g) GAIN="$OPTARG";;
        t) ROISTH="$OPTARG";;
        s) SATLEVEL="$OPTARG";;
        m) MODULE="$OPTARG";;
        \?) echo "Unknown option"
            exit 1;;
        :) echo "Missing option for input flag"
           exit 1;;
    esac
done

# Load xfel environment
source /etc/profile.d/modules.sh
module load exfel exfel-python

echo processing run $RUN

PDIR=$(findxfel $PROPOSAL)
PPROPOSAL="p$(printf '%06d' $PROPOSAL)"
RDIR="$PDIR/usr/processed_runs/r$(printf '%04d' $RUN)"
mkdir -p $RDIR   # -p: do not fail if the folder already exists

NB='BOZ analysis part I.a Correction determination.ipynb'

# kernel list can be seen from 'jupyter kernelspec list'
if [ -z "${KERNEL}" ]; then
    KERNEL="toolbox_$PPROPOSAL"
fi

python -c "import papermill as pm; pm.execute_notebook(\
    '$NB', \
    '$RDIR/output.ipynb', \
    kernel_name='$KERNEL', \
    parameters=dict(proposal=int('$PROPOSAL'), \
                    darkrun=int('$DARK'), \
                    run=int('$RUN'), \
                    module=int('$MODULE'), \
                    gain=float('$GAIN'), \
                    rois_th=float('$ROISTH'), \
                    sat_level=int('$SATLEVEL')))"
import os
import logging
import argparse

import numpy as np

import toolbox_scs as tb
import toolbox_scs.detectors as tbdet

logging.basicConfig(level=logging.INFO)
log_root = logging.getLogger(__name__)

# -----------------------------------------------------------------------------
# user input:
# -----------------------------------------------------------------------------
run_type = 'static, delay, .....'
description = 'useful description or comment .....'
# add xgm data to formatted file if save_xgm_binned was set to True
metadata = ['binner1', 'binner2', 'xgm_binned']  # ['binner1', 'binner2']
# -----------------------------------------------------------------------------


def formatting(run_number, run_folder):
    log_root.debug("Collect, combine and format files in run folder")
    run_formatted = tbdet.DSSCFormatter(run_folder)
    run_formatted.combine_files()
    run_formatted.add_dataArray(metadata)
    attrs = {'run_type': run_type,
             'description': description,
             'run_number': run_number}
    run_formatted.add_attributes(attrs)
    run_formatted.save_formatted_data(
        f'{run_folder}run_{run_number}_formatted.h5')
    log_root.debug("Formatting finished successfully.")


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--run-number', metavar='S',
                        action='store',
                        help='run number')
    parser.add_argument('--run-folder', metavar='S',
                        action='store',
                        help='the run folder containing fractional data')
    args = parser.parse_args()

    formatting(str(args.run_number), str(args.run_folder))
#!/bin/bash

RUN_NR=${1}
RUN_DIR="../processed_runs/r_${RUN_NR}/"

if [ -d $RUN_DIR ]
then
    echo creating formatted .h5 file for run $RUN_NR in $RUN_DIR
    source /etc/profile.d/modules.sh
    module load exfel exfel-python
    python format_data.py --run-number $RUN_NR --run-folder $RUN_DIR
    #chgrp -R 60002711-part $RUN_DIR
    chmod -R 777 $RUN_DIR
else
    echo run folder $RUN_DIR does not exist
    echo please provide a valid run number
fi
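With the wrapper above saved as, e.g., ``format_data.sh`` (a hypothetical name), a processed run is formatted with:

``` shell
./format_data.sh 194
```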
import os
import logging
import argparse

import h5py
import numpy as np
import extra_data as ed

import toolbox_scs as tb
import toolbox_scs.detectors as tbdet

logging.basicConfig(level=logging.INFO)
log_root = logging.getLogger(__name__)

# -----------------------------------------------------------------------------
# user input: run-type specific
# -----------------------------------------------------------------------------
proposal_nb = 2599
output_filepath = "../processed_runs/"

# these get set by the shell script now! (e.g. "--runtype static")
# runtype = 'energyscan'
# runtype = 'energyscan_pumped'
# runtype = 'static'
# runtype = 'static_IR'
# runtype = 'delayscan'
# runtype = 'timescan'

# useful metadata to be added to h5 files
scriptname = os.path.basename(__file__)
save_xgm_binned = True

# optional prebinning methods for DSSC data
normevery = 2  # 2 if intradark frames are used, 1 otherwise
xgm_mask = True  # if True, drop DSSC frames whose XGM value lies outside xgm_threshold
xgm_threshold = (1000, np.inf)  # valid XGM intensity range used for the pulsemask
filename_dark = None  # 200
xgm_normalization = False
# -----------------------------------------------------------------------------
def process(run_nb, runtype, modules=[]):
    run_description = f'{runtype}; script {scriptname}'
    print(run_description)

    mod_list = modules
    if len(mod_list) == 0:
        mod_list = [i for i in range(16)]
    path = f'{output_filepath}r_{run_nb}/'

    log_root.info("create run objects")
    run_info = tbdet.load_dssc_info(proposal_nb, run_nb)
    fpt = run_info['frames_per_train']
    n_trains = run_info['number_of_trains']
    trainIds = run_info['trainIds']

    # -------------------------------------------------------------------------
    # user input: run specific
    # -------------------------------------------------------------------------
    run_obj = ed.open_run(proposal_nb, run_nb)
    if runtype == 'static':
        buckets_train = np.zeros(n_trains)
        pulsepattern = ['image', 'intradark']
        buckets_pulse = pulsepattern * (fpt // len(pulsepattern))
    if runtype == 'energyscan':
        buckets_train = tb.get_array(run_obj, 'nrj', 0.1).values
        pulsepattern = ['image', 'intradark']
        buckets_pulse = pulsepattern * (fpt // len(pulsepattern))
    if runtype == 'static_IR':
        buckets_train = np.zeros(n_trains)
        pulsepattern = ['unpumped', 'unpumped_intradark',
                        'pumped', 'pumped_intradark']
        buckets_pulse = pulsepattern * (fpt // len(pulsepattern))
    if runtype == 'energyscan_pumped':
        buckets_train = tb.get_array(run_obj, 'nrj', 0.1).values
        pulsepattern = ['unpumped', 'unpumped_intradark',
                        'pumped', 'pumped_intradark']
        buckets_pulse = pulsepattern * (fpt // len(pulsepattern))
    if runtype == 'delayscan':
        buckets_train = tb.get_array(run_obj, 'PP800_DelayLine', 0.03).values
        pulsepattern = ['unpumped', 'unpumped_intradark',
                        'pumped', 'pumped_intradark']
        buckets_pulse = pulsepattern * (fpt // len(pulsepattern))
    if runtype == 'timescan':  # 10s bins (tstamp is in ns)
        bin_nsec = 10 * 1e9
        tstamp = run_obj.get_array('SCS_RR_UTC/TSYS/TIMESERVER',
                                   'id.timestamp')
        buckets_train = (bin_nsec * np.round(tstamp / bin_nsec)
                         - tstamp.min()) / 1e9
        pulsepattern = ['unpumped', 'unpumped_intradark',
                        'pumped', 'pumped_intradark']
        buckets_pulse = pulsepattern * (fpt // len(pulsepattern))
    # -------------------------------------------------------------------------

    # create binners
    binner1 = tbdet.create_dssc_bins("trainId", trainIds, buckets_train)
    binner2 = tbdet.create_dssc_bins("pulse",
                                     np.linspace(0, fpt-1, fpt, dtype=int),
                                     buckets_pulse)
    binners = {'trainId': binner1, 'pulse': binner2}

    bin_obj = tbdet.DSSCBinner(proposal_nb, run_nb,
                               binners=binners,
                               dssc_coords_stride=normevery)
    if xgm_mask:
        bin_obj.create_pulsemask('xgm', xgm_threshold)

    dark = None
    if filename_dark:
        dark = tbdet.load_xarray(filename_dark)
        dark = dark['data']

    bin_params = {'modules': mod_list,
                  'chunksize': 248,
                  'filepath': path,
                  'xgm_normalization': xgm_normalization,
                  'normevery': normevery,
                  'dark_image': dark}

    log_root.info("start binning routine")
    bin_obj.process_data(**bin_params)

    log_root.info("Add additional data to module files")
    if save_xgm_binned:
        bin_obj.load_xgm()
        xgm_binned = bin_obj.get_xgm_binned()

    if not os.path.isdir(path):
        os.mkdir(path)
    for m in mod_list:
        fname = f'run_{run_nb}_module{m}.h5'
        if save_xgm_binned:
            tbdet.save_xarray(
                path+fname, xgm_binned, group='xgm_binned', mode='a')
        tbdet.save_xarray(path+fname, binner1, group='binner1', mode='a')
        tbdet.save_xarray(path+fname, binner2, group='binner2', mode='a')
        metadata = {'run_number': run_nb,
                    'module': m,
                    'run_description': run_description}
        tbdet.save_attributes_h5(path+fname, metadata)
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--run-number', metavar='S',
                        action='store',
                        help='the run to be processed')
    parser.add_argument('--module', metavar='S',
                        nargs='+', action='store',
                        help='modules to be processed')
    parser.add_argument('--runtype', metavar='S',
                        nargs='+', action='store',
                        help='type of run (static, static_IR, energyscan, '
                             'energyscan_pumped, delayscan, timescan)')
    args = parser.parse_args()

    runtype = args.runtype[0]
    if args.run_number:
        if args.module is not None:
            modules = []
            if len(args.module) == 1:
                args.module = args.module[0].split(" ")
            modules = list(map(int, args.module))
            process(str(args.run_number), runtype, modules)
        else:
            process(str(args.run_number), runtype)
#!/bin/bash
#SBATCH -N 1
#SBATCH --partition=upex
#SBATCH --time=00:30:00
#SBATCH --mail-type=END,FAIL
#SBATCH --output=../logs/%j-%x.out
RUN=$1
MODULES=$2
RUNTYPE=$3
source /etc/profile.d/modules.sh
module load exfel exfel-python
echo processing modules $MODULES of run $RUN
python process_data_201007_23h.py --run-number $RUN --module ${MODULES} --runtype $RUNTYPE
#!/bin/bash

RUN=$1
RUNTYPE=$2

if [ $RUN ] && [ $RUNTYPE ]
then
    echo processing run $RUN
    source /etc/profile.d/modules.sh
    module load exfel exfel-python
    sbatch ./start_job_single.sh $RUN '0 1 2 3' $RUNTYPE
    sbatch ./start_job_single.sh $RUN '4 5 6 7' $RUNTYPE
    sbatch ./start_job_single.sh $RUN '8 9 10 11' $RUNTYPE
    sbatch ./start_job_single.sh $RUN '12 13 14 15' $RUNTYPE
else
    echo please specify a run number and type
    echo available runtypes:
    echo energyscan, energyscan_pumped, static, static_IR, delayscan, timescan
fi
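With the wrapper above saved as, e.g., ``start_job_all.sh`` (a hypothetical name), a full 16-module run is processed in four module-group jobs with:

``` shell
./start_job_all.sh 194 static
```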
doc/sview.png (binary image, 47.1 KiB)
Finding time overlap by transient reflectivity
----------------------------------------------
Transient reflectivity of the optical laser measured on a large bandgap material pumped by the FEL is often used at SCS to find the time overlap between the two beams. The example notebook
* :doc:`Transient reflectivity measurement <Transient reflectivity measurement>`
shows how to analyze such data, including correcting the delay by the bunch arrival monitor (BAM).
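A minimal sketch of the delay correction and binning steps (the mnemonics for the reflectivity signal and the BAM are hypothetical, unit conversions are omitted, and all quantities are assumed to be train-resolved 1D arrays; see the notebook for the full analysis):

.. code:: python3

    import numpy as np
    import toolbox_scs as tb

    # 'FastADC5peaks' and 'BAM1932M' are placeholder mnemonics; check
    # tb.mnemonics for the actual source names
    fields = ['FastADC5peaks', 'PP800_DelayLine', 'BAM1932M']
    run, ds = tb.load(2565, 19, fields)

    # correct the nominal delay by the measured arrival-time jitter
    delay = (ds['PP800_DelayLine'] - ds['BAM1932M']).rename('delay')

    # bin the reflectivity signal against the corrected delay
    bins = np.linspace(float(delay.min()), float(delay.max()), 51)
    trace = ds['FastADC5peaks'].groupby_bins(delay, bins).mean()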
""" Toolbox for SCS.
Various utilities function to quickly process data measured at the SCS instruments.
Copyright (2019) SCS Team.
"""
import matplotlib.pyplot as plt
import numpy as np
from scipy.special import erfc
from scipy.optimize import curve_fit
def knife_edge(nrun, axisKey='scannerX', signalKey='FastADC4peaks', p0=None, full=False, plot=False):
''' Calculates the beam radius at 1/e^2 from a knife-edge scan by fitting with
erfc function: f(a, u) = a*erfc(u) or f(a, u) = a*erfc(-u) where
u = sqrt(2)*(x-x0)/w0 with w0 the beam radius at 1/e^2 and x0 the beam center.
Inputs:
nrun: xarray Dataset containing the detector signal and the motor
position.
axisKey: string, key of the axis against which the knife-edge is
performed.
signalKey: string, key of the detector signal.
p0: list, initial parameters used for the fit: x0, w0, a. If None, a beam
radius of 100 um is assumed.
full: bool: If False, returns the beam radius and standard error. If True,
returns the popt, pcov list of parameters and covariance matrix from
curve_fit.
plot: bool: If True, plots the data and the result of the fit.
Outputs:
If full is False, ndarray with beam radius at 1/e^2 in mm and standard
error from the fit in mm. If full is True, returns popt and pcov from
curve_fit function.
'''
def integPowerUp(x, x0, w0, a):
return a*erfc(-np.sqrt(2)*(x-x0)/w0)
def integPowerDown(x, x0, w0, a):
return a*erfc(np.sqrt(2)*(x-x0)/w0)
#get the number of pulses per train from the signal source:
dim = nrun[signalKey].dims[1]
#duplicate motor position values to match signal shape
#this is much faster than using nrun.stack()
positions = np.repeat(nrun[axisKey].values,
len(nrun[dim])).astype(nrun[signalKey].dtype)
#sort the data to decide which fitting function to use
sortIdx = np.argsort(positions)
positions = positions[sortIdx]
intensities = nrun[signalKey].values.flatten()[sortIdx]
# estimate a linear slope fitting the data to determine which function to fit
slope = np.cov(positions, intensities)[0][1]/np.var(positions)
if slope < 0:
func = integPowerDown
funcStr = 'a*erfc(np.sqrt(2)*(x-x0)/w0)'
else:
func = integPowerUp
funcStr = 'a*erfc(-np.sqrt(2)*(x-x0)/w0)'
if p0 is None:
p0 = [np.mean(positions), 0.1, np.max(intensities)/2]
popt, pcov = curve_fit(func, positions, intensities, p0=p0)
print('fitting function:', funcStr)
print('w0 = (%.1f +/- %.1f) um'%(popt[1]*1e3, pcov[1,1]**0.5*1e3))
print('x0 = (%.3f +/- %.3f) mm'%(popt[0], pcov[0,0]**0.5*1e3))
print('a = %e +/- %e '%(popt[2], pcov[2,2]**0.5*1e3))
if plot:
xfit = np.linspace(positions.min(), positions.max(), 1000)
yfit = func(xfit, *popt)
plt.figure(figsize=(7,4))
plt.scatter(positions, intensities, color='C1', label='exp', s=2, alpha=0.01)
plt.plot(xfit, yfit, color='C4',
label=r'fit $\rightarrow$ $w_0=$(%.1f $\pm$ %.1f) $\mu$m'%(popt[1]*1e3, pcov[1,1]**0.5*1e3))
leg = plt.legend()
for lh in leg.legendHandles:
lh.set_alpha(1)
plt.ylabel(signalKey)
plt.xlabel(axisKey + ' position [mm]')
plt.title(nrun.attrs['runFolder'])
plt.tight_layout()
if full:
return popt, pcov
else:
return np.array([popt[1], pcov[1,1]**0.5])
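A minimal usage sketch of this function with synthetic data (the Dataset layout mimics what ``tb.load`` returns; the dimension name ``sa3_pId`` is an assumption):

``` python
import numpy as np
import xarray as xr
from scipy.special import erfc as _erfc

# synthetic knife-edge scan: 200 trains, 10 pulses per train
x = np.linspace(-1, 1, 200)                        # motor position in mm
signal = 5 * _erfc(np.sqrt(2) * (x - 0.05) / 0.2)  # w0 = 0.2 mm, x0 = 0.05 mm
signal = signal[:, None] + np.random.normal(0, 0.1, (200, 10))

nrun = xr.Dataset({'FastADC4peaks': (('trainId', 'sa3_pId'), signal),
                   'scannerX': ('trainId', x)})

w0, err = knife_edge(nrun)  # beam radius at 1/e^2 and standard error, in mm
```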
%% Cell type:markdown id: tags:
This notebook shows an example of an X-ray absorption spectroscopy (XAS) and X-ray magnetic circular dichroism (XMCD) experiment.
%% Cell type:markdown id: tags:
# Import toolbox_scs and subpackages
%% Cell type:code id: tags:
``` python
import numpy as np
%matplotlib notebook
import matplotlib.pyplot as plt
plt.rcParams['figure.constrained_layout.use'] = True
import toolbox_scs as tb
import toolbox_scs.detectors as tbdet
import toolbox_scs.routines as tbr
import extra_data as ed
import xarray as xr
import logging
logging.basicConfig(level=logging.INFO)
log_root = logging.getLogger(__name__)
```
%% Cell type:markdown id: tags:
# Load data, extract pulse-resolved signals from detectors
Here, we use the ToolBox mnemonics to get the arrays: MCP1apd is the pulse-resolved peak-integrated TIM data from MCP 1, SCS_SA3 is the XGM data for the SASE 3 pulses in the SCS hutch, nrj is the monochromator energy in eV and magnet is the current applied to the magnet in A.

The loading routine `tb.load()` takes care of aligning the pulse-resolved data (TIM and XGM) according to the SASE 3 bunch pattern.
%% Cell type:code id: tags:
``` python
runNB = 491
proposalNB = 900094
mcp = 'MCP1apd'
fields = [mcp, 'SCS_SA3', 'nrj', 'magnet']
run, ds = tb.load(proposalNB, runNB, fields)
mcp = mcp.replace('apd', 'peaks')
ds
```
%% Cell type:markdown id: tags:
## Split positive and negative magnetic fields, calculate XAS and XMCD spectra
Here we first define the monochromator energy bins, and apply the xas() subroutine to subsets of the data. The subsets correspond to positive magnet current and negative magnet current. The xas() routine returns (among other quantities) the binned energy and the absorption, which are then plotted for both positive and negative magnet currents.
%% Cell type:code id: tags:
``` python
nrj_bins_edges = np.linspace(704, 712, 31)
pxas = tbr.xas(ds.where(ds['magnet'] > 2, drop=True), bins=nrj_bins_edges, plot=False, Itkey=mcp)
nxas = tbr.xas(ds.where(ds['magnet'] < -2, drop=True), bins=nrj_bins_edges, plot=False, Itkey=mcp)
```
%% Cell type:code id: tags:
``` python
plt.figure()
plt.plot(pxas['nrj'], pxas['muA'])
plt.plot(nxas['nrj'], nxas['muA'])
plt.ylabel(r'$\mu_A$')
plt.xlabel('Energy [eV]')
```
%% Cell type:markdown id: tags:
We now calculate the XMCD using the positive and negative data returned by the xas() routine.
%% Cell type:code id: tags:
``` python
res = tbr.xasxmcd(pxas, nxas)
```
%% Cell type:code id: tags:
``` python
plt.figure()
plt.plot(res['nrj'], res['muXAS'])
plt.ylabel(r'$\mu_A$')
plt.xlabel('Energy [eV]')
plt.twinx()
plt.plot(res['nrj'], -res['muXMCD'], c='C1')
plt.ylabel(r'$\mu_{XMCD}$')
```
%% Cell type:markdown id: tags:
# Only the first 2000 trains
%% Cell type:code id: tags:
``` python
subds = ds.isel({'trainId':slice(1600)})
pxas2 = tbr.xas(subds.where(subds['magnet'] > 2, drop=True), bins=nrj_bins_edges, plot=False, Itkey=mcp)
nxas2 = tbr.xas(subds.where(subds['magnet'] < -2, drop=True), bins=nrj_bins_edges, plot=False, Itkey=mcp)
```
%% Cell type:code id: tags:
``` python
plt.figure()
plt.plot(subds['nrj'])
plt.ylabel('energy [eV]')
plt.xlabel('train number')
plt.twinx()
plt.plot(subds['magnet'], c='C1')
plt.ylabel('magnet current [A]')
```
%% Cell type:code id: tags:
``` python
plt.figure()
plt.errorbar(pxas['nrj'], pxas['muA'], pxas['sterrA'], label='pos all trains')
plt.errorbar(pxas2['nrj'], pxas2['muA'], pxas2['sterrA'], c='C0', ls='--', label='pos first 2000 trains')
plt.errorbar(nxas['nrj'], nxas['muA'], nxas['sterrA'], label='neg all trains')
plt.errorbar(nxas2['nrj'], nxas2['muA'], nxas2['sterrA'], c='C1', ls='--', label='neg first 2000 trains')
plt.legend()
plt.ylabel(r'$\mu_A$')
plt.xlabel('Energy [eV]')
```
%% Cell type:code id: tags:
``` python
res2 = tbr.xasxmcd(pxas2, nxas2)
```
%% Cell type:code id: tags:
``` python
plt.figure()
plt.plot(res['nrj'], res['muXAS'])
plt.plot(res2['nrj'], res2['muXAS'], ls='--', c='C0')
plt.ylabel(r'$\mu_A$')
plt.xlabel('Energy [eV]')
plt.twinx()
plt.plot(res['nrj'], -res['muXMCD'], c='C1')
plt.plot(res2['nrj'], -res2['muXMCD'], ls='--', c='C1')
plt.ylabel(r'$\mu_{XMCD}$')
```
%% Cell type:markdown id: tags:
# Mono up or down
%% Cell type:code id: tags:
``` python
from scipy.signal import savgol_filter
dE = np.gradient(savgol_filter(ds['nrj'], 9, 1))
mono_turn = dE > 0
plt.figure()
plt.plot(1e3*dE)
plt.ylabel('Delta energy (meV)')
plt.twinx()
plt.plot(mono_turn, ls='', marker='.', c='C1')
plt.ylabel('mono going up')
plt.title(f'run {runNB} p{proposalNB}')
```
%% Cell type:code id: tags:
``` python
ds['mono_turn'] = xr.DataArray(np.gradient(savgol_filter(ds['nrj'], 9, 1))>0, dims=['trainId'])
```
%% Cell type:code id: tags:
``` python
ds
```
%% Cell type:code id: tags:
``` python
subds = ds  # .isel({'trainId': slice(1600)})
Eshift = 0.05
shift_subds = subds.copy()  # copy, so that shifting 'nrj' does not modify subds
shift_subds['nrj'] = subds['nrj'] + Eshift
pxas3a = tbr.xas(shift_subds.where((subds['magnet'] > 2) * subds['mono_turn'], drop=True),
                 bins=nrj_bins_edges, plot=False, Itkey=mcp)
nxas3a = tbr.xas(shift_subds.where((subds['magnet'] < -2) * subds['mono_turn'], drop=True),
                 bins=nrj_bins_edges, plot=False, Itkey=mcp)
shift_subds['nrj'] = subds['nrj'] - Eshift
pxas3b = tbr.xas(shift_subds.where((subds['magnet'] > 2) * np.logical_not(subds['mono_turn']), drop=True),
                 bins=nrj_bins_edges, plot=False, Itkey=mcp)
nxas3b = tbr.xas(shift_subds.where((subds['magnet'] < -2) * np.logical_not(subds['mono_turn']), drop=True),
                 bins=nrj_bins_edges, plot=False, Itkey=mcp)
```
%% Cell type:code id: tags:
``` python
plt.figure()
#plt.errorbar(pxas['nrj'], pxas['muA'], pxas['sterrA'])
plt.errorbar(pxas3a['nrj'], pxas3a['muA'], pxas3a['sterrA'], c='C0', ls='-', label='+H, mono up')
plt.errorbar(pxas3b['nrj'], pxas3b['muA'], pxas3b['sterrA'], c='C0', ls='--',
             label=f'+H, mono down, dE = {-1e3*Eshift} meV')
#plt.errorbar(nxas['nrj'], nxas['muA'], nxas['sterrA'])
plt.errorbar(nxas3a['nrj'], nxas3a['muA'], nxas3a['sterrA'], c='C1', ls='-', label='-H, mono up')
plt.errorbar(nxas3b['nrj'], nxas3b['muA'], nxas3b['sterrA'], c='C1', ls='--',
             label=f'-H, mono down, dE = {-1e3*Eshift} meV')
plt.xlabel('Energy (eV)')
plt.ylabel('XAS (arb. units)')
plt.title(f'run {runNB} p{proposalNB}')
plt.xlim([707, 709])
plt.ylim([3, 4.5])
plt.legend()
```
%% Cell type:code id: tags:
``` python
plt.figure()
plt.title(f'run {runNB} p{proposalNB}')
plt.xlabel('Energy (eV)')
plt.ylabel('XAS (mono up) - XAS (mono down) (arb. units)')
plt.plot(pxas3a['nrj'], pxas3a['muA'] - pxas3b['muA'])
plt.plot(nxas3a['nrj'], nxas3a['muA'] - nxas3b['muA'])
plt.ylim([-0.1, 0.18])
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:markdown id: tags:
# How to
This notebook has been created for the latest commit on DevelopmentRG:

- git fetch origin
- git checkout DevelopmentRG

Some methods changed, so the processing scripts have to be adapted correspondingly (if you want to use the TIM for the processing in some way).

# Changes since last version
Selection from the changelog that is relevant here (changes since 1.1.1-rc2):

* DSSCBinner:
  - constructor: kwargs 'tim_names' and 'dssc_coords_stride'. The latter describes the relation between the xgm/tim pulses and the dssc frames; it can be an int or a list.
  - load_xgm(), load_tim(): load formatted xgm/tim data.
  - create_pulsemask(), kwarg use_data='xgm'/'tim': the pulsemask is created from either the xgm or the tim data.
  - get_xgm_binned(), get_tim_binned(): return the binned xgm/tim data.

# Comments
The methods used in the code below rely on the automatic peak integration for the ADCs. We have 4 MCPs: 3 of them are downstream of the sample, and one is in the FFT just under the sample and measures fluorescence from the sample. The automatic peak integration has been set up for the 3 downstream MCPs only, so this code only works for those. If you would like to use the 4th as well, you have to look at the raw data (example at the end of the notebook) and do the peak integration by hand (older version of the toolbox). The raw data is recorded all the time anyway. MCP4 has been switched on after run 181.
%% Cell type:code id: tags:
``` python
import importlib
import logging
import numpy as np
import xarray as xr
import pandas as pd
import extra_data as ed
import matplotlib.pyplot as plt
import toolbox_scs as tb
import toolbox_scs.detectors as tbdet
import toolbox_scs.misc as tbm
import toolbox_scs.detectors.dssc_plot as dssc_plot
```
%% Cell type:code id: tags:
``` python
logging.basicConfig(level=logging.INFO)
log_root = logging.getLogger(__name__)
#%matplotlib inline
#xr.set_options(display_style='text')
```
%% Cell type:code id: tags:
``` python
proposal_nb = 2719
run_nb = 194
run_info = tbdet.load_dssc_info(proposal_nb, run_nb)
fpt = run_info['frames_per_train']
n_trains = run_info['number_of_trains']
trainIds = run_info['trainIds']
```
%% Output
INFO:extra_data.read_machinery:Found proposal dir '/gpfs/exfel/exp/SCS/202002/p002719' in 0.0048 s
%% Cell type:code id: tags:
``` python
n_trains
```
%% Output
9170
%% Cell type:code id: tags:
``` python
buckets_train = ['chunk1']*3000 + ['chunk2']*3000 + ['chunk3']*3170
buckets_pulse = np.linspace(0,fpt-1,fpt, dtype=int)
binner1 = tbdet.create_dssc_bins("trainId", trainIds, buckets_train)
binner2 = tbdet.create_dssc_bins("pulse",
                                 np.linspace(0, fpt-1, fpt, dtype=int),
                                 buckets_pulse)
binners = {'trainId': binner1, 'pulse': binner2}
```
%% Cell type:code id: tags:
``` python
bin_obj = tbdet.DSSCBinner(proposal_nb, run_nb,
                           binners=binners,
                           dssc_coords_stride=2)
```
%% Output
INFO:extra_data.read_machinery:Found proposal dir '/gpfs/exfel/exp/SCS/202002/p002719' in 0.0031 s
INFO:extra_data.read_machinery:Found proposal dir '/gpfs/exfel/exp/SCS/202002/p002719' in 0.0027 s
%% Cell type:code id: tags:
``` python
bin_obj.get_info()
```
%% Output
Frozen(SortedKeysDict({'trainId': 3, 'pulse': 52, 'x': 128, 'y': 512}))
%% Cell type:code id: tags:
``` python
bin_obj.load_tim()
tim_binned = bin_obj.get_tim_binned()
```
%% Output
INFO:toolbox_scs.detectors.digitizers:Extracting ADQ412 peaks from ['MCP2apd', 'MCP1apd', 'MCP3apd'].
INFO:toolbox_scs.detectors.dssc_misc:loaded formatted tim data.
INFO:toolbox_scs.detectors.dssc:binned tim data according to dssc binners.
%% Cell type:code id: tags:
``` python
bin_obj.load_xgm()
xgm_binned = bin_obj.get_xgm_binned()
```
%% Output
INFO:toolbox_scs.detectors.xgm:Extracting XGM data from ['SCS_SA3'].
INFO:toolbox_scs.detectors.dssc_misc:loaded formatted xgm data.
INFO:toolbox_scs.detectors.dssc:binned xgm data according to dssc binners.
%% Cell type:code id: tags:
``` python
plt.plot(xgm_binned['xgm'][0,:])
plt.plot(xgm_binned['xgm'][1,:])
plt.plot(xgm_binned['xgm'][2,:])
```
%% Output
[<matplotlib.lines.Line2D at 0x2ba157986860>]
%% Cell type:code id: tags:
``` python
plt.plot(tim_binned['tim'][0,:])
plt.plot(tim_binned['tim'][1,:])
plt.plot(tim_binned['tim'][2,:])
```
%% Output
[<matplotlib.lines.Line2D at 0x2ba1571a5e80>]
%% Cell type:code id: tags:
``` python
plt.plot(bin_obj.xgm.sel(pulse=20)[0:3000], 'o')
plt.plot(bin_obj.xgm.sel(pulse=0)[0:3000], 'o')
```
%% Output
[<matplotlib.lines.Line2D at 0x2ba157b3f780>]
%% Cell type:code id: tags:
``` python
plt.plot(bin_obj.tim.sel(pulse=20)[0:3000], 'o')
plt.plot(bin_obj.tim.sel(pulse=0)[0:3000], 'o')
```
%% Output
[<matplotlib.lines.Line2D at 0x2ba1571da9b0>]
%% Cell type:code id: tags:
``` python
plt.plot(bin_obj.xgm.sel(pulse=2)[0:3000], bin_obj.tim.sel(pulse=2)[0:3000], '.')
```
%% Output
[<matplotlib.lines.Line2D at 0x2ba1579bee10>]
%% Cell type:code id: tags:
``` python
plt.plot(bin_obj.xgm.sel(pulse=2)[-3000::], bin_obj.tim.sel(pulse=2)[-3000::], '.')
```
%% Output
[<matplotlib.lines.Line2D at 0x2ba157513438>]
%% Cell type:code id: tags:
``` python
# this is how the correlation should ideally look (run from last week)
```
from setuptools import setup, find_packages
with open('README.rst') as f:
    readme = f.read()

with open('VERSION') as f:
    _version = f.read()
_version = _version.strip("\n")
basic_analysis_reqs = ['numpy', 'scipy',] # and is readily available in Karabo
advanced_analysis_reqs = [
    'pandas', 'imageio', 'xarray>=0.13.0', 'psutil', 'h5py', 'h5netcdf',]
interactive_reqs = ['ipykernel', 'matplotlib', 'tqdm',]
maxwell_reqs = ['joblib', 'papermill', 'dask[diagnostics]',
                'extra_data', 'extra_geom', 'euxfel_bunch_pattern>=0.6',
                'pyFAI',]
docs_reqs = ['sphinx', 'nbsphinx', 'sphinx-autoapi', 'pydata-sphinx-theme']
setup(name='toolbox_scs',
      version=_version,
      description="A collection of code for the SCS beamline",
      long_description=readme,
      author='SCS team',
      author_email='scs@xfel.eu',
      url="https://git.xfel.eu/gitlab/SCS/ToolBox.git",
      keywords='XAS, xgm, DSSC, FCCD, PPL',
      license="GPL",
      package_dir={'': 'src'},
      packages=find_packages('src'),
      package_data={},
      install_requires=basic_analysis_reqs,
      extras_require={
          'advanced': advanced_analysis_reqs,
          'interactive': interactive_reqs,
          'maxwell': advanced_analysis_reqs + interactive_reqs + maxwell_reqs,
          'docs': docs_reqs,
          'test': ['pytest']
      })
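The optional dependency groups defined above are exposed as pip extras; e.g. the full set of packages used on Maxwell can be installed with (a usage sketch, run from the repository root):

``` shell
pip install ".[maxwell]"
```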
from .constants import *
from .detectors import *
# Module name is the same as a child function, we use an alias to avoid conflict
import toolbox_scs.load as load_module
from .load import *
from .misc import *
from .mnemonics_machinery import *
from .routines import *

__all__ = (
    # top-level modules
    constants.__all__
    + load_module.__all__
    + mnemonics_machinery.__all__
    # submodules
    + detectors.__all__
    + misc.__all__
    + routines.__all__
)
from . import knife_edge as knife_edge_module
from .knife_edge import *
__all__ = knife_edge_module.__all__
import numpy as np
from scipy import special
from scipy.optimize import curve_fit

__all__ = ['knife_edge', 'knife_edge_base']


def knife_edge(positions, intensities, axisRange=None, p0=None):
    """
    Calculates the beam radius at 1/e^2 from a knife-edge scan by
    fitting with the erfc function f(a,b,u) = a*erfc(u) + b,
    where u = sqrt(2)*(x-x0)/w0, with w0 the beam radius at 1/e^2
    and x0 the beam center.

    Parameters
    ----------
    positions : np.ndarray
        Motor position values, typically 1D
    intensities : np.ndarray
        Intensity values, either 1D or 2D, with the number of rows
        equal to the position size
    axisRange : sequence of two floats or None
        Edges of the scanning axis between which to apply the fit.
    p0 : list of floats, numpy 1D array
        Initial parameters used for the fit: x0, w0, a, b. If None, a beam
        radius of 100 um is assumed.

    Returns
    -------
    width : float
        The beam radius at 1/e^2
    std : float
        The standard deviation of the width
    """
    popt, pcov = knife_edge_base(positions, intensities,
                                 axisRange=axisRange, p0=p0)
    width, std = 0, 0
    if popt is not None and pcov is not None:
        width, std = np.abs(popt[1]), pcov[1, 1]**0.5
    return width, std
def knife_edge_base(positions, intensities, axisRange=None, p0=None):
    """
    The base implementation of the knife-edge scan analysis.

    Calculates the beam radius at 1/e^2 from a knife-edge scan by
    fitting with the erfc function f(a,b,u) = a*erfc(u) + b,
    where u = sqrt(2)*(x-x0)/w0, with w0 the beam radius at 1/e^2
    and x0 the beam center.

    Parameters
    ----------
    positions : np.ndarray
        Motor position values, typically 1D
    intensities : np.ndarray
        Intensity values, either 1D or 2D, with the number of rows
        equal to the position size
    axisRange : sequence of two floats or None
        Edges of the scanning axis between which to apply the fit.
    p0 : list of floats, numpy 1D array
        Initial parameters used for the fit: x0, w0, a, b. If None, a beam
        radius of 100 um is assumed.

    Returns
    -------
    popt : sequence of floats or None
        The parameters of the resulting fit.
    pcov : sequence of floats
        The covariance matrix of the resulting fit.
    """
    # Prepare arrays
    positions, intensities = prepare_arrays(positions, intensities,
                                            xRange=axisRange)
    # Estimate initial fitting params
    if p0 is None:
        p0 = [np.mean(positions), 0.1, np.max(intensities) / 2, 0]
    # Fit
    popt, pcov = function_fit(erfc, positions, intensities, p0=p0)
    return popt, pcov
def function_fit(func, x, y, **kwargs):
    """A wrapper for scipy.optimize.curve_fit()
    """
    # Fit
    try:
        popt, pcov = curve_fit(func, x, y, **kwargs)
    except (TypeError, RuntimeError) as err:
        print("Fit did not converge:", err)
        popt, pcov = (None, None)
    return popt, pcov
def prepare_arrays(arrX: np.ndarray, arrY: np.ndarray,
                   xRange=None, yRange=None):
    """
    Preprocessing of the input x and y arrays.

    This involves the following steps:
    1. Convert the arrays to 1D of the same size
    2. Select the ranges from the input x- and y-ranges
    3. Retrieve only finite values.
    """
    # Convert both arrays to 1D of the same size
    arrX, arrY = arrays_to1d(arrX, arrY)

    # Select ranges
    if xRange is not None:
        low, high = xRange
        if low == high:
            raise ValueError('The supplied xRange is not a valid range.')
        mask_ = range_mask(arrX, low, high)
        arrX = arrX[mask_]
        arrY = arrY[mask_]
    if yRange is not None:
        low, high = yRange
        if low == high:
            raise ValueError('The supplied yRange is not a valid range.')
        mask_ = range_mask(arrY, low, high)
        arrX = arrX[mask_]
        arrY = arrY[mask_]

    # Clean both arrays by only getting finite values
    finite_idx = np.isfinite(arrX) & np.isfinite(arrY)
    arrX = arrX[finite_idx]
    arrY = arrY[finite_idx]

    return arrX, arrY
def arrays_to1d(arrX: np.ndarray, arrY: np.ndarray):
    """Flatten two arrays and match their sizes
    """
    assert arrX.shape[0] == arrY.shape[0]
    arrX, arrY = arrX.flatten(), arrY.flatten()
    if len(arrX) > len(arrY):
        arrY = np.repeat(arrY, len(arrX) // len(arrY))
    else:
        arrX = np.repeat(arrX, len(arrY) // len(arrX))
    return arrX, arrY
def range_mask(array, minimum=None, maximum=None):
    """Build a boolean mask selecting values within [minimum, maximum]
    """
    default = np.ones(array.shape, dtype=bool)
    min_slice, max_slice = default, default
    if minimum is not None:
        if minimum > np.nanmax(array):
            raise ValueError('The range minimum is too large.')
        min_slice = array >= minimum
    if maximum is not None:
        if maximum < np.nanmin(array):
            raise ValueError('The range maximum is too small.')
        max_slice = array <= maximum
    return min_slice & max_slice
def erfc(x, x0, w0, a, b):
    return a * special.erfc(np.sqrt(2) * (x - x0) / w0) + b
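A minimal usage sketch with synthetic data (all parameter values below are made up for illustration):

``` python
import numpy as np
from scipy import special

# synthetic knife-edge scan: a*erfc(sqrt(2)*(x-x0)/w0) + b plus noise
x = np.linspace(-1, 1, 500)                    # motor positions in mm
y = 2.5 * special.erfc(np.sqrt(2) * (x - 0.1) / 0.2) + 0.3
y = y + np.random.normal(0, 0.05, x.shape)

width, std = knife_edge(x, y)                  # beam radius at 1/e^2 and its error
popt, pcov = knife_edge_base(x, y, axisRange=[-0.5, 0.5])  # full fit output
```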