diff --git a/ebpfcat/ebpfcat.py b/ebpfcat/ebpfcat.py index fb852d1acfbfb82f249cecb3cc29c60d43ca4045..05f4e22edddba367953fba7d88968293b44e5f5a 100644 --- a/ebpfcat/ebpfcat.py +++ b/ebpfcat/ebpfcat.py @@ -16,7 +16,8 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """The high-level API for EtherCAT loops""" -from asyncio import ensure_future, gather, sleep, wait_for, TimeoutError +from asyncio import ( + CancelledError, ensure_future, gather, sleep, wait_for, TimeoutError) from collections import defaultdict from contextlib import asynccontextmanager, AsyncExitStack, contextmanager import os @@ -454,18 +455,25 @@ class SyncGroupBase: async def run(self): data = self.asm_packet async with self.map_fmmu(): - ensure_future(self.to_operational()) - while True: - self.ec.send_packet(data) + task = ensure_future(self.to_operational()) + try: + while True: + self.ec.send_packet(data) + try: + data = await wait_for( + self.ec.receive_index(self.packet_index), + timeout=0.1) + except TimeoutError: + self.missed_counter += 1 + print("didn't receive in time", self.missed_counter) + continue + data = self.update_devices(data) + finally: + task.cancel() try: - data = await wait_for( - self.ec.receive_index(self.packet_index), - timeout=0.1) - except TimeoutError: - self.missed_counter += 1 - print("didn't receive in time", self.missed_counter) - continue - data = self.update_devices(data) + await task # should be done quickly, just here to not forget + except CancelledError: + pass def allocate(self): self.packet = SterilePacket() @@ -494,20 +502,15 @@ class SyncGroup(SyncGroupBase): return self.current_data async def to_operational(self): - try: - r = await gather(*[t.to_operational() for t in self.terminals]) - - while True: - for t in self.terminals: - state, error = await t.get_state() - if state != 8: # operational - print(f"ERROR AL register {error}") - await t.to_operational() - await sleep(1) - except Exception: - import traceback - traceback.print_exc() - raise + r = await gather(*[t.to_operational() for t in self.terminals]) + + while True: + for t in self.terminals: + state, error = await t.get_state() + if state != 8: # operational + print(f"ERROR AL register {error}") + await t.to_operational() + await sleep(1) def start(self): self.allocate() diff --git a/ebpfcat/ethercat_test.py b/ebpfcat/ethercat_test.py index c2fb898e5505fea7c48d30341da99be6a5cc4c26..64e1b6bd756ae1ffa31eaf63e84d3709576a77d3 100644 --- a/ebpfcat/ethercat_test.py +++ b/ebpfcat/ethercat_test.py @@ -49,7 +49,7 @@ class MockEtherCat(MockEtherCatBase): if data is not None: args += data, if not self.expected: - self.test.fail(f"missing {args}") + self.test.fail(f"unexpected {args}") self.test.assertEqual(args, self.expected.pop(0)) return self.results.pop(0) @@ -131,6 +131,7 @@ class Tests(TestCase): async def test_input(self): ec = MockEtherCat(self) ti = mockTerminal(ec, EL3164) + ti.use_fmmu = False terms = [Skip(ec), ti] ec.expected = [ (ECCmd.FPWR, 2, 0x800, 0x80), (ECCmd.FPWR, 2, 0x800, H('00108000260001018010800022000102' @@ -161,6 +162,7 @@ class Tests(TestCase): # in datagram "04000400801110000000123456780000000000000000000000000000" "3333"), # padding + (ECCmd.FPRD, 2, 304, 'H2xH'), # get_state 0x66554433, # index ] ec.results = [ @@ -169,6 +171,7 @@ class Tests(TestCase): # in datagram "04000400801110000000123456780000000000000000000000000000" "3333"), # padding + (8, 0), # return state 8, no error H("2a10" # EtherCAT Header, length & type "0000334455660280000000000000" # ID datagram # in datagram @@ -176,6 +179,7 @@ class Tests(TestCase): "3333"), # padding ] await self.new_data() + self.assertFalse(ec.expected or ec.results) self.assertEqual(ai.value, 0x7856) self.task.cancel() with self.assertRaises(CancelledError): @@ -185,6 +189,7 @@ class Tests(TestCase): async def test_output(self): ec = MockEtherCat(self) ti = mockTerminal(ec, EL4104) + ti.use_fmmu = False terms = [Skip(ec), Skip(ec), ti] ec.expected = [ (ECCmd.FPWR, 3, 0x800, 0x80), @@ -204,31 +209,30 @@ class Tests(TestCase): SyncGroup.packet_index = 0x55443322 sg = SyncGroup(ec, [ao]) self.task = sg.start() + self.assertFalse(ec.expected or ec.results) ec.expected = [ H("2210" # EtherCAT Header, length & type "0000223344550280000000000000" # ID datagram "0500030000110800000000000000000000000000" # out datagram "33333333333333333333"), # padding 0x55443322, # index - ] - ec.results = [ - (8, 0), # return state 8, no error - ] - ao.value = 0x9876 - await self.new_data() - ec.expected = [ H("2210" # EtherCAT Header, length & type "0000223344550280000000000000" # ID datagram - "0500070000110800000076980000000000000000" # out datagram + "0500030000110800000076980000000000000000" # out datagram "33333333333333333333"), # padding + (ECCmd.FPRD, 3, 304, 'H2xH'), # get_state 0x55443322, # index ] ec.results = [ H("2210" # EtherCAT Header, length & type "0000223344550280000000000000" # ID datagram - "0500070000110800000076980000000000000000" # out datagram + "0500030000110800000000000000000000000000" # out datagram "33333333333333333333"), # padding + (8, 0), # return state 8, no error ] + ao.value = 0x9876 + await self.new_data() + self.assertFalse(ec.expected or ec.results) self.task.cancel() with self.assertRaises(CancelledError): await self.task @@ -251,14 +255,17 @@ class Tests(TestCase): d = D() - terms = [td, ti, to] ec = MockEtherCat(self) ec.expected = None ti = mockTerminal(ec, EL3164) to = mockTerminal(ec, EL4104) td = mockTerminal(ec, EK1814) + ti.use_fmmu = False + to.use_fmmu = False + td.use_fmmu = False + terms = [td, ti, to] await gather(*[t.initialize(-i, i + 1) for (i, t) in enumerate(terms)]) d.ai = ti.channel1.value @@ -280,6 +287,7 @@ class Tests(TestCase): ] self.task = sg.start() await self.new_data() + self.assertFalse(ec.expected or ec.results) self.assertEqual(ec.rsg, sg) sg.program() self.maxDiff = None