import argparse
import ast
import asyncio
import copy
import getpass
import glob
import inspect
import json
import locale
import logging
import os
import sqlite3
import sys
import time
import urllib.parse
from asyncio import get_event_loop, shield
from datetime import datetime
from pathlib import Path
from subprocess import PIPE, run
from threading import Thread
from typing import Any, Dict, List, Optional, Tuple

import yaml
import zmq
import zmq.asyncio
import zmq.auth.thread
from git import InvalidGitRepositoryError, Repo
from kafka import KafkaProducer
from kafka.errors import KafkaError
from metadata_client.metadata_client import MetadataClient

try:
    from .messages import MDC, Errors, Success
except ImportError:
    from messages import MDC, Errors, Success


def init_job_db(config):
    """ Initialize the sqlite job database

    A new database is created if no pre-existing one is present. A single
    table is created: jobs, which has columns:

        rid, jobid, proposal, run, status, time, det, act

    :param config: the configuration parsed from the webservice YAML config
    :return: a sqlite3 connection instance to the database
    """
    logging.info("Initializing database")
    conn = sqlite3.connect(config['web-service']['job-db'])
    conn.execute(
        "CREATE TABLE IF NOT EXISTS "
        "jobs(rid, jobid, proposal, run, status, time, det, act)"
    )
    return conn


def init_md_client(config: Dict[str, Dict[str, str]]) -> MetadataClient:
    """Initialize an MDC client connection.

    :param config: the configuration parsed from the webservice YAML config
    :return: an MDC client connection
    """
    mdconf = config['metadata-client']
    client_conn = MetadataClient(client_id=mdconf['user-id'],
                                 client_secret=mdconf['user-secret'],
                                 user_email=mdconf['user-email'],
                                 token_url=mdconf['token-url'],
                                 refresh_url=mdconf['refresh-url'],
                                 auth_url=mdconf['auth-url'],
                                 scope=mdconf['scope'],
                                 base_api_url=mdconf['base-api-url'])
    return client_conn


def init_config_repo(config):
    """ Make sure the configuration repo is present and up-to-data

    :param config: the configuration defined in the `config-repo` section
    """
    # FIXME: keep this as func, but call this with asyncio.thread
    os.makedirs(config['local-path'], exist_ok=True)
    # check if it is a repo
    try:
        repo = Repo(config['local-path'])
    except InvalidGitRepositoryError:
        repo = Repo.clone_from(config['url'], config['local-path'])
    try:
        repo.remote().pull()
    except Exception:
        logging.warning("Could not pull the config repo", exc_info=True)
    logging.info("Config repo is initialized")


def init_kafka_producer(config):
    try:
        return KafkaProducer(
            bootstrap_servers=config['kafka']['brokers'],
            value_serializer=lambda d: json.dumps(d).encode('utf-8'),
            max_block_ms=2000,  # Don't get stuck trying to send Kafka messages
        )
    except KafkaError:
        logging.warning("Problem initialising Kafka producer; "
                        "Kafka notifications will not be sent.", exc_info=True)
        return NoOpProducer()


class NoOpProducer:
    """Fills in for Kafka producer object when setting that up fails"""
    def send(self, topic, value):
        pass


async def upload_config(
        config, yaml_content, instrument, cycle, proposal
) -> bytes:
    """ Upload a new configuration YAML

    :param config: the configuration defined in the `config-repo` section
        of the webservice.yaml configuration.
    :param yaml_content: the new YAML contents
    :param instrument: the instrument the update applies to
    :param cycle: the facility cycle the update applies to
    :param proposal: the proposal the update applies to

    The YAML contents will be placed into a file at

        {config.local-path}/{cycle}/{proposal}.yaml

    If it exists it is overwritten and then the new version is pushed to
    the configuration git repo.
    """
    # FIXME: keep this as func, but call this with asyncio.thread
    repo = Repo(config['local-path'])
    # assure we are on most current version
    repo.remote().pull()
    prop_dir = os.path.join(repo.working_tree_dir, cycle)
    os.makedirs(prop_dir, exist_ok=True)
    with open("{}/{}.yaml".format(prop_dir, proposal), "w") as f:
        f.write(yaml)
    fpath = "{}/{}.yaml".format(prop_dir, proposal)
    repo.index.add([fpath])
    repo.index.commit(
        "Update to proposal p{}: {}".format(proposal,
                                            datetime.now().isoformat()))
    repo.remote().push()
    logging.info(Success.UPLOADED_CONFIG.format(cycle, proposal))
    return Success.UPLOADED_CONFIG.format(cycle, proposal).encode()


def merge(source: Dict, destination: Dict) -> Dict:
    """Deep merge two dictionaries.

    :param source: source dictionary to merge into destination
    :param destination: destination dictionary being merged into
    :return: the updated destination dictionary

    Taken from:
    https://stackoverflow.com/questions/20656135/python-deep-merge-dictionary-data
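
    A minimal illustrative example:

    >>> merge({'a': {'b': 1}}, {'a': {'c': 2}, 'd': 3})
    {'a': {'c': 2, 'b': 1}, 'd': 3}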
    """
    for key, value in source.items():
        if isinstance(value, dict):
            # if the destination value (e.g. for a karabo-id) is None,
            # replace it with an empty dict so it can be merged into
            if key in destination and destination[key] is None:
                destination[key] = {}
            # get node or create one
            node = destination.setdefault(key, {})
            merge(value, node)
        else:
            destination[key] = value

    return destination


def change_config(config, updated_config, instrument,
                  cycle, proposal, apply=False) -> bytes:
    """ Change the configuration of a proposal

    If no proposal specific configuration yet exists, one is first created
    based on the default configuration of the proposal

    Changes are committed to git.

    :param config: repo config as given in YAML config file
    :param updated_config: a dictionary containing the updated config
    :param instrument: the instrument to change the config for
    :param cycle: the cycle to change config for
    :param proposal: the proposal to change config for
    :param apply: set to True to actually commit a change, otherwise a dry-run
                  is performed
    :return: the updated configuration as YAML-encoded bytes
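
    A hypothetical `updated_config`, assuming the proposal YAML layout
    described above (all key names here are illustrative only):

        {"correct": {"SPB": {"SPB_DET_AGIPD1M-1": {"rel-gain": True}}}}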
    """
    repo = Repo(config['local-path'])
    repo.remote().pull()
    # Create the cycle directory if it doesn't exist.
    cyc_dir = Path(repo.working_tree_dir, cycle)
    cyc_dir.mkdir(exist_ok=True)
    fpath = Path(cyc_dir, f"{int(proposal):06d}.yaml")

    # If cycle/proposal.yaml doesn't exist, a new file is created
    # based on default.yaml
    if not fpath.exists():
        with open(f"{repo.working_tree_dir}/default.yaml", "r") as f:
            defconf = yaml.safe_load(f)
            subconf = {}
            # The proposal YAML should have dark, correct and data-mapping
            # keys with all detector dictionaries of the changed instrument.
            for action in ["dark", "correct"]:
                subconf[action] = {}
                subconf[action][instrument] = defconf[action][instrument]

            # Copy data-mapping for all detectors of an instrument.
            subconf["data-mapping"] = {}
            for k_id in defconf["dark"][instrument].keys():
                print("karabo-id", k_id)
                subconf["data-mapping"][k_id] = defconf["data-mapping"][k_id]
            with open(fpath, "w") as wf:
                wf.write(yaml.safe_dump(subconf, default_flow_style=False))

    new_conf = None
    with open(fpath, "r") as rf:
        existing_conf = yaml.safe_load(rf)
        new_conf = merge(updated_config, existing_conf)

    if apply:
        # Apply the updated configuration to the proposal YAML
        # and push it to the calibration_configurations remote repository.
        with open(fpath, "w") as wf:
            wf.write(yaml.safe_dump(new_conf, default_flow_style=False))
        repo.index.add([str(fpath)])
        repo.index.commit(
            f"Update to proposal YAML: {datetime.now().isoformat()}")
        repo.remote().push()
    logging.info(Success.UPLOADED_CONFIG.format(cycle, proposal))
    return yaml.safe_dump(new_conf, default_flow_style=False).encode()


async def run_proc_async(cmd: List[str]) -> Tuple[int, bytes]:
    """Run a subprocess to completion using asyncio, capturing stdout

    Returns the numeric exit code and stdout (bytes)
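
    Illustrative usage (from within a coroutine)::

        retcode, stdout = await run_proc_async(["squeue", "-u", "someuser"])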
    """
    proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE)
    stdout, _ = await proc.communicate()
    return proc.returncode, stdout


def slurm_status(filter_user=True):
    """ Return the status of slurm jobs by calling squeue

    :param filter_user: set to True to only list jobs of the current user
    :return: a dictionary indexed by slurm jobid and containing a tuple
             of (status, run time) as values, or None if squeue failed
    """
    cmd = ["squeue"]
    if filter_user:
        cmd += ["-u", getpass.getuser()]
    res = run(cmd, stdout=PIPE)
    if res.returncode == 0:
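        # Each non-header line of default squeue output has the columns
        #   JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)
        # which is what the eight-way unpacking below relies on.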
        rlines = res.stdout.decode().split("\n")
        statii = {}
        for r in rlines[1:]:
            try:
                jobid, _, _, _, status, runtime, _, _ = r.split()
                jobid = jobid.strip()
                statii[jobid] = status, runtime
            except ValueError:  # not enough values to unpack in split
                pass
        return statii


def slurm_job_status(jobid):
    """ Return the status of slurm job

    :param jobid: Slurm job Id
    :return: Slurm state, Elapsed.
    """
    cmd = ["sacct", "-j", str(jobid), "--format=JobID,Elapsed,state"]

    res = run(cmd, stdout=PIPE)
    if res.returncode == 0:
        rlines = res.stdout.decode().split("\n")
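        # sacct prints a header line and a separator line before the first
        # job row, hence index 2 below.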

        logging.debug("Job {} state {}".format(jobid, rlines[2].split()))
        if len(rlines[2].split()) == 3:
            return rlines[2].replace("+", "").split()
    return "NA", "NA", "NA"


def query_rid(conn, rid) -> bytes:
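    """Query the job database for jobs matching a run id (rid) and return
    a combined status report as bytes ('NA' if nothing matches)."""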
    c = conn.cursor()
    c.execute("SELECT * FROM jobs WHERE rid LIKE ?", (rid,))
    combined = {}
    for r in c.fetchall():
        rid, jobid, proposal, run, status, time_, _ = r
        logging.debug(
            "Job {}, proposal {}, run {} has status {}".format(jobid,
                                                               proposal,
                                                               run,
                                                               status))
        cflg, cstatus = combined.get(rid, ([], []))
        if status in ['R', 'PD']:
            flg = 'R'
        elif status == 'NA':
            flg = 'NA'
        else:
            flg = 'A'

        cflg.append(flg)
        cstatus.append(status)
        combined[rid] = cflg, cstatus

    msg = ""
    for _rid, (_flgs, statii) in combined.items():
        msg += "\n".join(statii)
    if msg == "":
        msg = 'NA'
    return msg.encode()


def parse_config(cmd: List[str], config: Dict[str, Any]) -> List[str]:
    """Convert a dictionary to a list of arguments.

    Values that are not strings will be cast to strings.
    Lists will be converted to several strings following their `--key`
    flag.
    Booleans will be converted to a `--key` flag, where `key` is the
    dictionary key.
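
    Illustrative example (the keys here are hypothetical):

    >>> parse_config([], {"priority": 2, "karabo-da": ["AGIPD00", "AGIPD01"],
    ...                   "no-relgain": True})
    ['--priority', '2', '--karabo-da', 'AGIPD00', 'AGIPD01', '--no-relgain']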
    """
    for key, value in config.items():
        if ' ' in key or (isinstance(value, str) and ' ' in value):
            raise ValueError('Spaces are not allowed', key, value)

        if isinstance(value, list):
            cmd.append(f"--{key}")
            cmd += [str(v) for v in value]
        elif isinstance(value, bool):
            if value:
                cmd += ["--{}".format(key)]
        else:
            if value in ['""', "''"]:
                value = ""
            cmd += ["--{}".format(key), str(value)]

    return cmd


def update_job_db(config):
    """ Update the job database and send out updates to MDC

    This runs in its own thread.

    :param config: configuration parsed from webservice YAML
    """
    logging.info("Starting config db handling")
    conn = init_job_db(config)
    mdc = init_md_client(config)
    kafka_prod = init_kafka_producer(config)
    kafka_topic = config['kafka']['topic']
    time_interval = int(config['web-service']['job-update-interval'])

    while True:
        statii = slurm_status()
        # Check that slurm is giving proper feedback
        if statii is None:
            time.sleep(time_interval)
            continue
        try:
            c = conn.cursor()
            c.execute("SELECT * FROM jobs WHERE status IN ('R', 'PD', 'CG') ")
            combined = {}
            logging.debug("SLURM info {}".format(statii))

            for r in c.fetchall():
                rid, jobid, proposal, run, status, _time, det, action = r
                logging.debug("DB info {}".format(r))

                cflg, cstatus, *_ = combined.setdefault((rid, action), (
                    [], [], proposal, run, det
                ))
                if jobid in statii:
                    slstatus, runtime = statii[jobid]
                    query = "UPDATE jobs SET status=?, time=? WHERE jobid LIKE ?"
                    c.execute(query, (slstatus, runtime, jobid))

                    cflg.append('R')
                    cstatus.append("{}-{}".format(slstatus, runtime))
                else:
                    _, sltime, slstatus = slurm_job_status(jobid)
                    query = "UPDATE jobs SET status=? WHERE jobid LIKE ?"
                    c.execute(query, (slstatus, jobid))

                    if slstatus == 'COMPLETED':
                        cflg.append("A")
                    else:
                        cflg.append("NA")
                    cstatus.append(slstatus)
            conn.commit()

            flg_order = {"R": 2, "A": 1, "NA": 0}
            dark_flags = {'NA': 'E', 'R': 'IP', 'A': 'F'}
            for rid, action in combined:
                if int(rid) == 0:  # this job was not submitted from MyMDC
                    continue
                flgs, statii, proposal, run, det = combined[rid, action]
                # sort by least done status
                flg = max(flgs, key=lambda i: flg_order[i])
                if flg != 'R':
                    logging.info(
                        "Jobs finished - action: %s, run id: %s, status: %s",
                        action, rid, flg,
                    )
                    if action == 'CORRECT':
                        try:
                            kafka_prod.send(kafka_topic, {
                                'event': 'correction_complete',
                                'proposal': proposal,
                                'run': run,
                                'detector': det,
                                'success': (flg == 'A'),  # A for Available
                            })
                        except KafkaError:
                            logging.warning("Error sending Kafka notification",
                                            exc_info=True)
                msg = "\n".join(statii)
                msg_debug = f"Update MDC {rid}, {msg}"
                logging.debug(msg_debug.replace('\n', ', '))
                if action == 'CORRECT':
                    response = mdc.update_run_api(rid,
                                                  {'flg_cal_data_status': flg,
                                                   'cal_pipeline_reply': msg})
                else:  # action == 'DARK' but it's dark_request
                    data = {'dark_run': {'flg_status': dark_flags[flg],
                                         'calcat_feedback': msg}}
                    response = mdc.update_dark_run_api(rid, data)
                if response.status_code != 200:
                    logging.error("Failed to update MDC for action %s, rid %s",
                                  action, rid)
                    logging.error(Errors.MDC_RESPONSE.format(response))
        except Exception:
            logging.error("Failure to update job DB", exc_info=True)

        time.sleep(time_interval)


async def copy_untouched_files(file_list):
    """ Copy those files which are not touched by the calibration
    to the output directory.

    :param file_list: The list of files to copy

    Copying is done via an asyncio subprocess call
    """
    for f in file_list:
        of = f.replace("raw", "proc").replace("RAW", "CORR")
        cmd = ["rsync", "-av", f, of]
        await asyncio.subprocess.create_subprocess_shell(" ".join(cmd))
        logging.info(f"Copying {f} to {of}")


async def run_action(job_db, cmd, mode, proposal, run, rid) -> str:
    """Run action command (CORRECT or DARK).

    :param job_db: jobs database
    :param cmd: the command to run, as a list of strings as expected by
                subprocess.run
    :param mode: "prod" or "sim", in the latter case nothing will be executed
                 but the command will be logged
    :param proposal: proposal the command was issued for
    :param run: run the command was issued for
    :param rid: run id in the MDC

    Returns a formatted Success or Error message indicating outcome of the
    execution.
    """
    if mode == "prod":
        logging.info(" ".join(cmd))
        retcode, stdout = await run_proc_async(cmd)
        if retcode != 0:
            logging.error(Errors.JOB_LAUNCH_FAILED.format(cmd, retcode))
            return Errors.JOB_LAUNCH_FAILED.format(cmd, retcode)

        if "DARK" in cmd:
            message = Success.START_CHAR.format(proposal, run)
        else:
            message = Success.START_CORRECTION.format(proposal, run)

        # Save submitted jobs to persistent database.
        c = job_db.cursor()  # FIXME: asyncio
        rstr = stdout.decode()

        for r in rstr.split("\n"):
            if "Submitted job:" in r:
                _, jobid = r.split(":")
                c.execute(
                    "INSERT INTO jobs VALUES (?, ?, ?, ?, 'PD', ?, ?, ?)",
                    (rid, jobid.strip(), proposal, run,
                     datetime.now().isoformat(), cmd[3], cmd[4])
                )
        job_db.commit()

    else:  # mode == "sim"
        if "DARK" in cmd:
            message = Success.START_CHAR_SIM.format(proposal, run)
        else:
            message = Success.START_CORRECTION_SIM.format(proposal, run)

    logging.info(message)
    logging.info((" ".join(cmd)).replace(',', '').replace("'", ""))
    return message


async def wait_on_transfer(rpath, max_tries=300) -> bool:
    """
    Wait on data files to be transferred to Maxwell

    :param rpath: folder containing data files migrated to Maxwell
    :param max_tries: Maximum number of checks if files are transferred
    :return: True if files are transferred
    """
    # TODO: Make use of MyMDC to request whether the run has been copied.
    # It is not sufficient to know that the files are on disk, but also to
    # check the copy is finished (ie. that the files are complete).
    if 'pnfs' in os.path.realpath(rpath):
        return True
    tries = 0

    # FIXME: if not kafka, then do event-driven, no sleep
    # wait until folder gets created
    while not os.path.exists(rpath):
        if tries > max_tries:
            return False
        tries += 1
        await asyncio.sleep(10)

    # FIXME: if not kafka, then do event-driven, no sleep
    # wait until files are migrated
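    # `getfattr -n user.status <rpath>` output is expected to resemble
    # (illustrative):
    #   # file: <rpath>
    #   user.status="online"
    # The loop exits once the attribute is readable and no longer
    # reports status="online".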
    while True:
        retcode, stdout = await run_proc_async([
            "getfattr", "-n", "user.status", rpath
        ])
        if retcode == 0 and 'status="online"' not in stdout.decode().lower():
            return True
        if tries > max_tries:
            return False
        tries += 1
        await asyncio.sleep(10)


async def wait_transfers(
        wait_runs: List[str], in_folder: str, proposal: str
) -> bool:
    """Wait for multiple runs to be transferred to Maxwell.

    :param wait_runs: Run numbers to wait for
    :param in_folder: Proposal raw directory containing runs
    :param proposal: Proposal number
    :return: True if all runs transferred, False on timeout
    """
    logging.debug(f"Waiting for: proposal {proposal}, runs {wait_runs}")

    # FIXME: this loop should be an asyncio.gather
    for runnr in wait_runs:
        rpath = "{}/r{:04d}/".format(in_folder, int(runnr))
        transfer_complete = await wait_on_transfer(rpath)
        if not transfer_complete:
            logging.error(
                Errors.TRANSFER_EVAL_FAILED.format(proposal, runnr)
            )
            return False

    logging.info(
        "Transfer complete: proposal %s, runs %s", proposal, wait_runs
    )
    return True


def check_files(in_folder: str,
                runs: List[int],
                karabo_das: List[str]) -> bool:
    """Check if files for given karabo-das exists for given runs

    :param in_folder: Input folder
    :param runs: List of runs
    :param karabo_das: List of karabo-das
    :return: True if files are there
    """
    files_exists = True
    for runnr in runs:
        rpath = Path(in_folder, 'r{:04d}'.format(int(runnr)))
        fl = rpath.glob('*.h5')
        if not any(da in match.name for match in fl for da in karabo_das):
            files_exists = False
    return files_exists


async def update_darks_paths(mdc: MetadataClient, rid: int, in_path: str,
                             out_path: str, report_path: str):
    """Update data paths in MyMDC to provide Globus access

    DarkRequests have the capability to provide a link to the report via
    Globus. For this, the absolute path to the report needs to be provided.

    Additionally, the input path to the raw data, as well as the output path to
    the processed data can be specified.

    :param mdc: an authenticated MyMDC client
    :param rid: the DarkRequest id to update
    :param in_path: the absolute path to the raw data being calibrated
    :param out_path: the absolute path to the resulting processed data
    :param report_path: the absolute path to the PDF report
    """
    data = {'dark_run': {'output_path': out_path,
                         'input_path': in_path,
                         'globus_url': report_path}}
    loop = get_event_loop()
    response = await shield(loop.run_in_executor(None, mdc.update_dark_run_api,
                                                 rid, data))
    if response.status_code != 200:
        logging.error("Failed to update MDC dark report path for run id %s", rid)
        logging.error(Errors.MDC_RESPONSE.format(response))


async def update_mdc_status(mdc: MetadataClient, action: str,
                            rid: int, message: str):
    """Update MyMDC statuses

    The message content and destination depend on the Action type.
    A message to MyMDC expects a status flag, which differ between Correction
    and DarkRequest.

    Correction can have NA (Not Available), R(unning), A(vailable)
    DarkRequest can have F(inished), E(rror), R(equested),
                         IP (In Progress), T(imeout)
    The flag is extracted from the message.
    Further information is available at:
    https://git.xfel.eu/gitlab/detectors/pycalibration/wikis/MyMDC-Communication
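
    For example, a message starting with 'SUCCESS' yields flag 'R' (correct)
    or 'IP' (dark_request); if it also contains 'Uploaded' or 'Finished',
    the flag becomes 'A' or 'F' respectively.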
    """
    if message.split(':')[0] in ('FAILED', 'WARN'):  # Errors
        flag = 'NA' if action == 'correct' else 'E'
    elif message.split(':')[0] == 'SUCCESS':  # Success
        flag = 'R' if action == 'correct' else 'IP'
        if 'Uploaded' in message or 'Finished' in message:
            flag = 'A' if action == 'correct' else 'F'
    else:  # MDC Timeout
        flag = 'NA' if action == 'correct' else 'T'

    if action == 'correct':
        func = mdc.update_run_api
        data = {'flg_cal_data_status': flag, 'cal_pipeline_reply': message}

    elif action == 'dark_request':
        func = mdc.update_dark_run_api
        data = {'dark_run': {'flg_status': flag, 'calcat_feedback': message}}
    else:
        raise ValueError(f"Unexpected action: {action}")

    loop = get_event_loop()
    response = await shield(loop.run_in_executor(None, func, rid, data))

    if response.status_code != 200:
        logging.error("Failed to update MDC status for action %s, run id %s",
                      action, rid)
        logging.error(Errors.MDC_RESPONSE.format(response))


class ActionsServer:
    def __init__(self, config, mode):
        self.config = config
        self.mode = mode

        init_config_repo(config['config-repo'])
        self.job_db = init_job_db(config)
        self.mdc = init_md_client(config)

        # Set up a ZMQ socket to listen for requests
        self.zmq_ctx = zmq.asyncio.Context()
        auth = zmq.auth.thread.ThreadAuthenticator(self.zmq_ctx)
        if mode == "prod-auth":
            auth.start()
            auth.allow(config['web-service']['allowed-ips'])

        self.socket = self.zmq_ctx.socket(zmq.REP)
        self.socket.zap_domain = b'global'
        self.socket.bind("{}:{}".format(config['web-service']['bind-to'],
                                        config['web-service']['port']))

    async def run(self):
        """The main server loop

        The main server loop handles remote requests via a ZMQ interface.

        Requests are serialised as the repr of a Python list/tuple, with the
        first element identifying the action, and subsequent elements arguments
        to be passed to that action. The number & meaning of arguments depends
        on the action.
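
        For example, a (hypothetical) correction request might arrive as:

            b"['correct', 42, 'sase1', 'SPB', '202201', 'p002533', 'r0100', '']"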
        """
        while True:
            req = await self.socket.recv()
            logging.debug("Raw request data: %r", req)
            try:
                resp = await self.handle_one_req(req)
            except Exception as e:
                logging.error("Unexpected error handling request", exc_info=e)
                resp = Errors.OTHER_ERROR.format(e).encode()

            logging.debug("Sending response: %r", resp)
            await self.socket.send(resp)

    async def handle_one_req(self, raw_req: bytes) -> bytes:
        """Handle one request, and return the reply to be sent"""
        try:  # protect against unparseable requests
            req = ast.literal_eval(raw_req.decode('utf-8'))
        except (SyntaxError, ValueError) as e:
            logging.error(str(e))
            return Errors.REQUEST_FAILED.encode()

        if len(req) < 2:  # catch parseable but malformed requests
            logging.error(Errors.REQUEST_MALFORMED.format(req))
            return Errors.REQUEST_MALFORMED.format(req).encode()

        action, *payload = req

        if action not in self.accepted_actions:
            logging.warning(Errors.UNKNOWN_ACTION.format(action))
            return Errors.UNKNOWN_ACTION.format(action).encode()

        logging.info("Handling request for action %s", action)
        logging.debug('Running action %s, payload %r', action, payload)

        handler = getattr(self, 'handle_' + action.replace('-', '_'))

        # Verify that the request contains the right number of parameters
        sig = inspect.signature(handler)
        try:
            sig.bind(*payload)
        except TypeError:
            logging.error(
                "Wrong number of arguments for action %s", action, exc_info=True
            )
            return Errors.REQUEST_MALFORMED.format(req).encode()

        res = handler(*payload)
        if asyncio.iscoroutine(res):
            res = await res
        return res

    accepted_actions = {
        'correct',
        'dark_request',
        'query-rid',
        'upload-yaml',
        'update_conf',
    }

    # Handler methods for each available action ------------------------------

    async def handle_correct(
            self, rid, _sase, instrument, cycle, proposal, runnr, priority
    ):
        """Launch detector correction

        :param rid: is the runid within the MDC database
        :param _sase: is the sase beamline
        :param instrument: is the instrument
        :param cycle: is the facility cycle
        :param proposal: is the proposal id
        :param runnr: is the run number in integer form, i.e. without leading
                     "r"
        :param priority: is an optional scheduling priority for the launched
                     jobs

        This will trigger a correction process to be launched for that run in
        the given cycle and proposal.
        """
        request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
        try:
            runnr = runnr.strip('r')

            proposal = self._normalise_proposal_num(proposal)
            pconf_full = self.load_proposal_config(cycle, proposal)

            data_conf = pconf_full['data-mapping']
            if instrument in pconf_full['correct']:
                pconf = pconf_full['correct'][instrument]
            else:
                logging.info(f'Instrument {instrument} is unknown')
                return Errors.NOT_CONFIGURED.encode()

            in_folder = self.config['correct']['in-folder'].format(
                instrument=instrument, cycle=cycle, proposal=proposal)
            out_folder = self.config['correct']['out-folder'].format(
                instrument=instrument, cycle=cycle, proposal=proposal,
                run='r{:04d}'.format(int(runnr))
            )

        except Exception as e:
            msg = Errors.JOB_LAUNCH_FAILED.format('correct', e)
            logging.error(msg, exc_info=e)
            asyncio.ensure_future(
                update_mdc_status(self.mdc, 'correct', rid, msg)
            )
            return msg.encode()

        queued_msg = Success.QUEUED.format(proposal, [runnr])
        logging.debug(queued_msg)

        async def _continue():
            """Runs in the background after we reply to the 'correct' request"""
            await update_mdc_status(self.mdc, 'correct', rid, queued_msg)
            try:
                transfer_complete = await wait_transfers(
                    [runnr], in_folder, proposal
                )
                if not transfer_complete:
                    # Timed out
                    await update_mdc_status(self.mdc, 'correct', rid,
                                            MDC.MIGRATION_TIMEOUT)
                    return

                rpath = "{}/r{:04d}/".format(in_folder, int(runnr))

                # Prepare configs for all detectors in run
                fl = glob.glob(f"{rpath}/*.h5")
                corr_file_list = set()
                copy_file_list = set(fl)
                detectors = {}
                for karabo_id in pconf:
                    dconfig = data_conf[karabo_id]
                    # check for files according to mapping in raw run dir.
                    if any(y in x for x in fl
                           for y in dconfig['karabo-da']):
                        for karabo_da in dconfig['karabo-da']:
                            tfl = glob.glob(f"{rpath}/*{karabo_da}*.h5")
                            corr_file_list = corr_file_list.union(set(tfl))
                        thisconf = copy.copy(dconfig)
                        if isinstance(pconf[karabo_id], dict):
                            thisconf.update(copy.copy(pconf[karabo_id]))
                        thisconf["in-folder"] = in_folder
                        thisconf["out-folder"] = out_folder
                        thisconf["karabo-id"] = karabo_id
                        thisconf["run"] = runnr
                        if priority:
                            thisconf["priority"] = str(priority)

                        detectors[karabo_id] = thisconf
                copy_file_list = copy_file_list.difference(corr_file_list)
                asyncio.ensure_future(copy_untouched_files(copy_file_list))
            except Exception as corr_e:
                logging.error("Error during correction", exc_info=corr_e)
                await update_mdc_status(self.mdc, 'correct', rid,
                                        Errors.REQUEST_FAILED)
                return

            if len(detectors) == 0:
                msg = Errors.NOTHING_TO_DO.format(rpath)
                logging.warning(msg)
                await update_mdc_status(self.mdc, 'correct', rid, msg)
                return

            ret, _ = await self.launch_jobs(
                [runnr], rid, detectors, 'correct', instrument, cycle, proposal,
                request_time,
            )
            await update_mdc_status(self.mdc, 'correct', rid, ret)
        # END of part to run after sending reply

        asyncio.ensure_future(_continue())

        return queued_msg.encode()

    async def handle_dark_request(
            self, rid, _sase, instrument, cycle, proposal, karabo_id,
            operation_mode, *extra
    ):
        """Launch dark run processing

        :param rid: is the runid within the MDC database
        :param _sase: is the sase beamline
        :param instrument: is the instrument
        :param cycle: is the facility cycle
        :param proposal: is the proposal id
        :param karabo_id: is the detector karabo id
        :param operation_mode: is the detector's operation mode, as defined in
               CalCat
        :param pdu_names: physical detector units for each module
        :param karabo_das: the Data Aggregators representing which detector
               modules to calibrate
        :param wait_runs: the run numbers in integer form, i.e. without
               leading "r"

        The last three parameters arrive serialised in `*extra` and are
        parsed with ast.literal_eval.
        """
        request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
        try:
            pdus, karabo_das, wait_runs = ast.literal_eval(','.join(extra))

            karabo_das = [val.strip() for val in karabo_das]
            runs = [str(val) for val in wait_runs]

            proposal = self._normalise_proposal_num(proposal)

            pconf_full = self.load_proposal_config(cycle, proposal)

            data_conf = pconf_full['data-mapping']
            if instrument in pconf_full['dark']:
                pconf = pconf_full['dark'][instrument]
            else:
                logging.info(f'Instrument {instrument} is unknown')
                return Errors.NOT_CONFIGURED.encode()

            in_folder = self.config['dark']['in-folder'].format(
                instrument=instrument, cycle=cycle, proposal=proposal)
            out_folder = self.config['dark']['out-folder'].format(
                instrument=instrument, cycle=cycle, proposal=proposal,
                runs='_'.join(runs))
            out_folder = str(Path(out_folder, karabo_id.replace('-', '_')))

        except Exception as e:
            msg = Errors.JOB_LAUNCH_FAILED.format('dark_request', e)
            logging.error(msg, exc_info=e)
            asyncio.ensure_future(
                update_mdc_status(self.mdc, 'dark_request', rid, msg)
            )
            return msg.encode()

        queued_msg = Success.QUEUED.format(proposal, runs)
        logging.debug(queued_msg)

        async def _continue():
            """Runs in the background after we reply to the 'dark_request' request"""
            await update_mdc_status(self.mdc, 'dark_request', rid, queued_msg)

            transfer_complete = await wait_transfers(
                runs, in_folder, proposal
            )
            if not transfer_complete:
                # Timed out
                await update_mdc_status(
                    self.mdc, 'dark_request', rid, MDC.MIGRATION_TIMEOUT
                )
                return

            # Notebooks require one or three runs, depending on the
            # detector type and operation mode.
            triple = any(det in karabo_id for det in
                         ["LPD", "AGIPD", "JUNGFRAU", "JF", "JNGFR"])

            if triple and len(runs) == 1:
                runs_dict = {'run-high': runs[0],
                             'run-med': '0',
                             'run-low': '0'}
            elif triple and len(runs) == 3:
                runs_dict = {'run-high': runs[0],
                             'run-med': runs[1],
                             'run-low': runs[2]}
            else:  # single
                runs_dict = {'run': runs[0]}

            # We assume that MyMDC does not allow a dark request unless the
            # data is migrated, thus skipping some validation here.
            thisconf = copy.copy(data_conf[karabo_id])

            if (karabo_id in pconf
                    and isinstance(pconf[karabo_id], dict)):
                thisconf.update(copy.copy(pconf[karabo_id]))

            thisconf['in-folder'] = in_folder
            thisconf['out-folder'] = out_folder
            thisconf['karabo-id'] = karabo_id
            thisconf['karabo-da'] = karabo_das
            thisconf['operation-mode'] = operation_mode

            thisconf.update(runs_dict)

            detectors = {karabo_id: thisconf}

            ret, report_path = await self.launch_jobs(
                runs, rid, detectors, 'dark', instrument, cycle, proposal,
                request_time
            )
            await update_mdc_status(self.mdc, 'dark_request', rid, ret)
            if len(report_path) == 0:
                logging.warning("Failed to identify report path for dark_request")
            else:
                if len(report_path) > 1:
                    logging.warning(
                        "More than one report path is returned. "
                        "Updating myMDC with the first report path only."
                    )
                await update_darks_paths(
                    self.mdc, rid, in_folder,
                    out_folder, report_path[0]
                )
        # END of part to run after sending reply

        asyncio.ensure_future(_continue())

        return queued_msg.encode()

    def handle_query_rid(self, rid):
        return query_rid(self.job_db, rid)

    def handle_upload_yaml(self, _sase, instrument, cycle, proposal, this_yaml):
        """Reconfigure detector calibration for the specified proposal

        :param _sase: is the sase beamline
        :param instrument: is the instrument
        :param cycle: is the facility cycle
        :param proposal: is the proposal id
        :param this_yaml: is a URL-encoded (quotes and spaces) representation
                     of the new YAML file

        This will create or replace the existing YAML configuration for the
        proposal and cycle with the newly sent one, and then push it to the git
        configuration repo.
        """
        this_yaml = urllib.parse.unquote_plus(this_yaml)
        return upload_config(
            self.config['config-repo'], this_yaml, instrument, cycle, proposal
        )

    async def handle_update_conf(
            self, sase, karabo_id, instrument, cycle, proposal, config_yaml,
            apply
    ):
        updated_config = None
        try:
            updated_config = json.loads(config_yaml)
            return change_config(
                self.config['config-repo'],
                updated_config,
                instrument,
                cycle,
                proposal,
                apply.upper() == "TRUE",
            )
        except Exception as e:
            err_msg = (f"Failure applying config for {proposal}:"
                       f" {e}: {updated_config}")
            logging.error(err_msg, exc_info=e)
            return yaml.dump(err_msg, default_flow_style=False).encode()

    # Helper methods for handlers ---------------------------------------------

    @staticmethod
    def _normalise_proposal_num(p: str) -> str:
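        """Strip an optional leading 'p' and zero-pad to six digits.

        >>> ActionsServer._normalise_proposal_num('p2533')
        '002533'
        """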
        return "{:06d}".format(int(p.strip('p')))

    def load_proposal_config(self, cycle, proposal) -> Dict:
        # Read calibration configuration from yaml
        conf_file = Path(
            self.config['config-repo']['local-path'], cycle, f'{proposal}.yaml'
        )
        if not conf_file.exists():
            conf_file = Path(
                self.config['config-repo']['local-path'], "default.yaml"
            )

        logging.debug("Loading config for cycle %s, proposal %s from %s",
                      cycle, proposal, conf_file)

        with open(conf_file, "r") as f:
            return yaml.load(f.read(), Loader=yaml.FullLoader)

    async def launch_jobs(
            self, run_nrs, rid, detectors, action, instrument, cycle, proposal,
            request_time
    ) -> Tuple[str, List[str]]:
        report = []
        ret = []
        # run xfel_calibrate
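        # The command template is read from the webservice YAML ('cmd' in
        # the section for this action) and is expected to contain the
        # placeholders formatted below, e.g. {detector}, {action} and {runs}.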
        for karabo_id, dconfig in detectors.items():
            detector = dconfig['detector-type']
            del dconfig['detector-type']
            cmd = self.config[action]['cmd'].format(
                detector=detector,
                sched_prio=str(self.config[action]['sched-prio']),
                action=action, instrument=instrument,
                cycle=cycle, proposal=proposal,
                runs="_".join([f"r{r}" for r in run_nrs]),
                time_stamp=datetime.now().strftime('%y%m%d_%H%M%S'),
                det_instance=karabo_id,
                request_time=request_time
            ).split()

            cmd = parse_config(cmd, dconfig)
            # TODO: Add detector info in returned run action status.
            ret.append(await run_action(
                self.job_db, cmd, self.mode,
                proposal, run_nrs[-1], rid
            ))
            if '--report-to' in cmd[:-1]:
                report_idx = cmd.index('--report-to') + 1
                report.append(cmd[report_idx] + '.pdf')
        # return a comma-joined string without a trailing comma.
        return ", ".join(ret), report


def main(argv: Optional[List[str]] = None):
    if argv is None:
        argv = sys.argv[1:]

    # Ensure files are opened as UTF-8 by default, regardless of environment.
    locale.setlocale(locale.LC_CTYPE, ('en_US', 'UTF-8'))

    parser = argparse.ArgumentParser(
        description='Start the calibration webservice'
    )
    parser.add_argument('--config-file', type=str, default='./webservice.yaml')
    parser.add_argument('--mode', type=str, default="sim", choices=['sim', 'prod'])
    parser.add_argument('--log-file', type=str, default='./web.log')
    parser.add_argument(
        '--log-level', type=str, default="INFO", choices=['INFO', 'DEBUG', 'ERROR']
    )

    args = parser.parse_args(argv)
    config_file = args.config_file
    log_file = args.log_file
    log_level = args.log_level
    mode = args.mode

    with open(config_file, "r") as f:
        config = yaml.safe_load(f.read())

    fmt = '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] %(message)s'
    logging.basicConfig(
        filename=log_file,
        level=getattr(logging, log_level),
        format=fmt
    )

    # Update job statuses from Slurm in a separate thread
    slurm_monitor_thread = Thread(
        target=update_job_db, args=(config,), daemon=True
    )
    slurm_monitor_thread.start()

    # Launch the ZMQ server to handle requests for calibration
    server = ActionsServer(config, mode)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(server.run())
    loop.close()


if __name__ == "__main__":
    main()