import ast import queue import sys import threading from unittest.mock import patch import pytest import yaml import zmq from webservice.update_config import ( _find_cycle, _add_available_configs_to_arg_parser, _create_new_config_from_args_input, main, ) def fake_zmq_server(context, q): socket = context.socket(zmq.REP) socket.bind("inproc://socket") while True: q.put(socket.recv()) socket.send(yaml.dump("SUCCESS", default_flow_style=False).encode()) def test_main_sys_exit(capsys): """Test update config main function.""" with patch.object( sys, "argv", [ "update_config", "--karabo-id", "SPB_DET_AGIPD1M-1", "--webservice-address", "inproc://socket", "--dark", ] ): with pytest.raises(SystemExit): main() out, _ = capsys.readouterr() assert out == "Need to define all required fields\n" EXPECTED_ZMQ_REQ = [ 'update_conf', 'SASEX', 'SPB_DET_AGIPD1M-1', 'SPB', '000000', '000000', '{"correct": {"SPB": {"SPB_DET_AGIPD1M-1": {"rel-gain": true}}}, "data-mapping": {"SPB_DET_AGIPD1M-1": {"karabo-da": ["AGIPD00"]}}}', # noqa 'False', None, ] def test_main(capsys): """Test update_config.py main function.""" q = queue.Queue() context = zmq.Context() t = threading.Thread( target=fake_zmq_server, args=(context, q), daemon=True) t.start() with patch.object( sys, "argv", [ "update_conf", "--karabo-id", "SPB_DET_AGIPD1M-1", "--karabo-da", "AGIPD00", "--proposal", "000000", "--cycle", "000000", "--rel-gain", "true", "--webservice-address", "inproc://socket", "--correct", "--verbose" ], ): with patch("zmq.Context", return_value=context): main() out, _ = capsys.readouterr() assert "SUCCESS" in out # The last component is a an unpredictable MUNGE token; ignored here: assert ast.literal_eval( q.get(timeout=1).decode('utf-8'))[:-1] == EXPECTED_ZMQ_REQ[:-1] EXPECTED_CONF = [ { 'common-mode': {'type': bool}, 'force-hg-if-below': {'type': int}, 'rel-gain': {'type': bool}, 'xray-gain': {'type': bool}, 'blc-noise': {'type': bool}, 'blc-set-min': {'type': bool}, 'blc-stripes': {'type': bool}, 'zero-nans': {'type': bool}, 'zero-orange': {'type': bool}, 'max-pulses': {'type': list, 'msg': 'Range list of maximum pulse indices ' '(--max-pulses start end step). ' '3 max input elements. '}, 'use-litframe-finder': {'type': str}, 'litframe-device-id': {'type': str}, 'energy-threshold': {'type': int}, 'no-common-mode': {'type': bool}, 'no-rel-gain': {'type': bool}, 'no-xray-gain': {'type': bool}, 'no-blc-noise': {'type': bool}, 'no-blc-set-min': {'type': bool}, 'no-blc-stripes': {'type': bool}, 'no-zero-nans': {'type': bool}, 'no-zero-orange': {'type': bool} }, { 'karabo-da': { 'type': list, 'choices': [ 'AGIPD00', 'AGIPD01', 'AGIPD02', 'AGIPD03', 'AGIPD04', 'AGIPD05', 'AGIPD06', 'AGIPD07', 'AGIPD08', 'AGIPD09', 'AGIPD10', 'AGIPD11', 'AGIPD12', 'AGIPD13', 'AGIPD14', 'AGIPD15' ], 'msg': 'Choices: [AGIPD00 ... AGIPD15]. ' } } ] # args_1 rel_gain == True args_1 = { "karabo_id": "SPB_DET_AGIPD1M-1", "proposal": 000000, "cycle": 000000, "correct": True, "apply": False, "webservice_port": "tcp://max-exfl-cal001:5555", "instrument": None, "force_hg_if_below": None, "rel_gain": True, "xray_gain": None, "blc_noise": None, "blc_set_min": None, "blc_stripes": None, "zero_nans": None, "zero_orange": None, "max_pulses": None, "no_rel_gain": None, "no_xray_gain": None, "no_blc_noise": None, "no_blc_set_min": None, "no_blc_stripes": None, "no_zero_nans": None, "no_zero_orange": None, "karabo_da": None, } # args_2 args_2 = dict(args_1) args_2["no_rel_gain"] = True def test_add_available_configs_to_arg_parser(): """Test creating available configuration dictionary with update booleans.""" available_conf = _add_available_configs_to_arg_parser( karabo_id="SPB_DET_AGIPD1M-1", action="correct") assert available_conf == EXPECTED_CONF expected_1 = {'correct': {'SPB': {'SPB_DET_AGIPD1M-1': {'rel-gain': True}}}} param_1 = ("SPB", args_1, expected_1) expected_2 = {'correct': {'CALLAB': {'SPB_DET_AGIPD1M-1': {'rel-gain': False}}}} # noqa param_2 = ("CALLAB", args_2, expected_2) @pytest.mark.parametrize("instrument,args,expected", [param_1, param_2]) def test_create_new_config_from_args_input(instrument, args, expected): """Test update config creation from input arguments.""" new_conf = _create_new_config_from_args_input( instrument=instrument, args=args, available_conf=EXPECTED_CONF, ) assert new_conf == expected def test_find_cycle(tmp_path): proposal_path = tmp_path / 'CALLAB' / '202301' / 'p002003' proposal_path.mkdir(parents=True, exist_ok=True) assert _find_cycle('2003', tmp_path) == '202301' assert _find_cycle('002003', tmp_path) == '202301' with pytest.raises(ValueError): # Not existing proposal. _find_cycle('2004', tmp_path) with pytest.raises(ValueError): # Not a number. _find_cycle('p2004', tmp_path)