Skip to content
Snippets Groups Projects
DB_Constants_to_HDF5_NBC.ipynb 8.23 KiB
Newer Older
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Constants from DB to HDF5 #\n",
    "\n",
    "Version 0.1, Author: S. Hauf\n",
    "\n",
    "Currently available instances are LPD1M1 and AGIPD1M1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "detector_instance = \"LPD1M1\"  # the detector instance to get constants for e.g. LPD1M1, required\n",
    "out_file = \"/gpfs/exfel/data/scratch/haufs/test/test.h5\"  # HDF5 file to output constants into, required\n",
    "valid_at = \"\"  # ISO formatted date for which constants should be valid. Leave empty to get most current ones\n",
    "cal_db_interface = \"tcp://max-exfl015:5005\"\n",
    "modules = [-1]  # modules to get data from, in terms of numerical quadrant indices, range allowed"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Detector family: instance names containing \"LPD\" map to LPD, everything else to AGIPD.\n",
    "if \"LPD\" in detector_instance.upper():\n",
    "    dtype = \"LPD\"\n",
    "else:\n",
    "    dtype = \"AGIPD\"\n",
    "\n",
    "# Constants that are looked up under the Dark condition; all others use Illuminated.\n",
    "darkconst = [\n",
    "    \"Offset\",\n",
    "    \"Noise\",\n",
    "    \"SlopesPC\",\n",
    "    \"SlopesCI\",\n",
    "    \"BadPixelsDark\",\n",
    "    \"BadPixelsPC\",\n",
    "    \"BadPixelsCI\",\n",
    "]\n",
    "# Constant names that are never retrieved.\n",
    "skip = [\"BadPixels\"]\n",
    "\n",
    "# Per detector family and constant: condition arguments that are forced to a fixed value.\n",
    "overwrites = {\n",
    "    \"LPD\": {name: {\"memory_cells\": 1} for name in (\"SlopesFF\", \"BadPixelsFF\")},\n",
    "}\n",
    "\n",
    "# A leading -1 is the sentinel for all 16 modules.\n",
    "if modules[0] == -1:\n",
    "    modules = list(range(16))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import copy\n",
    "import datetime\n",
    "import inspect\n",
    "\n",
    "import h5py\n",
    "from iCalibrationDB import Conditions, ConstantMetaData, Constants, Detectors, Versions\n",
    "\n",
    "\n",
    "def extend_parms(detector_instance):\n",
    "    \"\"\"Build the parameter lines for all condition arguments of a detector type.\n",
    "\n",
    "    Every attribute of iCalibrationDB.Conditions whose name does not start\n",
    "    with a double underscore is inspected: the constructor of its LPD or\n",
    "    AGIPD member is read and each argument name is reported once.\n",
    "\n",
    "    :param detector_instance: detector instance name, e.g. LPD1M1. Names\n",
    "        containing LPD select the LPD type, all others AGIPD.\n",
    "    :return: newline-joined 'name = value' lines. Arguments without a\n",
    "        default are emitted as 'name = 0.  # required' and listed first.\n",
    "    \"\"\"\n",
    "    # NOTE(review): the imports are repeated locally, presumably so that the\n",
    "    # function also works when it is run outside of this notebook - confirm.\n",
    "    import inspect\n",
    "\n",
    "    from iCalibrationDB import Conditions\n",
    "\n",
    "    existing = set()  # argument names already reported by an earlier class\n",
    "\n",
    "    def extract_parms(cls):\n",
    "        \"\"\"Return the parameter lines of cls.__init__ in signature order.\"\"\"\n",
    "        # getfullargspec replaces getargspec, which is deprecated and has been\n",
    "        # removed from recent Python versions\n",
    "        spec = inspect.getfullargspec(cls.__init__)\n",
    "        # defaults is None, not an empty tuple, if no argument has a default\n",
    "        defaults = spec.defaults or ()\n",
    "        lines = []\n",
    "        # walk backwards so that index i lines up with the reversed defaults\n",
    "        for i, arg in enumerate(spec.args[1:][::-1]):\n",
    "            if arg in existing:\n",
    "                continue\n",
    "\n",
    "            existing.add(arg)\n",
    "\n",
    "            if i < len(defaults):\n",
    "                default = defaults[::-1][i]\n",
    "                if str(default).isdigit():\n",
    "                    lines.append(\"{} = {}\".format(arg, default))\n",
    "                elif default is None or default == \"None\":\n",
    "                    lines.append(\"{} = \\\"None\\\"\".format(arg))\n",
    "                else:\n",
    "                    lines.append(\"{} = \\\"{}\\\"\".format(arg, default))\n",
    "            else:\n",
    "                lines.append(\"{} = 0.  # required\".format(arg))\n",
    "        return lines[::-1]\n",
    "\n",
    "    dtype = \"LPD\" if \"LPD\" in detector_instance.upper() else \"AGIPD\"\n",
    "    # a list instead of a set keeps the output order deterministic\n",
    "    all_conditions = []\n",
    "    for c in dir(Conditions):\n",
    "        if c[:2] != \"__\":\n",
    "            condition = getattr(Conditions, c)\n",
    "            all_conditions.extend(extract_parms(getattr(condition, dtype)))\n",
    "    # stable sort: required arguments first, discovery order otherwise\n",
    "    all_conditions.sort(key=lambda line: not line.endswith(\"# required\"))\n",
    "    return \"\\n\".join(all_conditions)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Look up the detector entry named by detector_instance.\n",
    "# NOTE(review): det is not referenced by any other cell of this notebook - confirm it is still needed.\n",
    "det = getattr(Detectors, detector_instance)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Handle of the running IPython shell; its history manager is queried in the next cell.\n",
    "ip = get_ipython()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Take the first entry of history range 1-2 (raw=True) for the session reported by get_last_session_id().\n",
    "# NOTE(review): presumably this is the parameters cell - this depends on the execution order, confirm.\n",
    "history = ip.history_manager\n",
    "first_cell = next(history.get_range(history.get_last_session_id(), 1, 2, raw=True))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import ast\n",
    "\n",
    "# Only the third field of the history entry, the cell text, is needed.\n",
    "_, _, code = first_cell\n",
    "code = code.split(\"\\n\")\n",
    "\n",
    "# Maps each assigned name of that cell to its value: numbers as float,\n",
    "# the string None (quoted or not) as None.\n",
    "parms = {}\n",
    "for line in code:\n",
    "    line = line.strip()\n",
    "    # blank lines, pure comments and anything that is not an assignment carry no parameter\n",
    "    if not line or line.startswith(\"#\") or \"=\" not in line:\n",
    "        continue\n",
    "    # split at the first '=' only, so that values may contain '=' themselves\n",
    "    name, sep, value = line.partition(\"=\")\n",
    "    name = name.strip()\n",
    "    value = value.strip()\n",
    "    try:\n",
    "        # literal_eval ignores a trailing comment and removes string quotes\n",
    "        value = ast.literal_eval(value)\n",
    "    except (ValueError, SyntaxError):\n",
    "        # not a Python literal: keep the text as it is\n",
    "        pass\n",
    "    if isinstance(value, (int, float)) and not isinstance(value, bool):\n",
    "        # numbers are stored as float, as before\n",
    "        value = float(value)\n",
    "    elif isinstance(value, str) and value == \"None\":\n",
    "        value = None\n",
    "    parms[name] = value"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "source": [
    "# Retrieve every constant of the detector type for each requested module and\n",
    "# store it in the output file under <QxMy>/<constant>/data.\n",
    "detector = getattr(Detectors, detector_instance)\n",
    "dconstants = getattr(Constants, dtype)\n",
    "# condition arguments forced to fixed values for this detector type, if any\n",
    "forced = overwrites.get(dtype, {})\n",
    "\n",
    "# the context manager closes the file even if a lookup below raises\n",
    "with h5py.File(out_file, \"w\") as ofile:\n",
    "    for i in modules:\n",
    "        # module index 0..15 -> Q1M1 .. Q4M4\n",
    "        qm = \"Q{}M{}\".format(i//4+1, i%4+1)\n",
    "        module = getattr(detector, qm)\n",
    "        for const in dir(dconstants):\n",
    "            if const[:2] == \"__\" or const in skip:\n",
    "                continue\n",
    "\n",
    "            # parameters of the first cell, with the forced values applied on top\n",
    "            cparms = copy.copy(parms)\n",
    "            cparms.update(forced.get(const, {}))\n",
    "\n",
    "            try:\n",
    "                metadata = ConstantMetaData()\n",
    "                metadata.calibration_constant = getattr(dconstants, const)()\n",
    "\n",
    "                # set the operating condition\n",
    "                cond = Conditions.Dark if const in darkconst else Conditions.Illuminated\n",
    "                condition = getattr(cond, dtype)\n",
    "\n",
    "                # every constructor argument except self is taken from the parameters;\n",
    "                # getfullargspec replaces the deprecated getargspec\n",
    "                spec = inspect.getfullargspec(condition.__init__)\n",
    "                plist = {arg: cparms[arg] for arg in spec.args[1:]}\n",
    "                metadata.detector_condition = condition(**plist)\n",
    "\n",
    "                # specify a version for this constant\n",
    "                if valid_at is None or valid_at == \"\":\n",
    "                    creation_time = datetime.datetime.now()\n",
    "                    metadata.calibration_constant_version = Versions.Now(\n",
    "                        device=module)\n",
    "                else:\n",
    "                    metadata.calibration_constant_version = Versions.Timespan(\n",
    "                        device=module,\n",
    "                        start=valid_at)\n",
    "                    creation_time = valid_at\n",
    "\n",
    "                ctime = creation_time.isoformat() if not isinstance(creation_time, str) else creation_time\n",
    "\n",
    "                metadata.retrieve(cal_db_interface, when=ctime)\n",
    "\n",
    "                ofile[\"{}/{}/data\".format(qm, const)] = metadata.calibration_constant.data\n",
    "            except Exception as e:\n",
    "                # best effort: report the failure and continue with the next constant\n",
    "                print(\"Failed for const {} of {}: {}\".format(const, qm, e))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}