Skip to content
Snippets Groups Projects
Interpolate_Constants.ipynb 12.3 KiB
Newer Older
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Interpolate Missing Constants #\n",
    "\n",
    "Version 0.1, Author: S. Hauf\n",
    "\n",
    "Currently available instances are LPD1M1 and AGIPD1M1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "detector_instance = \"LPD1M1\"  # the detector instance to get constants for e.g. LPD1M1, required\n",
    "valid_at = \"\"  # ISO formatted date for which constants should be valid. Leave empty to get the most current ones\n",
    "cal_db_interface = \"tcp://max-exfl015:5005\"\n",
    "modules = [-1]  # modules to get data from, in terms of numerical quadrant indices, range allowed\n",
    "beam_energy = \"None\"\n",
    "pixels_y = 256\n",
    "pixels_x = 256\n",
    "capacitor = \"5pF\"\n",
    "photon_energy = 9.2\n",
    "memory_cells = 512\n",
    "bias_voltage = 250.\n",
    "only_constants = \"SlopesCI\"\n",
    "force_interpolation = [4]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Detector family; selects the matching sub-class of Constants and of each Conditions type.\n",
    "dtype = \"LPD\" if \"LPD\" in detector_instance.upper() else \"AGIPD\"\n",
    "# Constants handled with dark conditions; every other constant uses illuminated conditions.\n",
    "darkconst = [\"Offset\", \"Noise\", \"SlopesPC\", \"SlopesCI\", \"BadPixelsDark\", \"BadPixelsPC\", \"BadPixelsCI\"]\n",
    "# Constant names the main loop never processes.\n",
    "skip = [\"BadPixels\"]\n",
    "\n",
    "# Per detector family and constant: parameter values that replace the parsed ones.\n",
    "overwrites = {\"LPD\": {\"SlopesFF\": {\"memory_cells\": 1},\n",
    "                      \"BadPixelsFF\": {\"memory_cells\": 1}}}\n",
    "\n",
    "# Turn the comma-separated selection into a list of names. The isinstance guard keeps\n",
    "# this cell re-runnable; surrounding blanks are stripped and empty entries dropped, so an\n",
    "# empty string gives an empty list, which the main loop treats as \"no restriction\".\n",
    "if isinstance(only_constants, str):\n",
    "    only_constants = [name.strip() for name in only_constants.split(\",\") if name.strip()]\n",
    "\n",
    "# A leading -1 selects all 16 modules; the length check avoids an IndexError on an empty list.\n",
    "if len(modules) > 0 and modules[0] == -1:\n",
    "    modules = list(range(16))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "source": [
    "import copy\n",
    "import datetime\n",
    "import inspect\n",
    "\n",
    "import h5py\n",
    "import matplotlib.pyplot as plt\n",
    "import numpy as np\n",
    "from cal_tools.enums import BadPixels\n",
    "%matplotlib inline\n",
    "from iCalibrationDB import Conditions, ConstantMetaData, Constants, Detectors, Versions\n",
    "\n",
    "\n",
    "def extend_parms(detector_instance):\n",
    "    \"\"\"Return parameter-cell text covering every condition argument of a detector family.\n",
    "\n",
    "    For each non-dunder attribute of ``Conditions`` the ``__init__`` signature of its\n",
    "    LPD or AGIPD class (picked from ``detector_instance``) is inspected, and one\n",
    "    ``name = value`` line is produced per argument name not seen before.\n",
    "\n",
    "    :param detector_instance: detector instance name; \"LPD\" in it selects LPD, anything else AGIPD\n",
    "    :return: the generated lines joined with newlines, in signature order per class\n",
    "        (arguments without a default are written as ``name = 0.  # required``)\n",
    "    \"\"\"\n",
    "    import inspect\n",
    "\n",
    "    from iCalibrationDB import Conditions\n",
    "\n",
    "    existing = set()  # argument names already emitted, shared across all condition classes\n",
    "\n",
    "    def extract_parms(cls):\n",
    "        # getfullargspec replaces getargspec, which is deprecated and removed in Python 3.11.\n",
    "        spec = inspect.getfullargspec(cls.__init__)\n",
    "        # defaults is None when no argument has a default value.\n",
    "        defaults = spec.defaults or ()\n",
    "        pList = []\n",
    "        # Walk the arguments back to front so that index i lines up with the reversed defaults.\n",
    "        for i, arg in enumerate(spec.args[1:][::-1]):\n",
    "            if arg in existing:\n",
    "                continue\n",
    "\n",
    "            existing.add(arg)\n",
    "\n",
    "            if i < len(defaults):\n",
    "                default = defaults[::-1][i]\n",
    "                if str(default).isdigit():\n",
    "                    pList.append(\"{} = {}\".format(arg, default))\n",
    "                elif default is None or default == \"None\":\n",
    "                    pList.append(\"{} = \\\"None\\\"\".format(arg))\n",
    "                else:\n",
    "                    pList.append(\"{} = \\\"{}\\\"\".format(arg, default))\n",
    "            else:\n",
    "                pList.append(\"{} = 0.  # required\".format(arg))\n",
    "        # Reverse back to signature order (mandatories first). A list is returned because\n",
    "        # a set would discard this order; entries are already unique via `existing`.\n",
    "        return pList[::-1]\n",
    "\n",
    "    dtype = \"LPD\" if \"LPD\" in detector_instance.upper() else \"AGIPD\"\n",
    "    all_conditions = []\n",
    "    for c in dir(Conditions):\n",
    "        if c[:2] != \"__\":\n",
    "            condition = getattr(Conditions, c)\n",
    "            all_conditions.extend(extract_parms(getattr(condition, dtype)))\n",
    "    return \"\\n\".join(all_conditions)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Look up the attribute of Detectors named by `detector_instance`.\n",
    "det = getattr(Detectors, detector_instance)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Handle to the running IPython shell; its history manager is queried in the next cell.\n",
    "ip = get_ipython()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Take the first entry returned by the history manager for range 1-2 (raw source) of the\n",
    "# session id given by get_last_session_id(); presumably this is the parameter cell, which\n",
    "# the next cell parses as `name = value` lines - TODO confirm against the IPython history API.\n",
    "first_cell = next(ip.history_manager.get_range(ip.history_manager.get_last_session_id(), 1, 2, raw=True))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Parse the source text of `first_cell` into a {name: value} dictionary.\n",
    "_, _, code = first_cell\n",
    "code = code.split(\"\\n\")\n",
    "parms = {}\n",
    "for c in code:\n",
    "    c = c.strip()\n",
    "    # Blank lines, comment-only lines and lines without \"=\" hold no parameter;\n",
    "    # skipping them keeps the unpacking below from raising.\n",
    "    if not c or c.startswith(\"#\") or \"=\" not in c:\n",
    "        continue\n",
    "    # Split at the first \"=\" only, so the value part may itself contain \"=\".\n",
    "    n, _, v = c.partition(\"=\")\n",
    "    n = n.strip()\n",
    "    v = v.strip()\n",
    "    try:\n",
    "        parms[n] = float(v)\n",
    "    except ValueError:\n",
    "        # Not numeric: keep the value exactly as written in the cell (quotes included).\n",
    "        parms[n] = v\n",
    "    # The word None, bare or quoted either way, stands for a real None.\n",
    "    if parms[n] == \"None\" or parms[n] == \"'None'\" or parms[n] == '\"None\"':\n",
    "        parms[n] = None"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "source": [
    "# Show the parsed parameter dictionary as the cell output.\n",
    "parms"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "scrolled": false
   },
   "source": [
    "def _build_metadata(const, cparms):\n",
    "    \"\"\"Create a ConstantMetaData for `const` with its detector condition filled in.\n",
    "\n",
    "    The condition class is Conditions.Dark for constants listed in `darkconst` and\n",
    "    Conditions.Illuminated otherwise; every argument of its __init__ is read from `cparms`.\n",
    "    The constant version is not set here; callers assign it.\n",
    "\n",
    "    :param const: name of the constant class on `dconstants`\n",
    "    :param cparms: dict mapping condition argument names to values\n",
    "    :raises KeyError: if `cparms` lacks one of the condition's arguments\n",
    "    \"\"\"\n",
    "    metadata = ConstantMetaData()\n",
    "    metadata.calibration_constant = getattr(dconstants, const)()\n",
    "\n",
    "    # set the operating condition\n",
    "    cond = Conditions.Dark if const in darkconst else Conditions.Illuminated\n",
    "    condition = getattr(cond, dtype)\n",
    "\n",
    "    # getfullargspec replaces getargspec, which is deprecated and removed in Python 3.11.\n",
    "    arg_names = inspect.getfullargspec(condition.__init__).args[1:]\n",
    "    metadata.detector_condition = condition(**{arg: cparms[arg] for arg in arg_names})\n",
    "    return metadata\n",
    "\n",
    "\n",
    "detector = getattr(Detectors, detector_instance)\n",
    "dconstants = getattr(Constants, dtype)\n",
    "for const in dir(dconstants):\n",
    "    if const[:2] != \"__\" and (const in only_constants or not only_constants):\n",
    "        # Parameters for this constant: the parsed ones plus any detector-specific\n",
    "        # overrides. Computed once per constant because both loops below need them.\n",
    "        cparms = copy.copy(parms)\n",
    "        cparms.update(overwrites.get(dtype, {}).get(const, {}))\n",
    "\n",
    "        missing = []  # module indices whose constant has to be replaced\n",
    "        data = []     # constant data of the modules that are usable\n",
    "        for im in modules:\n",
    "            qm = \"Q{}M{}\".format(im//4+1, im%4+1)\n",
    "            module = getattr(detector, qm)\n",
    "\n",
    "            if const in skip:\n",
    "                continue\n",
    "\n",
    "            try:\n",
    "                metadata = _build_metadata(const, cparms)\n",
    "\n",
    "                # specify a version for this constant\n",
    "                if valid_at is None or valid_at == \"\":\n",
    "                    creation_time = datetime.datetime.now()\n",
    "                    metadata.calibration_constant_version = Versions.Now(\n",
    "                        device=module)\n",
    "                else:\n",
    "                    metadata.calibration_constant_version = Versions.Timespan(\n",
    "                        device=module,\n",
    "                        start=valid_at)\n",
    "                    creation_time = valid_at\n",
    "\n",
    "                ctime = creation_time.isoformat() if not isinstance(creation_time, str) else creation_time\n",
    "\n",
    "                metadata.retrieve(cal_db_interface, when=ctime)\n",
    "                cdata = metadata.calibration_constant.data\n",
    "                if (im in force_interpolation\n",
    "                        or np.all(np.isnan(cdata))\n",
    "                        or np.count_nonzero(cdata) == 0\n",
    "                        or np.all(~np.isfinite(cdata))):\n",
    "                    missing.append(im)\n",
    "                    print(\"Not good {}\".format(im))\n",
    "                    print(qm)\n",
    "                    fig = plt.figure(figsize=(10,10))\n",
    "                    ax = fig.add_subplot(111)\n",
    "                    ax.imshow(cdata[...,0,0,0], vmin=0, vmax=2)\n",
    "                else:\n",
    "                    # Only usable modules feed the interpolation. Previously the append\n",
    "                    # sat in the branch above, so the median was taken over the bad or\n",
    "                    # forced modules themselves.\n",
    "                    data.append(cdata)\n",
    "            except Exception as e:\n",
    "                print(\"Failed for const {} of {}: {}\".format(const, qm, e))\n",
    "                # The failure may have happened after the module was already flagged;\n",
    "                # avoid listing it twice, which would send its constant twice.\n",
    "                if im not in missing:\n",
    "                    missing.append(im)\n",
    "\n",
    "        if len(data) == 0:\n",
    "            print(\"No data for {} available, skipping\".format(const))\n",
    "            continue\n",
    "        if len(missing) == 0:\n",
    "            print(\"No modules need interpolation for {}, skipping\".format(const))\n",
    "            continue\n",
    "        data = np.array(data)\n",
    "\n",
    "        print(\"Interpolated {} modules for constants {}.\".format(len(missing), const))\n",
    "        if not \"BadPixel\" in const:\n",
    "            # Element-wise median over the usable modules, ignoring NaN entries.\n",
    "            interpol = np.nanmedian(data, axis=0)\n",
    "        else:\n",
    "            # Bad-pixel maps are not averaged: every entry is set to the INTERPOLATED value.\n",
    "            interpol = np.zeros(data.shape[1:], np.uint32)\n",
    "            interpol[...] = BadPixels.INTERPOLATED.value\n",
    "            print(\"Badpixels interpolation will be set to INTERPOLATED\")\n",
    "        print(\"Interpolated shape is {} vs. {}\".format(interpol.shape, data[0].shape if len(data) else None))\n",
    "        for im in missing:\n",
    "            qm = \"Q{}M{}\".format(im//4+1, im%4+1)\n",
    "            module = getattr(detector, qm)\n",
    "            print(\"will add {}\".format(qm))\n",
    "            try:\n",
    "                metadata = _build_metadata(const, cparms)\n",
    "\n",
    "                # the interpolated constant is always versioned as of now\n",
    "                metadata.calibration_constant_version = Versions.Now(\n",
    "                    device=module)\n",
    "\n",
    "                metadata.calibration_constant.data = interpol\n",
    "                metadata.send(cal_db_interface)\n",
    "            except Exception as e:\n",
    "                print(\"Failed for const {} of {}: {}\".format(const, qm, e))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}