Skip to content
Snippets Groups Projects
Commit 34ce4dec authored by Egor Sobolev's avatar Egor Sobolev
Browse files

Initial commit

parent 194c6587
No related branches found
No related tags found
No related merge requests found
# emacs backup
*~
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
include geomtools/sfx/report.ipynb
Detector geometry tools
----------------------------------
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[project]
name = "geomtools"
version = "0.0.1"
authors = [
{ name="European XFEL", email="da-support@xfel.eu" },
]
description = "Detector geometry tools"
readme = "README.md"
requires-python = ">=3.7"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
"cfelpyutils",
"importlib_resources",
"matplotlib",
"nbconvert",
"numpy",
"pandas",
"papermill",
"psutil",
]
[project.scripts]
geomtools-push = "geomtools.sfx.report:push_geometry"
# flake8: noqa E401
from .sfx import (
parse_crystfel_streamfile, read_crystfel_streamfile, extract_geometry,
plot_center_shift, plot_cell_parameters, plot_peakogram, spacing,
gauss2d_fit, ellipse, parse_xwiz_summary,
)
import numpy as np
from extra_data import SourceNameError
AGIPD1M_MOTOR_KEYS = ([f"q{i // 2 + 1}m{i % 2 + 1}" for i in range(8)] +
["z_stepper"])
AGIPD1M_MOTORS = dict((motor, "SPB_IRU_AGIPD1M/MOTOR/" + motor.upper())
for motor in AGIPD1M_MOTOR_KEYS)
AGIPD1M_DATA_SELECTOR = "SPB_IRU_AGIPD1M1/MDL/DATA_SELECTOR"
def _make_node_name(device_id):
# Replace non alpha-numeric chars with blanks
mangled_id = ''.join(c if c.isalnum() else ' ' for c in device_id)
# Make camelCase
mangled_id = ''.join(mangled_id.title().split())
if mangled_id:
mangled_id = mangled_id[0].lower() + mangled_id[1:]
return mangled_id
def _mangle_device_id(device_id):
return ''.join(c if c.isalnum() else '_' for c in device_id)
def read_motor_positions(dc, motor_devices, data_selector_id=""):
try:
data_selector = dc[data_selector_id]
except SourceNameError:
data_selector = None
motors = {}
ts = set()
for motor_id, dev_id in motor_devices.items():
try:
src = dc[dev_id]
key = "actualPosition"
except SourceNameError:
if data_selector is None:
continue
src = data_selector
key = _make_node_name(dev_id) + ".actualPosition"
if key not in src.keys():
key = _mangle_device_id(dev_id) + ".actualPosition"
if key not in src.keys():
continue
pos = src[key].ndarray()
uniq_pos = np.unique(pos)
ts |= set(np.unique(src[key + ".timestamp"].ndarray()))
med_pos = np.median(pos)
if len(uniq_pos) != 1 and np.any(abs(uniq_pos - med_pos) > 0.001):
raise RuntimeError("Motor {motor_id} was moved during the runs")
motors[motor_id] = float(med_pos)
return ts, motors
# flake8: noqa E401
from .crystfelio import (
parse_crystfel_streamfile, read_crystfel_streamfile, extract_geometry
)
from .draw import plot_center_shift, plot_cell_parameters, plot_peakogram
from .lattice import spacing
from .misc import gauss2d_fit, ellipse
from .xwizio import parse_xwiz_summary
import io
import time
import psutil
import numpy as np
import pandas as pd
import multiprocessing as mp
from .lattice import spacing
from cfelpyutils.crystfel_stream import (
parse_chunk, CHUNK_START_MARKER, CHUNK_END_MARKER
)
GEOM_START_MARKER = "----- Begin geometry file -----"
GEOM_END_MARKER = "----- End geometry file -----"
def _read_to_line(f, marker, end):
while f.tell() < end:
line = f.readline()
if not line or line.strip() == marker:
return
def _buffer_to_line(f, marker):
s = io.StringIO()
line = f.readline()
while line and line.strip() != marker:
s.write(line)
line = f.readline()
s.seek(0)
return s
def parse_crystfel_streamfile(stream_filename, begin=0, end=None):
lattice_params = ['lattice_type', 'centering', 'unique_axis']
lattice_arrays = {
'Cell parameters/lengths': ['a', 'b', 'c'],
'Cell parameters/angles': ['alpha', 'beta', 'gamma'],
'predict_refine/det_shift': ['xc', 'yc']
}
cell_columns = ['a', 'b', 'c', 'alpha', 'beta', 'gamma']
peaks = []
lattices = []
reflexes = []
frame_ix = 0
cryst_ix = 0
with open(stream_filename, 'r') as f:
if end is None:
end = f.seek(0, 2)
f.seek(begin, 0)
_read_to_line(f, CHUNK_START_MARKER, end)
while f.tell() < end:
buffer = _buffer_to_line(f, CHUNK_END_MARKER)
if not buffer:
break
chunk = parse_chunk(buffer, peak_tbl=True, refl_tbl=True)
# append peaks
pe = chunk['peaks']
pe['frame'] = frame_ix
peaks.append(pe)
# append lattice
for crystal in chunk['crystals']:
la = {'frame': frame_ix}
la.update(dict(
[(param, crystal[param]) for param in lattice_params]))
for arr_name, col_names in lattice_arrays.items():
la.update(dict(zip(col_names, crystal[arr_name].tolist())))
for param in ['a', 'b', 'c']:
la[param] *= 10
lattices.append(la)
la_kwargs = dict((name, la[name]) for name in cell_columns)
re = crystal['reflections']
re['res'] = np.sqrt(spacing(
re.h.values, re.k.values, re.l.values, **la_kwargs))
re['frame'] = frame_ix
re['cryst'] = cryst_ix
reflexes.append(re)
cryst_ix += 1
frame_ix += 1
_read_to_line(f, CHUNK_START_MARKER, end)
peaks = pd.concat(peaks, ignore_index=True)
reflexes = pd.concat(reflexes, ignore_index=True)
lattices = pd.DataFrame(lattices)
return peaks, lattices, reflexes, frame_ix
def extract_geometry(stream_filename):
"""Extracts geometry from Crystfel stream file.
Input
-----
stream_filename: str
Crystfel stream file name
Returns
-------
StringIO buffer with geometry file
"""
with open(stream_filename, 'r') as f:
line = f.readline()
while line and line.strip() != GEOM_START_MARKER:
if line.strip() == CHUNK_START_MARKER:
return
line = f.readline()
geom = _buffer_to_line(f, GEOM_END_MARKER)
return geom
def _split_file(filename, nproc=20, partsize=None, nbytes=None):
"""Split file on portions."""
if nbytes is None:
with open(filename, 'r') as f:
nbytes = f.seek(0, 2)
if partsize is None:
npart = nproc
partsize = nbytes // npart + (nbytes % npart != 0)
else:
npart = nbytes // partsize + (nbytes % partsize != 0)
return [
(filename, i * nbytes // npart, (i + 1) * nbytes // npart)
for i in range(npart)
]
def _read_chunk(args):
stream_filename, begin, end = args
return parse_crystfel_streamfile(stream_filename, begin=begin, end=end)
def read_crystfel_streamfile(stream_filename, disp=False):
"""Read Crystfel stream file in parallel.
Input
-----
stream_filename: str
Crystfel stream file name
disp: bool
Print progressbar if True
Returns
-------
three pandas DataFrames (peaks, lattices and reflexes)
and number of parsed chunks
"""
tm0 = time.monotonic()
nproc = psutil.cpu_count(logical=False)
slices = _split_file(stream_filename, nproc, 3 << 23)
nslice = len(slices)
if disp:
print(f"nproc: {nproc} nslice: {nslice}")
with mp.Pool(min(nproc, nslice)) as pool:
res = pool.imap(_read_chunk, slices)
peak = []
reflex = []
lattice = []
nframe = 0
ncryst = 0
for i, (pe, la, re, n) in enumerate(res):
pe.frame += nframe
re.frame += nframe
re.cryst += ncryst
peak.append(pe)
reflex.append(re)
lattice.append(la)
nframe += n
ncryst += len(la)
if disp:
progress = i / nslice
bar = '=' * int(progress * 40)
elapsed = time.monotonic() - tm0
remained = (elapsed * (1. / progress - 1.)
if progress > .0 else np.inf)
print(f"[{bar:<40s}] {progress:6.1%} "
f"elapsed:{elapsed:6.0f}s, remained:{remained: 6.0f}s",
end='\r')
if disp:
elapsed = time.monotonic() - tm0
progress = 1.0
bar = '=' * 40
print(f"[{bar:<40s}] 100.0% elapsed:{elapsed:6.0f}s, remained: 0s")
peak = pd.concat(peak, ignore_index=True)
lattice = pd.concat(lattice, ignore_index=True)
reflex = pd.concat(reflex, ignore_index=True)
return peak, lattice, reflex, nframe
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import patches as mpatch
from mpl_toolkits.axes_grid1 import make_axes_locatable
from matplotlib.colors import LogNorm
from .misc import gauss2d_fit, ellipse
def plot_center_shift(lattice, figsize=(12, 12), frameon=False, **kwargs):
x, y = lattice.xc.values, lattice.yc.values
p = gauss2d_fit(x, y)
A, Mx, My, Dx, Ds, Dy = p
w, v = ellipse(p)
b, a = np.sqrt(w)
limx, limy = np.sqrt(Dx), np.sqrt(Dy)
theta = np.arctan2(v[0, 0], v[1, 0])*180/np.pi
side = 6 * max(limx, limy)
r = np.array([x, y])
ru, cu = np.unique(r, axis=1, return_counts=True)
fig, ax = plt.subplots(1, 1, figsize=figsize, frameon=frameon, **kwargs)
px = ax.scatter(ru[0], ru[1], s=1, c=cu)
ax.set_xlim(Mx - side, Mx + side)
ax.set_ylim(My - side, My + side)
ax.set_xlabel('x shift (mm)', fontsize=12)
ax.set_ylabel('y shift (mm)', fontsize=12)
ax.set_aspect('equal')
ax.grid(True)
ax.add_patch(mpatch.Ellipse((Mx, My), a*8, b*8,
angle=-theta, fill=False, ec='r'))
ax.plot(Mx, My, 'o', color='purple')
ax.plot(0, 0, 'o', color='cyan')
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size="5%", pad=0.1)
plt.colorbar(px, cax=cax)
cax.set_ylabel('counts', fontsize=12)
return fig, ax
def plot_cell_parameters(lattice, figsize=(16, 5), frameon=False):
columns = ['a', 'b', 'c', 'alpha', 'beta', 'gamma']
fig, axs = plt.subplots(2, 3, figsize=figsize, frameon=frameon)
for ax, col in zip(axs.flat, columns):
lab = col if len(col) == 1 else '\\' + col
unit = '\\AA' if len(col) == 1 else '\\degree'
a = lattice[col].values
mu = np.mean(a)
sig = np.std(a)
h, e = np.histogram(a, range=(mu-6*sig, mu+6*sig),
bins=50, density=True)
ax.bar(0.5*(e[1:] + e[:-1]), h,
width=(e[1]-e[0]), color='k', edgecolor='w')
b = (e - mu) / sig
amp = 1. / np.sqrt(2*np.pi*sig*sig)
ax.plot(e, np.exp(-0.5 * b * b) * amp, 'r')
ymx = max(amp, np.max(h))
ax.set_ylim(0, 1.1*ymx)
ax.set_xlim(mu-6*sig, mu+6*sig)
ax.text(mu-5.7*sig, 0.9*ymx,
f"${lab}={mu:.2f}\\pm{sig:.2f} {unit}$", fontsize=12)
ax.yaxis.set_ticks([])
for side in ['top', 'right', 'left']:
ax.spines[side].set_visible(False)
return fig, axs
def plot_peakogram(reflex, figsize=(12, 11), frameon=False, **kwargs):
H, ex, ey = np.histogram2d(reflex.peak, reflex.res, bins=(600, 400))
H1 = np.ma.masked_array(H, H == 0)
fig, ax = plt.subplots(1, 1, figsize=figsize, frameon=frameon, **kwargs)
extent = (ey[0], ey[-1], ex[0], ex[-1])
pg = ax.matshow(H1, origin='lower', extent=extent, norm=LogNorm())
ax.set_aspect('auto', adjustable='box')
ax.xaxis.tick_bottom()
ax.set_ylabel('Reflection max intensity', fontsize=12)
ax.set_xlabel('$1/d \\quad (\\AA^{-1})$', fontsize=12)
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size="5%", pad=0.1)
plt.colorbar(pg, cax=cax)
cax.set_ylabel('counts', fontsize=12)
return fig, ax
# flake8: noqa E741
import numpy as np
def spacing(h, k, l, a, b, c, alpha, beta, gamma):
alpha *= np.pi / 180.
beta *= np.pi / 180.
gamma *= np.pi / 180.
a2, b2, c2, abc = a * a, b * b, c * c, a * b * c
sna, snb, snc = np.sin(alpha), np.sin(beta), np.sin(gamma)
csa, csb, csc = np.cos(alpha), np.cos(beta), np.cos(gamma)
V2 = abc * abc * (
1. - csa * csa - csb * csb - csc * csc + 2. * csa * csb * csc)
S11 = b2 * c2 * sna * sna * h * h
S22 = a2 * c2 * snb * snb * k * k
S33 = a2 * b2 * snc * snc * l * l
S12 = 2 * abc * c * (csa * csb - csc) * h * k
S23 = 2 * abc * a * (csb * csc - csa) * k * l
S13 = 2 * abc * b * (csc * csa - csb) * h * l
d = (S11 + S22 + S33 + S12 + S23 + S13) / V2
return d
def spacing_default(h, k, l, a, b, c):
h_a, k_b, l_c = h / a, k / b, l / c
return h_a * h_a + k_b * k_b + l_c * l_c
def spacing_monoclinic(h, k, l, a, b, c, beta):
snb, csb = np.sin(beta), np.cos(beta)
snb2 = snb * snb
h_a, k_b, l_c = h / a, k / b, l / c
return (h_a * h_a + k_b * k_b * snb +
l_c * l_c - 2. * h_a * l_c * csb) / snb2
def spacing_hexagonal(h, k, l, a, b, c):
h_a, k_b, l_c = h / a, k / b, l / c
return 4. * (h_a * h_a + h_a * k_b + k_b * k_b) / 3. + l_c * l_c
def spacing_rhombohedral(h, k, l, a, alpha):
sna, csa = np.sin(a), np.cos(a)
csa2 = csa * csa
nm = ((h * h + k * k + l * l) * sna * sna +
2. * (h * k + k * l + h * l) * csa2 - csa)
dn = a * a * (1. - csa2 * (3 - 2. * csa))
return nm / dn
import numpy as np
def gauss2d_fit(x, y):
"""Computes parameters of 2D Gaussian approximation of input data
"""
n = ny = len(y)
nx = len(x)
assert nx == ny
My = np.sum(y) / n
Mx = np.sum(x) / n
y2 = np.sum(y * y) / n
x2 = np.sum(x * x) / n
xy = np.sum(x * y) / n
Dy = y2 - My*My
Dx = x2 - Mx*Mx
Ds = xy - Mx*My
D = Dx*Dy-Ds*Ds
A = n / np.sqrt(4*np.pi*np.pi*D)
return A, Mx, My, Dx, Ds, Dy
def ellipse(p):
"""Computes elipse parametes of 2D Gaussia isolines"""
A, Mx, My, Dx, Ds, Dy = p
DsDs = Ds * Ds
D = Dx * Dy - DsDs
T = 0.5 * (Dx + Dy)
G = np.sqrt(T * T - D)
a, b = T + G, T - G
u = Dx - a
d = np.sqrt(u * u + DsDs)
x, y = u / d, Ds / d
return np.array([b, a]), np.array([[x, y], [y, -x]])
%% Cell type:code id:3852d9ec-1143-4846-9f48-11751729d547 tags:parameters
``` python
xwiz_dir = ""
stream_file = ""
summary_file = ""
prefix=prefix = ""
output_dir = ""
```
%% Cell type:code id:19d522a3-26f5-4ac6-a4af-ee2139deafa2 tags:
``` python
import os
import matplotlib.pyplot as plt
from geomtools import (
read_crystfel_streamfile, extract_geometry, parse_xwiz_summary,
plot_center_shift, plot_cell_parameters, plot_peakogram,
)
```
%% Cell type:code id:776ff0aa tags:
``` python
xwiz_conf, summary = parse_xwiz_summary(summary_file)
propno = int(xwiz_conf['data']['proposal'])
runs = eval(xwiz_conf['data']['runs'])
pk, la, re, nfrm = read_crystfel_streamfile(stream_file, disp=False)
```
%% Cell type:markdown id:046f728b tags:
# EXtra-xwiz summary
%% Cell type:code id:7c2a58aa tags:
``` python
print(f"Proposal: {propno}")
print(f"Runs: {runs}")
print()
print(summary)
print()
print("From Crystfel stream file")
print(f"read: {nfrm:9d} chunks, {len(la):9d} crystalls")
```
%% Cell type:markdown id:3978ac22 tags:
# Cell parameters
%% Cell type:code id:2050288b-6bbd-4e5a-9854-04cde094ff6b tags:
``` python
fig, axs = plot_cell_parameters(la)
plt.show()
```
%% Cell type:markdown id:59980ed1 tags:
# Center shift
%% Cell type:code id:c4265f40-f0c4-4ad6-bfbd-21796fb1354c tags:
``` python
fig, ax = plot_center_shift(la)
plt.show()
```
%% Cell type:markdown id:d772ce42 tags:
# Peakogram
%% Cell type:code id:5ce7abee-fd09-48d3-85ef-cacc7b3a3468 tags:
``` python
fig, ax = plot_peakogram(re)
plt.show()
```
import argparse
import importlib_resources
import nbformat
import papermill as pm
import pathlib
import yaml
from traitlets.config import Config
from nbconvert import PDFExporter
from extra_data import open_run
from .xwizio import parse_xwiz_summary
from .crystfelio import extract_geometry
from geomtools.motor import (
read_motor_positions, AGIPD1M_MOTORS, AGIPD1M_DATA_SELECTOR
)
REPO = pathlib.Path("/gpfs/exfel/data/scratch/spb_geom")
DET = {
"AGIPD1M": ("*AGIPD1MCTRL*", AGIPD1M_MOTORS, AGIPD1M_DATA_SELECTOR),
"Jungfrau4M": ("*JNGFRCTRL*", {}, ""),
}
def make_report(notebook_filename, parameters):
report_ref = importlib_resources.files("geomtools.sfx") / "report.ipynb"
with importlib_resources.as_file(report_ref) as report_path:
pm.execute_notebook(
report_path.absolute(),
notebook_filename,
parameters=parameters,
)
notebook = nbformat.reads(notebook_filename.read_text(), as_version=4)
c = Config()
c.TemplateExporter.exclude_input = True
pdf_exporter = PDFExporter(config=c)
(body, resources) = pdf_exporter.from_notebook_node(notebook)
notebook_filename.with_suffix(".pdf").write_bytes(body)
notebook_filename.unlink()
def push_geometry():
parser = argparse.ArgumentParser(
description='Makes the detector geometry report')
parser.add_argument('-w', '--xwiz-dir', type=pathlib.Path,
help="Directory with EXtra-xwiz files")
parser.add_argument('-s', '--summary', required=True, type=pathlib.Path,
help="EXtra-xwiz summary file")
parser.add_argument('-i', '--stream', type=pathlib.Path,
help="Crystfel stream file")
parser.add_argument('-r', '--report-only', action="store_true")
parser.add_argument('detector', choices=DET.keys())
parser.add_argument('tag')
parser.add_argument('sample')
args = parser.parse_args()
if args.xwiz_dir is None:
xwiz_dir = args.summary.absolute().parent
summary_file = args.summary
else:
xwiz_dir = args.xwiz_dir
if not args.summary.is_absolute():
summary_file = xwiz_dir / args.summary
if not summary_file.exists():
summary_file = args.summary
print("EXtra-xwiz directory:", xwiz_dir)
print("EXtra-xwiz summary file:", summary_file)
try:
xwiz_conf, summary = parse_xwiz_summary(summary_file.absolute())
except Exception:
print("EXtra-xwiz summary file is not found. Quit...")
exit(-1)
propno = int(xwiz_conf['data']['proposal'])
runs = eval(xwiz_conf['data']['runs'])
print("Proposal:", propno)
print("Runs:", runs)
prefix = xwiz_conf['data'].get('list_prefix')
if prefix is None:
prefix = f"p{propno:06d}_r{runs[0]:04d}-{runs[-1]:04d}"
if args.stream is not None:
stream_file = args.stream
else:
stream_file = xwiz_dir / (prefix + "_hits.stream")
if not stream_file.exists():
stream_file = xwiz_dir / (prefix + ".stream")
if not stream_file.exists():
print("Crystfel stream file is not found. Quit...")
exit(-1)
print("Crystfel stream file:", stream_file)
ctrlaggr, motors, data_selector = DET[args.detector]
dc = open_run(propno, runs[0], data='raw', include=ctrlaggr)
date = dc.run_metadata()['creationDate']
day, _, tm = date.partition('T')
print("Data: ", day)
geom_id = '_'.join([args.detector, day, args.tag])
if not args.report_only:
output_dir = (REPO / args.detector / day[:4]
/ (day[4:] + '-' + args.tag))
print("Repo directory:", output_dir)
try:
output_dir.mkdir(parents=True, exist_ok=False)
except FileExistsError:
print("Target directory exists, use another 'tag'. Quit...")
exit(-1)
geom = extract_geometry(stream_file)
if geom is None:
print("Geometry is not found in crystfel stream file. "
"Use file from xwix.summary.")
geom = open(xwiz_conf['geom']['file_path'], 'r')
dc = open_run(propno, runs[0], include="*-DA??-*")
for runno in runs[1:]:
dc = dc.union(open_run(propno, runno, include="*-DA??-*"))
ts, motor_pos = read_motor_positions(dc, motors, data_selector)
meta = {
'proposal': propno,
'runs': runs,
'method': 'sfx',
'tool': 'geoptimiser',
'sample': args.sample,
'date': date,
'motors': motor_pos,
}
geom_file = output_dir / (geom_id + ".geom")
geom_file.write_text(geom.read())
geom.close()
geom_file.with_suffix(".yaml").write_text(yaml.dump(meta))
summary_copy = (output_dir / f"{geom_id}_xwiz.summary")
summary_copy.write_text(summary_file.read_text())
else:
print("Dry run, only generate report...")
output_dir = pathlib.Path.cwd()
notebook_filename = output_dir / f"{geom_id}_report.ipynb"
report_parameters = dict(
xwiz_dir=str(xwiz_dir.absolute()),
stream_file=str(stream_file.absolute()),
summary_file=str(summary_file.absolute()),
prefix=prefix,
output_dir=str(output_dir.absolute()),
)
make_report(notebook_filename, report_parameters)
if __name__ == "__main__":
push_geometry()
import os
XWIZ_SUMMARY_HEADER = "SUMMARY OF XWIZ WORKFLOW"
XWIZ_CONF_START_MARKER = "BASE CONFIGURATION USED"
XWIZ_CONF_END_MARKER = ""
def parse_xwiz_summary(filename):
with open(filename, "r") as f:
line = f.readline()
if line.strip() != XWIZ_SUMMARY_HEADER:
raise ValueError("Unexpected content of EXtra-xwiz summary file")
for line in f:
if line.strip() == XWIZ_CONF_START_MARKER:
break
xwiz_conf = {}
for line in f:
if line.strip() == XWIZ_CONF_END_MARKER:
break
key, _, val = line.partition(":")
key = key.strip()
val = val.strip()
if key == "Group":
group = xwiz_conf.setdefault(val, {})
else:
group[key] = val
cell_path = xwiz_conf["unit_cell"]["file"]
cell_name = os.path.basename(cell_path)
summary = ""
for line in f:
line = line.replace(cell_path, cell_name)
summary += line
return xwiz_conf, summary
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment