import argparse
import ast
import asyncio
import copy
import glob
import inspect
import json
import locale
import logging
import os
import shlex
import sqlite3
import sys
import urllib.parse
from asyncio import get_event_loop, shield
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import requests
import yaml
import zmq
import zmq.asyncio
import zmq.auth.thread
from git import InvalidGitRepositoryError, Repo
from metadata_client.metadata_client import MetadataClient

try:
    from .config import webservice as config
    from .messages import MDC, Errors, MigrationError, Success
except ImportError:
    from config import webservice as config
    from messages import MDC, Errors, MigrationError, Success


def init_job_db(config):
    """ Initialize the sqlite job database

    A new database is created if no pre-existing one is present. A single
    table is created: jobs, which has columns:

        rid, id, proposal, run, flg, status

    :param config: the configuration parsed from the webservice YAML config
    :return: a sqlite3 connection instance to the database
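
    A status lookup for a MyMDC request id joins all three tables, as
    query_rid() below does:

        SELECT job_id, status FROM slurm_jobs
        INNER JOIN executions USING (exec_id)
        INNER JOIN requests USING (req_id)
        WHERE mymdc_id = ?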
    """
    logging.info("Initializing database")
    conn = sqlite3.connect(config['web-service']['job-db'])
    conn.execute("PRAGMA foreign_keys = ON")

    conn.executescript("""
        CREATE TABLE IF NOT EXISTS requests(
            req_id INTEGER PRIMARY KEY,
            mymdc_id,
            proposal TEXT,
            run INTEGER,
            action,
            timestamp
        );
        CREATE TABLE IF NOT EXISTS executions(
            exec_id INTEGER PRIMARY KEY,
            req_id REFERENCES requests(req_id),
            command TEXT,
            det_type,
            karabo_id,
            success
        );
        CREATE INDEX IF NOT EXISTS exec_by_req ON executions(req_id);
        CREATE TABLE IF NOT EXISTS slurm_jobs(
            job_id INTEGER PRIMARY KEY,
            exec_id REFERENCES executions(exec_id),
            status,
            finished,
            elapsed
        );
        CREATE INDEX IF NOT EXISTS job_by_exec ON slurm_jobs(exec_id);
        CREATE INDEX IF NOT EXISTS job_by_finished ON slurm_jobs(finished);
    """)
    conn.row_factory = sqlite3.Row
    return conn


def init_md_client(config: Dict[str, Dict[str, str]]) -> MetadataClient:
    """Initialize an MDC client connection.

    :param config: the configuration parsed from the webservice YAML config
    :return: an MDC client connection
    """
    mdconf = config['metadata-client']
    client_conn = MetadataClient(client_id=mdconf['user-id'],
                                 client_secret=mdconf['user-secret'],
                                 user_email=mdconf['user-email'],
                                 token_url=mdconf['token-url'],
                                 refresh_url=mdconf['refresh-url'],
                                 auth_url=mdconf['auth-url'],
                                 scope=mdconf['scope'],
                                 base_api_url=mdconf['base-api-url'])
    return client_conn


def init_config_repo(config):
    """ Make sure the configuration repo is present and up-to-data

    :param config: the configuration defined in the `config-repo` section
    """
    # FIXME: keep this as func, but call this with asyncio.thread
    os.makedirs(config['local-path'], exist_ok=True)
    # check if it is a repo
    try:
        repo = Repo(config['local-path'])
    except InvalidGitRepositoryError:
        repo = Repo.clone_from(config['url'], config['local-path'])
    try:
        repo.remote().pull()
    except Exception:
        pass
    logging.info("Config repo is initialized")


async def upload_config(config, yaml, instrument, cycle, proposal) -> bytes:
    """ Upload a new configuration YAML

    :param config: the configuration defined in the `config-repo` section
        of the webservice.yaml configuration.
    :param yaml: the YAML contents to upload
    :param instrument: the instrument the update is for
    :param cycle: the facility cycle the update is for
    :param proposal: the proposal the update is for

    The YAML contents will be placed into a file at

        {config.local-path}/{cycle}/{proposal}.yaml

    If the file already exists it is overwritten, and the new version is
    then pushed to the configuration git repo.
    """
    # FIXME: keep this as func, but call this with asyncio.thread
    repo = Repo(config['local-path'])
    # assure we are on most current version
    repo.remote().pull()
    prop_dir = os.path.join(repo.working_tree_dir, cycle)
    os.makedirs(prop_dir, exist_ok=True)
    with open("{}/{}.yaml".format(prop_dir, proposal), "w") as f:
        f.write(yaml)
    fpath = "{}/{}.yaml".format(prop_dir, proposal)
    repo.index.add([fpath])
    repo.index.commit(
        "Update to proposal p{}: {}".format(proposal,
                                            datetime.now().isoformat()))
    repo.remote().push()
    msg = Success.UPLOADED_CONFIG.format(cycle, proposal)
    logging.info(msg)
    return msg.encode()


def merge(source: Dict, destination: Dict) -> Dict:
    """Deep merge two dictionaries.

    :param source: source dictionary to merge into destination
    :param destination: destination dictionary which source is merged into
    :return: the updated destination dictionary

    Taken from:
    https://stackoverflow.com/questions/20656135/python-deep-merge-dictionary-data
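
    Example (illustrative):

    >>> merge({'a': {'b': 2}}, {'a': {'c': 3}})
    {'a': {'c': 3, 'b': 2}}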
    """
    for key, value in source.items():
        if isinstance(value, dict):
            # check if destination (e.g. karabo-id) has none value
            if key in destination.keys() and destination[key] is None:
                destination[key] = {}
            # get node or create one
            node = destination.setdefault(key, {})
            merge(value, node)
        else:
            destination[key] = value

    return destination


def change_config(config, updated_config, instrument,
                  cycle, proposal, apply=False) -> bytes:
    """ Change the configuration of a proposal

    If no proposal-specific configuration exists yet, one is first created
    based on the instrument's entries in the default configuration.

    Changes are committed to git.

    :param config: repo config as given in YAML config file
    :param updated_config: a dictionary containing the updated config
    :param instrument: the instrument to change the config for
    :param cycle: the cycle to change config for
    :param proposal: the proposal to change config for
    :param apply: set to True to actually commit a change, otherwise a dry-run
                  is performed
    :return: the updated configuration as YAML-encoded bytes
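
    An updated_config payload might look like (illustrative values; the
    top-level keys follow the dark/correct/data-mapping layout of the
    proposal YAML):

        {'correct': {'SPB': {'SPB_DET_AGIPD1M-1': {'rel-gain': True}}}}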
    """
    repo = Repo(config['local-path'])
    repo.remote().pull()
    # A cycle directory is created if it doesn't exist.
    cyc_dir = Path(repo.working_tree_dir, cycle)
    cyc_dir.mkdir(exist_ok=True)
    fpath = Path(cyc_dir, f"{int(proposal):06d}.yaml")

    # In case cycle/proposal.yaml doesn't exist,
    # a new file is created based on default.yaml
    if not fpath.exists():
        with open(f"{repo.working_tree_dir}/default.yaml", "r") as f:
            defconf = yaml.safe_load(f)
            subconf = {}
            # The proposal YAML should have dark, correct and data-mapping keys
            # with all detector dictionaries of the changed instrument.
            for action in ["dark", "correct"]:
                subconf[action] = {}
                subconf[action][instrument] = defconf[action][instrument]

            # Copy data-mapping for all detectors of an instrument.
            subconf["data-mapping"] = {}
            for k_id in defconf["dark"][instrument].keys():
                print("karabo-id", k_id)
                subconf["data-mapping"][k_id] = defconf["data-mapping"][k_id]
            with open(fpath, "w") as wf:
                wf.write(yaml.safe_dump(subconf, default_flow_style=False))

    new_conf = None
    with open(fpath, "r") as rf:
        existing_conf = yaml.safe_load(rf)
        new_conf = merge(updated_config, existing_conf)

    if apply:
        # Apply updated configuration to the proposal.yaml
        # and push it to the calibration_configurations remote repository.
        with open(fpath, "w") as wf:
            wf.write(yaml.safe_dump(new_conf, default_flow_style=False))
        repo.index.add([str(fpath)])
        repo.index.commit(
            f"Update to proposal YAML: {datetime.now().isoformat()}")
        repo.remote().push()
    logging.info(Success.UPLOADED_CONFIG.format(cycle, proposal))
    return yaml.safe_dump(new_conf, default_flow_style=False).encode()


async def run_proc_async(cmd: List[str]) -> Tuple[Optional[int], bytes, bytes]:
    """Run a subprocess to completion using asyncio, capturing stdout

    Returns the numeric exit code, stdout and stderr (bytes)
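
    Example (illustrative):

        retcode, stdout, stderr = await run_proc_async(['squeue', '--version'])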
    """
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()
    return proc.returncode, stdout, stderr


def query_rid(conn, rid) -> bytes:
    c = conn.execute(
        "SELECT job_id, status FROM slurm_jobs "
        "INNER JOIN executions USING (exec_id) "
        "INNER JOIN requests USING (req_id) "
        "WHERE mymdc_id = ?", (rid,)
    )
    statuses = []
    for job_id, status in c.fetchall():
        logging.debug("Job %s has status %s", job_id, status)
        statuses.append(status)

    if statuses:
        return "\n".join(statuses).encode()
    return b'NA'


def parse_config(cmd: List[str], config: Dict[str, Any]) -> List[str]:
    """Convert a dictionary to a list of arguments.

       Values that are not strings will be cast to strings.
       Lists will be converted to several strings following their `--key`
       flag.
       Booleans will be converted to a `--key` flag if true (and omitted
       if false), where `key` is the dictionary key.
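
       Example (illustrative):

       >>> parse_config(['prog'], {'flag': True, 'n': 3, 'da': ['DA01', 'DA02']})
       ['prog', '--flag', '--n', '3', '--da', 'DA01', 'DA02']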
    """
    for key, value in config.items():
        if ' ' in key or (isinstance(value, str) and ' ' in value):
            raise ValueError('Spaces are not allowed', key, value)

        if isinstance(value, list):
            cmd.append(f"--{key}")
            cmd += [str(v) for v in value]
        elif isinstance(value, bool):
            if value:
                cmd += ["--{}".format(key)]
        else:
            if value in ['""', "''"]:
                value = ""
            cmd += ["--{}".format(key), str(value)]

    return cmd


async def run_action(job_db, cmd, mode, proposal, run, exec_id) -> str:
    """Run action command (CORRECT or DARK).

    :param job_db: jobs database
    :param cmd: the command to run, as a list of strings as expected by
                subprocess.run
    :param mode: "prod" or "sim", in the latter case nothing will be executed
                 but the command will be logged
    :param proposal: proposal the command was issued for
    :param run: run the command was issued for
    :param exec_id: Execution ID in the local jobs database

    Returns a formatted Success or Error message indicating outcome of the
    execution.
    """
    if mode == "prod":
        logging.info(" ".join(cmd))
        retcode, stdout, stderr = await run_proc_async(cmd)
        if retcode != 0:
            logging.error(Errors.JOB_LAUNCH_FAILED.format(cmd, retcode))
            logging.error(f"Failed process stdout:\n%s\nFailed process stderr:\n%s",
                          stdout.decode(errors='replace'), stderr.decode(errors='replace'))
            return Errors.JOB_LAUNCH_FAILED.format(cmd, retcode)

        if "DARK" in cmd:
            message = Success.START_CHAR.format(proposal, run)
        else:
            message = Success.START_CORRECTION.format(proposal, run)

        # Save submitted jobs to persistent database.
        c = job_db.cursor()  # FIXME: asyncio
        rstr = stdout.decode()

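        # xfel-calibrate reports submitted jobs on a line like
        # "Submitted the following SLURM jobs: 1234567, 1234568";
        # parse the job ids out of it to track them in the database.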
        for r in rstr.split("\n"):
            if "Submitted the following SLURM jobs:" in r:
                _, jobids = r.split(":")

                jobs = []
                for jobid in jobids.split(','):
                    jobs.append((int(jobid.strip()), exec_id))
                c.executemany(
                    "INSERT INTO slurm_jobs VALUES (?, ?, 'PD', 0, 0)",
                    jobs
                )
        job_db.commit()

    else:  # mode == "sim"
        if "DARK" in cmd:
            message = Success.START_CHAR_SIM.format(proposal, run)
        else:
            message = Success.START_CORRECTION_SIM.format(proposal, run)

    logging.info(message)
    logging.info((" ".join(cmd)).replace(',', '').replace("'", ""))
    return message


async def wait_on_transfer(
    mdc,
    run: int,
    proposal: str,
    run_dir: Path,
    max_tries: int = 300,
    sleep_completion: int = 10,
) -> Optional[List[str]]:
    """Query MyMDC to get run migration status.

    This coro queries MyMDC to get the storage spaces where the data is
    available (known as repositories).
    If the data is available on offline or dCache storage, this coro
    returns. Otherwise, it loops until the data is available.

    If the data is not available after all retries, a MigrationError is
    raised.
    A ValueError can also be raised if a repository is not supported by this
    webservice.

    :param mdc: an authenticated MyMDC client
    :param run: the run id
    :param proposal: the proposal id
    :param run_dir: the path to the raw run directory on disk
    :param max_tries: maximum number of checks for migration to complete
    :param sleep_completion: sleep time in seconds between checks
    :return: a list of repositories in which the data is available
    :raise MigrationError: when the data is not on offline or dCache within the
                           maximum max_tries * sleep_completion time
    :raise ValueError: if the reply from MyMDC is not 200 ok or no data was
                       returned.
    """
    loop = get_event_loop()
    for iteration in range(max_tries):
        response = await loop.run_in_executor(
            None,
            mdc.get_runs_by_proposal_number_api,
            proposal,
            run
        )

        if response.status_code != 200:
            raise ValueError(
                "FAILED: MyMDC replied with "
                f"{response.status_code}: {response.reason}"
            )

        if not response.json():
            raise ValueError(
                "FAILED: MyMDC replied empty response for "
                f"proposal {proposal}, run {run}"
            )

        run_details, = response.json()['runs']
        repositories = list(run_details['repositories'].keys())
        # List of locations where the data is stored, values are:
        #  {"XFEL_GPFS_ONLINE_SASE_X", "XFEL_GPFS_OFFLINE_RAW_CC", "DESY_DCACHE_RAW_CC"}
        #
        # where `X` is the SASE number

        logging.debug(f"{run_dir=}, {run_dir.resolve()=}, {repositories=}")

        # Resolving is done here in case the symlink target changed during the loop
        run_dir = run_dir.resolve()

        required_repository = None
        if run_dir.parts[1:4] == ("pnfs", "xfel.eu", "exfel"):
            required_repository = "DESY_DCACHE_RAW"
        elif run_dir.parts[1:4] == ("gpfs", "exfel", "d"):
            required_repository = "XFEL_GPFS_OFFLINE"
        else:
            logging.warning(
                f"Proposal {proposal} run {run} resolved path is not relative "
                f"to `/pnfs/xfel.eu/exfel` or `/gpfs/exfel/d`: {run_dir=}"
            )
            return

        logging.debug(f"{required_repository=}")

        if any(required_repository in r for r in repositories):
            return repositories

        logging.info(
            f"Proposal {proposal} run {run} not migrated yet. "
            f"Will try again ({iteration}/{max_tries})"
        )

        await asyncio.sleep(sleep_completion)

    raise MigrationError.timeout(
        run, proposal, f"{max_tries*sleep_completion}s"
    )


async def wait_transfers(
    mdc: MetadataClient,
    runs: List[str],
    proposal: str,
    cycle: int,
    instrument: str,
) -> bool:
    """Wait for multiple runs to be transferred to Maxwell.

    :param runs: Run numbers to wait for
    :param proposal: Proposal number
    :return: True if all runs transferred, false on timeout
    """
    logging.debug(f"Waiting for: proposal {proposal}, runs {runs}")

    in_folder = Path(
        config['correct']['in-folder'].format(
            instrument=instrument, cycle=cycle, proposal=proposal
        )
    )

    coros = []
    for run in runs:
        run_dir = (in_folder / f"r{int(run):04d}")
        coros.append(wait_on_transfer(mdc, run, proposal, run_dir))

    try:
        await asyncio.gather(*coros)
    except (MigrationError, ValueError):
        logging.error(
            Errors.TRANSFER_EVAL_FAILED.format(proposal, runs), exc_info=True
        )
        return False

    logging.info("Transfer complete: proposal %s, runs %s", proposal, runs)
    return True


def check_files(in_folder: str,
                runs: List[int],
                karabo_das: List[str]) -> bool:
    """Check if files for given karabo-das exists for given runs

    :param in_folder: Input folder
    :param runs: List of runs
    :param karabo_das: List of karabo-das
    :return: True if matching files exist for every run
    """
    files_exist = True
    for runnr in runs:
        rpath = Path(in_folder, 'r{:04d}'.format(int(runnr)))
        fl = rpath.glob('*.h5')
        if not any(da in match.name for match in fl for da in karabo_das):
            files_exist = False
    return files_exist


async def get_slurm_partition(mdc: MetadataClient,
                              action: str,
                              proposal_number: Union[int, str]) -> str:
    """Check MyMDC for the proposal status and select the appropriate slurm
       partition.

       The partition is either upex-high (for darks) or upex-middle (for
       corrections) if the proposal is 'R'eady or 'A'ctive.
       In other cases, the jobs default to the exfel partition.

       :param mdc: an authenticated MyMDC client
       :param action: the type of action ('correct' or 'dark')
       :param proposal_number: the proposal number
       :return: 'exfel' on closed proposals,
                'upex-high' if dark request on active proposals,
                'upex-middle' if correct request on active proposals.
    """
    # See
    # https://git.xfel.eu/ITDM/metadata_catalog/-/blob/develop/app/models/proposal.rb
    # for possible proposals states.

    loop = get_event_loop()
    response = await shield(loop.run_in_executor(None,
                                                 mdc.get_proposal_by_number_api,
                                                 proposal_number))
    if response.status_code != 200:
        logging.error(f'Failed to check MDC for proposal "{proposal_number}" '
                      'status. ASSUMING CLOSED')
        logging.error(Errors.MDC_RESPONSE.format(response))

    partition = 'exfel'
    status = response.json().get('flg_beamtime_status', 'whoopsie')

    if status in ['R', 'A']:
        partition = 'upex-high' if action == 'dark' else 'upex-middle'

    logging.debug(f"Using {partition} for {proposal_number} because {status}")

    return partition


async def get_slurm_nice(partition: str, instrument: str,
                         cycle: Union[int, str], job_penalty: int = 2,
                         commissioning_penalty: int = 1250) -> int:
    """Compute priority adjustment based on cycle and number of running
       jobs.

       The nice value is computed with
           base_penalty + job_penalty * num_jobs**2

       base_penalty is 0 for user proposals and commissioning_penalty
       for commissioning proposals. The number of jobs is computed by
       calling `squeue` and counting based on job name.

       The default penalty values give commissioning proposals a base
       penalty equivalent to 25 running jobs (2 * 25**2 = 1250).
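
       For example (illustrative), a commissioning proposal with 5 jobs
       already running in the partition gets a nice value of
       1250 + 2 * 5**2 = 1300.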

       :param partition: Partition to run jobs in.
       :param instrument: Instrument to run jobs for.
       :param cycle: Cycle of proposal to run jobs for.
       :param job_penalty: Scaling factor per job, 2 by default.
       :param commissioning_penalty: Base penalty for commissioning,
           1250 by default.
       :return: Nice value to be passed to sbatch --nice
    """

    if partition == 'exfel':
        return 0  # Don't apply degressive priority on exfel.

    # List all names for jobs running in the specified partition.
    returncode, job_names, stderr = await run_proc_async(
        ['squeue', '-h', '-o', '%.20j', '-p', partition, '--me'])

    if returncode != 0:
        logging.error(f'Non-zero return code {returncode} from '
                      f'`squeue` upon counting number of jobs')
        logging.warning(f"{stderr=}")
        return 0  # Fallback if something went wrong.

    # Base value depending on proposal type using cycle, assuming that
    # user proposals follow the pattern xxxx0y, while different kinds of
    # commissioning proposals use xxxx2y or xxxx3y.
    base_nice = 0 if str(cycle)[4] == '0' else commissioning_penalty

    # Count number of jobs
    num_jobs = sum(1 for job in job_names.decode('ascii').split('\n')
                   if f'correct_{instrument}' in job)

    return base_nice + num_jobs**2 * job_penalty


async def update_darks_paths(mdc: MetadataClient, rid: int, in_path: str,
                             out_path: str, report_path: str):
    """Update data paths in MyMDC to provide Globus access

    DarkRequests have the capability to provide a link to the report via
    Globus. For this, the absolute path to the report needs to be provided.

    Additionally, the input path to the raw data, as well as the output path to
    the processed data can be specified.

    :param mdc: an authenticated MyMDC client
    :param rid: the DarkRequest id to update
    :param in_path: the absolute path to the raw data being calibrated
    :param out_path: the absolute path to the resulting processed data
    :param report_path: the absolute path to the PDF report
    """
    data = {'dark_run': {'output_path': out_path,
                         'input_path': in_path,
                         'report_url': report_path}}
    loop = get_event_loop()
    response = await shield(loop.run_in_executor(None, mdc.update_dark_run_api,
                                                 rid, data))
    if response.status_code != 200:
        logging.error("Failed to update MDC dark report path for run id %s", rid)
        logging.error(Errors.MDC_RESPONSE.format(response))


async def update_mdc_status(mdc: MetadataClient, action: str,
                            rid: int, message: str):
    """Update MyMDC statuses

    The message content and destination depend on the Action type.
    A message to MyMDC expects a status flag, which differ between Correction
    and DarkRequest.

    Correction can have NA (Not Available), R(unning), A(vailable)
    DarkRequest can have F(inished), E(rror), R(equested),
                         IP (In Progress), T(imeout)
    The flag is extracted from the message.
    Further information is available at:
    https://git.xfel.eu/gitlab/detectors/pycalibration/wikis/MyMDC-Communication
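
    For example (illustrative), a message starting with 'SUCCESS' and
    containing 'Finished' maps to flag 'A' for a Correction and 'F' for a
    DarkRequest.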
    """
    if message.split(':')[0] in ('FAILED', 'WARN'):  # Errors
        flag = 'NA' if action == 'correct' else 'E'
    elif message.split(':')[0] == 'SUCCESS':  # Success
        flag = 'R' if action == 'correct' else 'IP'
        if 'Uploaded' in message or 'Finished' in message:
            flag = 'A' if action == 'correct' else 'F'
    else:  # MDC Timeout
        flag = 'NA' if action == 'correct' else 'T'

    if action == 'correct':
        func = mdc.update_run_api
        data = {'flg_cal_data_status': flag, 'cal_pipeline_reply': message}

    elif action == 'dark_request':
        func = mdc.update_dark_run_api
        data = {'dark_run': {'flg_status': flag, 'calcat_feedback': message}}
    else:
        raise ValueError(f"Unexpected action: {action}")

    loop = get_event_loop()
    response = await shield(loop.run_in_executor(None, func, rid, data))

    if response.status_code != 200:
        logging.error("Failed to update MDC status for action %s, run id %s",
                      action, rid)
        logging.error(Errors.MDC_RESPONSE.format(response))


def _orca_passthrough(
    proposal_number=None,
    runs=None,
    action=None,
    route="execute",
    **kwargs,
):
    """ Passes through requests received by the webservice to Orca for use during
    the transition to the new webservice

    Due to network restrictions on Maxwell, this sends post requests to localhost on
    port 42751 (a random port number), so either Orca should be running locally or a
    ssh tunnel should be set up with `ssh -L 42751:localhost:42751 TARGET-SERVER -N &`
    """
    try:
        base_url = "http://localhost"
        port = "42751"

        args = []

        args.append(f"action={action}") if action else None
        args.append(f"runs={','.join(str(r) for r in runs)}") if runs else None

        args.extend([f"{k}={v}" for k, v in kwargs.items()])

        url = f"{base_url}:{port}/{route}/{proposal_number}?{'&'.join(filter(None, args))}"
    except Exception:
        logging.warning("error building orca passthrough request", exc_info=True)
        return None

    try:
        requests.post(url)
    except requests.ConnectionError:
        logging.warning("Could not connect to orca to send passthrough request")
    except Exception:
        logging.error(f"orca post request error for url {url}", exc_info=True)


class ActionsServer:
    def __init__(self, config, mode):
        self.config = config
        self.mode = mode

        init_config_repo(config['config-repo'])
        self.job_db = init_job_db(config)
        self.mdc = init_md_client(config)

        # Set up a ZMQ socket to listen for requests
        self.zmq_ctx = zmq.asyncio.Context()
        auth = zmq.auth.thread.ThreadAuthenticator(self.zmq_ctx)
        if mode == "prod-auth":
            auth.start()
            auth.allow(config['web-service']['allowed-ips'])

        self.socket = self.zmq_ctx.socket(zmq.REP)
        self.socket.zap_domain = b'global'
        self.socket.bind("{}:{}".format(config['web-service']['bind-to'],
                                        config['web-service']['port']))

    async def run(self):
        """The main server loop

        The main server loop handles remote requests via a ZMQ interface.

        Requests are serialised as the repr of a Python list/tuple, with the
        first element identifying the action, and subsequent elements arguments
        to be passed to that action. The number & meaning of arguments depends
        on the action.
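
        For example, a correction request (hypothetical values) is the
        repr of a list like:

            "['correct', '42', 'SASE1', 'SPB', '202301', '2512', 'r0100', '2']"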
        """
        while True:
            req = await self.socket.recv()
            logging.debug("Raw request data: %r", req)
            try:
                resp = await self.handle_one_req(req)
            except Exception as e:
                logging.error("Unexpected error handling request", exc_info=e)
                resp = Errors.OTHER_ERROR.format(e).encode()

            logging.debug("Sending response: %r", resp)
            await self.socket.send(resp)

    async def handle_one_req(self, raw_req: bytes) -> bytes:
        """Handle one request, and return the reply to be sent"""
        try:  # protect against unparseable requests
            req = ast.literal_eval(raw_req.decode('utf-8'))
        except (SyntaxError, ValueError) as e:
            logging.error(str(e))
            return Errors.REQUEST_FAILED.encode()

        if len(req) < 2:  # catch parseable but malformed requests
            logging.error(Errors.REQUEST_MALFORMED.format(req))
            return Errors.REQUEST_MALFORMED.format(req).encode()

        action, *payload = req

        if action not in self.accepted_actions:
            logging.warning(Errors.UNKNOWN_ACTION.format(action))
            return Errors.UNKNOWN_ACTION.format(action).encode()

        logging.info("Handling request for action %s", action)
        logging.debug('Running action %s, payload %r', action, payload)

        handler = getattr(self, 'handle_' + action.replace('-', '_'))

        # Verify that the request contains the right number of parameters
        sig = inspect.signature(handler)
        try:
            sig.bind(*payload)
        except TypeError:
            logging.error(
                "Wrong number of arguments for action %s", action, exc_info=True
            )
            return Errors.REQUEST_MALFORMED.format(req).encode()

        res = handler(*payload)
        if asyncio.iscoroutine(res):
            res = await res
        return res

    accepted_actions = {
        'correct',
        'dark_request',
        'query-rid',
        'upload-yaml',
        'update_conf',
        'check_connection',
    }

    # Handler methods for each available action ------------------------------

    async def handle_check_connection(self, *args):
        """Inform the caller that the service is up and running.

        MyMDC will send the same information as provided to dark or correct
        requests, but nothing is done with this information.
        """
        return b'success'

    async def handle_correct(
            self, rid, _sase, instrument, cycle, proposal, runnr, priority
    ):
        """Launch detector correction

        :param rid: is the runid within the MDC database
        :param _sase: is the sase beamline
        :param instrument: is the instrument
        :param cycle: is the facility cycle
        :param proposal: is the proposal id
        :param runnr: the run number in integer form, i.e. without leading "r"
        :param priority: unused, retained for compatibility

        This will trigger a correction process to be launched for that run in
        the given cycle and proposal.
        """
        request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
        try:
            runnr = runnr.strip('r')

            proposal = self._normalise_proposal_num(proposal)
            pconf_full = self.load_proposal_config(cycle, proposal)

            with self.job_db:
                cur = self.job_db.execute(
                    "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)",
                    (rid, proposal, int(runnr), request_time)
                )
                req_id = cur.lastrowid

            _orca_passthrough(
                proposal_number=proposal,
                runs=[runnr],
                route="execute",
            )

            data_conf = pconf_full['data-mapping']
            if instrument in pconf_full['correct']:
                pconf = pconf_full['correct'][instrument]
            else:
                logging.info(f'Instrument {instrument} is unknown')
                return Errors.NOT_CONFIGURED.encode()

            in_folder = self.config['correct']['in-folder'].format(
                instrument=instrument, cycle=cycle, proposal=proposal)
            out_folder = self.config['correct']['out-folder'].format(
                instrument=instrument, cycle=cycle, proposal=proposal,
                run='r{:04d}'.format(int(runnr))
            )

        except Exception as e:
            msg = Errors.JOB_LAUNCH_FAILED.format('correct', e)
            logging.error(msg, exc_info=e)
            asyncio.ensure_future(
                update_mdc_status(self.mdc, 'correct', rid, msg)
            )
            return msg.encode()

        queued_msg = Success.QUEUED.format(proposal, [runnr])
        logging.debug(queued_msg)

        async def _continue():
            """Runs in the background after we reply to the 'correct' request"""
            await update_mdc_status(self.mdc, 'correct', rid, queued_msg)
            try:
                transfer_complete = await wait_transfers(
                    self.mdc, [runnr], proposal, cycle, instrument
                )
                if not transfer_complete:
                    # Timed out
                    await update_mdc_status(self.mdc, 'correct', rid,
                                            MDC.MIGRATION_TIMEOUT)
                    return

                rpath = "{}/r{:04d}/".format(in_folder, int(runnr))

                # Prepare configs for all detectors in run
                fl = glob.glob(f"{rpath}/*.h5")
                detectors = {}
                for karabo_id in pconf:
                    dconfig = data_conf[karabo_id]

                    # check for files according to mapping in raw run dir.
                    if any(y in x for x in fl
                           for y in dconfig['karabo-da']):
                        thisconf = copy.copy(dconfig)
                        if isinstance(pconf[karabo_id], dict):
                            thisconf.update(copy.copy(pconf[karabo_id]))
                        thisconf["in-folder"] = in_folder
                        thisconf["out-folder"] = out_folder
                        thisconf["karabo-id"] = karabo_id
                        thisconf["run"] = runnr

                        detectors[karabo_id] = thisconf

            except Exception as corr_e:
                logging.error("Error during correction", exc_info=corr_e)
                await update_mdc_status(self.mdc, 'correct', rid,
                                        Errors.REQUEST_FAILED)
                return

            for karabo_id in list(detectors.keys()):
                # Check for any detectors for which corrections are
                # disabled manually.
                if detectors[karabo_id].pop('disable-correct', False):
                    logging.warning(f'Skipping disabled detector {karabo_id}')
                    del detectors[karabo_id]

            if len(detectors) == 0:
                msg = Errors.NOTHING_TO_DO.format(rpath)
                logging.warning(msg)
                await update_mdc_status(self.mdc, 'correct', rid, msg)
                return

            ret, _ = await self.launch_jobs(
                [runnr], req_id, detectors, 'correct', instrument, cycle, proposal,
                request_time,
            )
            await update_mdc_status(self.mdc, 'correct', rid, ret)
            loop = get_event_loop()
            await loop.run_in_executor(
                None, self.mdc.update_run_api,
                rid, {'cal_last_begin_at': datetime.now(tz=timezone.utc).isoformat()}
            )
        # END of part to run after sending reply

        asyncio.ensure_future(_continue())

        return queued_msg.encode()

    async def handle_dark_request(
            self, rid, _sase, instrument, cycle, proposal, karabo_id,
            operation_mode, *extra
    ):
        """Launch dark run processing

        :param rid: is the runid within the MDC database
        :param _sase: is the sase beamline
        :param instrument: is the instrument
        :param cycle: is the facility cycle
        :param proposal: is the proposal id
        :param karabo_id: is the detector karabo id
        :param operation_mode: is the detector's operation mode, as defined in
               CalCat
        :param pdu_names: physical detector units for each module
        :param karabo_das: the data aggregators representing which detector
               modules to calibrate
        :param wait_runs: the run numbers in integer form, i.e. without a
               leading "r"

        The last three parameters arrive packed in *extra as string list
        literals and are parsed out below.
        """
        request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
        try:
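            # Each element of `extra` is a string containing a Python list
            # literal; joining them with commas lets literal_eval parse the
            # whole payload as a 3-tuple of lists, e.g.
            # ("['pdu1']", "['DA01']", "['123']") -> (['pdu1'], ['DA01'], ['123'])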
            pdus, karabo_das, wait_runs = ast.literal_eval(','.join(extra))

            karabo_das = [val.strip() for val in karabo_das]
            runs = [str(val) for val in wait_runs]  # FIX: could this be int instead of str?

            proposal = self._normalise_proposal_num(proposal)

            _orca_passthrough(
                proposal_number=proposal,
                runs=runs,
                action="dark",
                route="execute",
                karabo_id=karabo_id,
            )

            with self.job_db:
                cur = self.job_db.execute(
                    "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'DARK', ?)",
                    (rid, proposal, int(wait_runs[-1]), request_time)
                )
                req_id = cur.lastrowid

            pconf_full = self.load_proposal_config(cycle, proposal)

            data_conf = pconf_full['data-mapping']
            if instrument in pconf_full['dark']:
                pconf = pconf_full['dark'][instrument]
            else:
                logging.info(f'Instrument {instrument} is unknown')
                return Errors.NOT_CONFIGURED.encode()

            in_folder = self.config['dark']['in-folder'].format(
                instrument=instrument, cycle=cycle, proposal=proposal)
            out_folder = self.config['dark']['out-folder'].format(
                instrument=instrument, cycle=cycle, proposal=proposal,
                runs='_'.join(runs))
            out_folder = str(Path(out_folder, karabo_id.replace('-', '_')))

        except Exception as e:
            msg = Errors.JOB_LAUNCH_FAILED.format('dark_request', e)
            logging.error(msg, exc_info=e)
            asyncio.ensure_future(
                update_mdc_status(self.mdc, 'dark_request', rid, msg)
            )
            return msg.encode()

        queued_msg = Success.QUEUED.format(proposal, runs)
        logging.debug(queued_msg)

        async def _continue():
            """Runs in the background after we reply to the 'dark_request' request"""
            await update_mdc_status(self.mdc, 'dark_request', rid, queued_msg)
            transfer_complete = await wait_transfers(
                self.mdc, runs, proposal, cycle, instrument
            )
            if not transfer_complete:
                # Timed out
                await update_mdc_status(
                    self.mdc, 'dark_request', rid, MDC.MIGRATION_TIMEOUT
                )
                return

            # Notebooks require one or three runs, depending on the
            # detector type and operation mode.
            triple = any(det in karabo_id for det in
                         ["LPD", "AGIPD", "JUNGFRAU", "JF", "JNGFR"])

            # This fails silently if the hardcoded strings above are
            # ever changed (triple = False) but the underlying notebook
            # still expects run-high/run-med/run-low.
            if triple and len(runs) == 1:
                runs_dict = {'run-high': runs[0],
                             'run-med': '0',
                             'run-low': '0'}
            elif triple and len(runs) == 3:
                runs_dict = {'run-high': runs[0],
                             'run-med': runs[1],
                             'run-low': runs[2]}
            else:  # single
                runs_dict = {'run': runs[0]}

            # We assume that MyMDC does not allow dark request if the data
            # is not migrated, thus skipping some validation here.
            thisconf = copy.copy(data_conf[karabo_id])

            # Pop internal key to avoid propagation to xfel-calibrate.
            thisconf.pop('disable-correct', None)

            if (karabo_id in pconf
                    and isinstance(pconf[karabo_id], dict)):
                thisconf.update(copy.copy(pconf[karabo_id]))

            thisconf['in-folder'] = in_folder
            thisconf['out-folder'] = out_folder
            thisconf['karabo-id'] = karabo_id
            thisconf['karabo-da'] = karabo_das
            thisconf['operation-mode'] = operation_mode

            thisconf.update(runs_dict)

            detectors = {karabo_id: thisconf}

            ret, report_path = await self.launch_jobs(
                runs, req_id, detectors, 'dark', instrument, cycle, proposal,
                request_time
            )
            await update_mdc_status(self.mdc, 'dark_request', rid, ret)
            if len(report_path) == 0:
                logging.warning("Failed to identify report path for dark_request")
            else:
                if len(report_path) > 1:
                    logging.warning(
                        "More than one report path is returned. "
                        "Updating myMDC with the first report path only."
                    )
                await update_darks_paths(
                    self.mdc, rid, in_folder,
                    out_folder, report_path[0]
                )
        # END of part to run after sending reply

        asyncio.ensure_future(_continue())

        return queued_msg.encode()

    def handle_query_rid(self, rid):
        return query_rid(self.job_db, rid)

    def handle_upload_yaml(self, _sase, instrument, cycle, proposal, this_yaml):
        """Reconfigure detector calibration for the specified proposal

        :param _sase: is the sase beamline
        :param instrument: is the instrument
        :param cycle: is the facility cycle
        :param proposal: is the proposal id
        :param this_yaml: the URL-encoded (quotes and spaces) representation
                     of the new YAML file

        This will create or replace the existing YAML configuration for the
        proposal and cycle with the newly sent one, and then push it to the git
        configuration repo.
        """
        this_yaml = urllib.parse.unquote_plus(this_yaml)
        return upload_config(
            self.config['config-repo'], this_yaml, instrument, cycle, proposal
        )

    async def handle_update_conf(
            self, sase, karabo_id, instrument, cycle, proposal, config_yaml,
            apply
    ):
        updated_config = None
        try:
            updated_config = json.loads(config_yaml)
            return change_config(
                self.config['config-repo'],
                updated_config,
                instrument,
                cycle,
                proposal,
                apply.upper() == "TRUE",
            )
        except Exception as e:
            err_msg = (f"Failure applying config for {proposal}:"
                       f" {e}: {updated_config}")
            logging.error(err_msg, exc_info=e)
            return yaml.dump(err_msg, default_flow_style=False).encode()

    # Helper methods for handlers ---------------------------------------------

    @staticmethod
    def _normalise_proposal_num(p: str) -> str:
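        # e.g. 'p2512' -> '002512'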
        return "{:06d}".format(int(p.strip('p')))

    def load_proposal_config(self, cycle, proposal) -> Dict:
        # Read calibration configuration from yaml
        conf_file = Path(
            self.config['config-repo']['local-path'], cycle, f'{proposal}.yaml'
        )
        if not conf_file.exists():
            conf_file = Path(
                self.config['config-repo']['local-path'], "default.yaml"
            )

        logging.debug("Loading config for cycle %s, proposal %s from %s",
                      cycle, proposal, conf_file)

        with open(conf_file, "r") as f:
            return yaml.load(f.read(), Loader=yaml.FullLoader)

    async def launch_jobs(
            self, run_nrs, req_id, detectors, action, instrument, cycle, proposal,
            request_time
    ) -> Tuple[str, List[str]]:
        report = []
        ret = []

        partition = await get_slurm_partition(self.mdc, action, proposal)
        nice = await get_slurm_nice(
            partition, instrument, cycle,
            commissioning_penalty=self.config[action]['commissioning-penalty'],
            job_penalty=self.config[action]['job-penalty'])

        # run xfel_calibrate
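        # The full command is built from a template string in the webservice
        # YAML config ({action} -> cmd), formatted with the fields below.
        # Illustrative shape only; the real template lives in the config file:
        #   xfel-calibrate {detector} ... --slurm-scheduling {sched_prio}
        #   --slurm-partition {partition} ... --report-to <report path>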
        for karabo_id, dconfig in detectors.items():
            detector = dconfig['detector-type']
            del dconfig['detector-type']
            cmd = self.config[action]['cmd'].format(
                detector=detector,
                sched_prio=nice,
                partition=partition,
                action=action, instrument=instrument,
                cycle=cycle, proposal=proposal,
                runs="_".join([f"r{r}" for r in run_nrs]),
                time_stamp=datetime.now().strftime('%y%m%d_%H%M%S'),
                det_instance=karabo_id,
                request_time=request_time
            ).split()
            cmd = parse_config(cmd, dconfig)

            with self.job_db:
                cur = self.job_db.execute(
                    "INSERT INTO executions VALUES (NULL, ?, ?, ?, ?, NULL)",
                    (req_id, shlex.join(cmd), detector, karabo_id)
                )
                exec_id = cur.lastrowid

            # TODO: Add detector info in returned run action status.
            ret.append(await run_action(
                self.job_db, cmd, self.mode,
                proposal, run_nrs[-1], exec_id
            ))
            if '--report-to' in cmd[:-1]:
                report_idx = cmd.index('--report-to') + 1
                report.append(cmd[report_idx] + '.pdf')
        # return string without a trailing comma.
        return ", ".join(ret), report


def main(argv: Optional[List[str]] = None):
    if argv is None:
        argv = sys.argv[1:]

    # Ensure files are opened as UTF-8 by default, regardless of environment.
    locale.setlocale(locale.LC_CTYPE, ('en_US', 'UTF-8'))

    parser = argparse.ArgumentParser(
        description='Start the calibration webservice'
    )
    parser.add_argument('--config-file', type=str, default=None)
    parser.add_argument('--mode', type=str, default="sim", choices=['sim', 'prod']) # noqa
    parser.add_argument('--log-file', type=str, default='./web.log')
    parser.add_argument(
        '--log-level', type=str, default="INFO", choices=['INFO', 'DEBUG', 'ERROR'] # noqa
    )

    args = parser.parse_args(argv)
    config_file = args.config_file
    log_file = args.log_file
    log_level = args.log_level
    mode = args.mode

    if config_file is not None:
        config.configure(includes_for_dynaconf=[Path(config_file).absolute()])

    fmt = '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] %(message)s' # noqa
    logging.basicConfig(
        filename=log_file,
        level=getattr(logging, log_level),
        format=fmt
    )

    # Launch the ZMQ server to handle requests for calibration
    server = ActionsServer(config, mode)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(server.run())
    loop.close()


if __name__ == "__main__":
    main()