{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Interpolate Missing Constants #\n",
    "\n",
    "Version 0.1, Author: S. Hauf\n",
    "\n",
    "Currently available instances are LPD1M1 and AGIPD1M1\n",
    "\n",
    "For every selected constant the notebook retrieves the data of each module from the calibration database. Modules for which retrieval fails, whose data is all zero or contains no finite value, or which are listed in `force_interpolation`, are replaced by the element-wise NaN-ignoring median of the remaining modules (bad pixel constants are set to `BadPixels.INTERPOLATED` instead) and the result is sent to the database."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "detector_instance = \"LPD1M1\" # the detector instance to get constants for e.g. LPD1M1, required\n",
    "valid_at = \"\" # ISO formatted date for which constants should be valid. Leave empty to get most current ones\n",
    "cal_db_interface = \"tcp://max-exfl015:5005\"\n",
    "modules = [-1] # modules to get data from, in terms of numerical quadrant indices, range allowed\n",
    "beam_energy = \"None\"\n",
    "pixels_y = 256\n",
    "pixels_x = 256\n",
    "capacitor = \"5pF\"\n",
    "photon_energy = 9.2\n",
    "memory_cells = 512\n",
    "bias_voltage = 250.\n",
    "only_constants = \"SlopesCI\"\n",
    "force_interpolation = [4]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import ast\n",
    "import copy\n",
    "import datetime\n",
    "import inspect\n",
    "\n",
    "import h5py\n",
    "import matplotlib.pyplot as plt\n",
    "import numpy as np\n",
    "from cal_tools.enums import BadPixels\n",
    "from iCalibrationDB import Conditions, ConstantMetaData, Constants, Detectors, Versions\n",
    "\n",
    "%matplotlib inline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Detector family; selects the matching classes in Conditions and Constants.\n",
    "dtype = \"LPD\" if \"LPD\" in detector_instance.upper() else \"AGIPD\"\n",
    "\n",
    "# Constants stored under Conditions.Dark; all others use Conditions.Illuminated.\n",
    "darkconst = [\"Offset\", \"Noise\", \"SlopesPC\", \"SlopesCI\", \"BadPixelsDark\", \"BadPixelsPC\", \"BadPixelsCI\"]\n",
    "# Constants that are never processed.\n",
    "skip = [\"BadPixels\"]\n",
    "\n",
    "# Per-detector, per-constant condition parameters that replace the user-supplied values.\n",
    "overwrites = {\"LPD\": {\"SlopesFF\": {\"memory_cells\": 1},\n",
    "                      \"BadPixelsFF\": {\"memory_cells\": 1}}}\n",
    "\n",
    "# Derived list kept under its own name so that re-running this cell is safe.\n",
    "# An empty list means that every constant is processed.\n",
    "selected_constants = [name.strip() for name in only_constants.split(\",\") if name.strip()]\n",
    "\n",
    "# A leading -1 selects all 16 modules.\n",
    "if modules and modules[0] == -1:\n",
    "    modules = list(range(16))\n",
    "\n",
    "detector = getattr(Detectors, detector_instance)\n",
    "dconstants = getattr(Constants, dtype)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Parameter discovery helper"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def extend_parms(detector_instance):\n",
    "    \"\"\"Return parameter declarations for the condition arguments of a detector.\n",
    "\n",
    "    The constructors of all ``Conditions.<name>.<dtype>`` classes are inspected\n",
    "    and one ``name = value`` line is produced per distinct argument name.\n",
    "    Arguments without a default are emitted first as ``name = 0. # required``.\n",
    "\n",
    "    The imports are local to the function, as in the original implementation;\n",
    "    presumably so that the function can be run on its own -- confirm against\n",
    "    the tooling that consumes it before moving them.\n",
    "\n",
    "    :param detector_instance: detector instance name, e.g. \"LPD1M1\"\n",
    "    :return: the declarations joined by newlines\n",
    "    \"\"\"\n",
    "    import inspect\n",
    "\n",
    "    from iCalibrationDB import Conditions\n",
    "\n",
    "    dtype = \"LPD\" if \"LPD\" in detector_instance.upper() else \"AGIPD\"\n",
    "    seen = set()\n",
    "    required, optional = [], []\n",
    "\n",
    "    def extract_parms(cls):\n",
    "        # getfullargspec replaces the removed inspect.getargspec.\n",
    "        spec = inspect.getfullargspec(cls.__init__)\n",
    "        names = spec.args[1:]  # drop self\n",
    "        defaults = spec.defaults or ()  # None when no argument has a default\n",
    "        n_required = len(names) - len(defaults)\n",
    "        for pos, arg in enumerate(names):\n",
    "            if arg in seen:\n",
    "                continue\n",
    "            seen.add(arg)\n",
    "\n",
    "            if pos < n_required:\n",
    "                required.append(\"{} = 0. # required\".format(arg))\n",
    "                continue\n",
    "\n",
    "            default = defaults[pos - n_required]\n",
    "            # bool is a subclass of int but is kept quoted, as before.\n",
    "            is_number = isinstance(default, (int, float)) and not isinstance(default, bool)\n",
    "            if is_number or str(default).isdigit():\n",
    "                optional.append(\"{} = {}\".format(arg, default))\n",
    "            elif default is None or default == \"None\":\n",
    "                optional.append('{} = \"None\"'.format(arg))\n",
    "            else:\n",
    "                optional.append('{} = \"{}\"'.format(arg, default))\n",
    "\n",
    "    for name in dir(Conditions):\n",
    "        if name.startswith(\"__\"):\n",
    "            continue\n",
    "        # Skip members of Conditions that have no class for this detector type.\n",
    "        condition_cls = getattr(getattr(Conditions, name), dtype, None)\n",
    "        if condition_cls is not None:\n",
    "            extract_parms(condition_cls)\n",
    "\n",
    "    return \"\\n\".join(required + optional)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Read back the parameters\n",
    "\n",
    "The condition parameters are taken from the source of the first cell executed in this session, so the parameter cell has to be run first."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ip = get_ipython()\n",
    "session_id = ip.history_manager.get_last_session_id()\n",
    "first_cell = next(ip.history_manager.get_range(session_id, 1, 2, raw=True))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def parse_parameter_cell(source):\n",
    "    \"\"\"Extract ``name = literal`` assignments from the source of a cell.\n",
    "\n",
    "    Comments, blank lines and non-assignment statements are ignored, as are\n",
    "    assignments whose right-hand side is not a Python literal.\n",
    "\n",
    "    Numbers are returned as ``float`` and the string ``\"None\"`` as ``None``;\n",
    "    other literals (strings, lists, ...) are returned as evaluated.\n",
    "\n",
    "    :param source: Python source of the parameter cell\n",
    "    :return: dict mapping parameter names to their values\n",
    "    :raises SyntaxError: if `source` is not valid Python\n",
    "    \"\"\"\n",
    "    parsed = {}\n",
    "    for node in ast.parse(source).body:\n",
    "        if not isinstance(node, ast.Assign):\n",
    "            continue\n",
    "        try:\n",
    "            value = ast.literal_eval(node.value)\n",
    "        except (ValueError, TypeError):\n",
    "            continue  # not a plain literal\n",
    "\n",
    "        if isinstance(value, (int, float)) and not isinstance(value, bool):\n",
    "            value = float(value)\n",
    "        elif isinstance(value, str) and value == \"None\":\n",
    "            value = None\n",
    "\n",
    "        for target in node.targets:\n",
    "            if isinstance(target, ast.Name):\n",
    "                parsed[target.id] = value\n",
    "    return parsed"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "_, _, code = first_cell\n",
    "parms = parse_parameter_cell(code)\n",
    "parms"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Retrieve, interpolate and send"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def module_name(index):\n",
    "    \"\"\"Return the quadrant/module name of a module index, e.g. 4 -> 'Q2M1'.\"\"\"\n",
    "    return \"Q{}M{}\".format(index // 4 + 1, index % 4 + 1)\n",
    "\n",
    "\n",
    "def new_metadata(const, cparms):\n",
    "    \"\"\"Create constant metadata for `const` with its detector condition set.\n",
    "\n",
    "    Constants listed in `darkconst` use ``Conditions.Dark``, all others\n",
    "    ``Conditions.Illuminated``. Every constructor argument of the condition\n",
    "    class is looked up by name in `cparms`; a missing name raises ``KeyError``.\n",
    "    \"\"\"\n",
    "    metadata = ConstantMetaData()\n",
    "    metadata.calibration_constant = getattr(dconstants, const)()\n",
    "\n",
    "    condition_group = Conditions.Dark if const in darkconst else Conditions.Illuminated\n",
    "    condition_cls = getattr(condition_group, dtype)\n",
    "    arg_names = inspect.getfullargspec(condition_cls.__init__).args[1:]  # drop self\n",
    "    metadata.detector_condition = condition_cls(**{arg: cparms[arg] for arg in arg_names})\n",
    "    return metadata\n",
    "\n",
    "\n",
    "def retrieve_constant(const, module, cparms):\n",
    "    \"\"\"Retrieve the data of `const` for one module from the calibration database.\n",
    "\n",
    "    If `valid_at` is set, a ``Versions.Timespan`` starting at `valid_at` is\n",
    "    used and the constant is retrieved for that time; otherwise\n",
    "    ``Versions.Now`` and the current time are used.\n",
    "    \"\"\"\n",
    "    metadata = new_metadata(const, cparms)\n",
    "    if valid_at:\n",
    "        metadata.calibration_constant_version = Versions.Timespan(device=module, start=valid_at)\n",
    "        when = valid_at if isinstance(valid_at, str) else valid_at.isoformat()\n",
    "    else:\n",
    "        metadata.calibration_constant_version = Versions.Now(device=module)\n",
    "        when = datetime.datetime.now().isoformat()\n",
    "    metadata.retrieve(cal_db_interface, when=when)\n",
    "    return metadata.calibration_constant.data\n",
    "\n",
    "\n",
    "def is_unusable(values):\n",
    "    \"\"\"Tell whether constant data is all zero or contains no finite value.\"\"\"\n",
    "    return np.count_nonzero(values) == 0 or not np.isfinite(values).any()\n",
    "\n",
    "\n",
    "def preview(values, title):\n",
    "    \"\"\"Show ``values[..., 0, 0, 0]`` as an image with a fixed colour range of 0 to 2.\"\"\"\n",
    "    fig, ax = plt.subplots(figsize=(10, 10))\n",
    "    ax.imshow(values[..., 0, 0, 0], vmin=0, vmax=2)\n",
    "    ax.set_title(title)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "for const in dir(dconstants):\n",
    "    if const.startswith(\"__\") or const in skip:\n",
    "        continue\n",
    "    if selected_constants and const not in selected_constants:\n",
    "        continue\n",
    "\n",
    "    # The condition parameters are the same for all modules of a constant.\n",
    "    cparms = dict(parms)\n",
    "    cparms.update(overwrites.get(dtype, {}).get(const, {}))\n",
    "\n",
    "    good_data = []\n",
    "    missing = []\n",
    "    for im in modules:\n",
    "        qm = module_name(im)\n",
    "        module = getattr(detector, qm)\n",
    "        try:\n",
    "            values = retrieve_constant(const, module, cparms)\n",
    "            unusable = im in force_interpolation or is_unusable(values)\n",
    "        except Exception as e:\n",
    "            print(\"Failed for const {} of {}: {}\".format(const, qm, e))\n",
    "            missing.append(im)\n",
    "            continue\n",
    "\n",
    "        if unusable:\n",
    "            missing.append(im)\n",
    "            print(\"Not good {}\".format(im))\n",
    "            continue\n",
    "\n",
    "        print(qm)\n",
    "        good_data.append(values)\n",
    "        # A failing preview must not decide whether a module counts as missing.\n",
    "        try:\n",
    "            preview(values, \"{} {}\".format(const, qm))\n",
    "        except Exception as e:\n",
    "            print(\"Could not plot {} of {}: {}\".format(const, qm, e))\n",
    "\n",
    "    if not good_data:\n",
    "        print(\"No data for {} available, skipping\".format(const))\n",
    "        continue\n",
    "    stacked = np.array(good_data)\n",
    "\n",
    "    if \"BadPixel\" not in const:\n",
    "        interpol = np.nanmedian(stacked, axis=0)\n",
    "    else:\n",
    "        interpol = np.full(stacked.shape[1:], BadPixels.INTERPOLATED.value, dtype=np.uint32)\n",
    "        print(\"Badpixels interpolation will be set to INTERPOLATED\")\n",
    "    print(\"Interpolated shape is {} vs. {}\".format(interpol.shape, stacked[0].shape))\n",
    "\n",
    "    n_sent = 0\n",
    "    for im in missing:\n",
    "        qm = module_name(im)\n",
    "        module = getattr(detector, qm)\n",
    "        print(\"will add {}\".format(qm))\n",
    "        try:\n",
    "            metadata = new_metadata(const, cparms)\n",
    "            metadata.calibration_constant_version = Versions.Now(device=module)\n",
    "            metadata.calibration_constant.data = interpol\n",
    "            metadata.send(cal_db_interface)\n",
    "            n_sent += 1\n",
    "        except Exception as e:\n",
    "            print(\"Failed for const {} of {}: {}\".format(const, qm, e))\n",
    "\n",
    "    # Reported after sending so that the count reflects what was actually written.\n",
    "    print(\"Interpolated {} modules for constants {}.\".format(n_sent, const))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}