diff --git a/ebpfcat/ebpfcat.py b/ebpfcat/ebpfcat.py index 5218a6cfeffc2b5e4bd03e442c768a906220e924..61086004c1ee2338c4758cc2e41bb9ce7fa86cd1 100644 --- a/ebpfcat/ebpfcat.py +++ b/ebpfcat/ebpfcat.py @@ -17,6 +17,7 @@ """The high-level API for EtherCAT loops""" from asyncio import ensure_future, gather, sleep, wait_for, TimeoutError +from collections import defaultdict from contextlib import asynccontextmanager, contextmanager import os from struct import pack, unpack, calcsize, pack_into, unpack_from @@ -219,10 +220,15 @@ class Device(SubProgram): to serve a common goal. A terminal may be used by several devices. """ def get_terminals(self): - ret = set() + """return the terminals used by this device + + return a dictionary of terminal vs. a boolean indicating + whether access is read-write. + """ + ret = defaultdict(lambda: False) for pv in self.__dict__.values(): if isinstance(pv, (PacketVar, Struct)): - ret.add(pv.terminal) + ret[pv.terminal] |= pv.sm == 2 return ret def fast_update(self): @@ -246,7 +252,7 @@ class EBPFTerminal(Terminal): self.pdo_in_sz = int((inbits + 7) // 8) await self.write_pdo_sm() - def allocate(self, packet, readonly): + def allocate(self, packet, readwrite): """allocate space in packet for the pdos of this terminal return a dict that contains the starting offset for each @@ -256,15 +262,11 @@ class EBPFTerminal(Terminal): bases[3] = packet.size + packet.DATAGRAM_HEADER packet.append(ECCmd.FPRD, b"\0" * self.pdo_in_sz, 0, self.position, self.pdo_in_off) - if self.pdo_out_sz: + if readwrite and self.pdo_out_sz: bases[2] = packet.size + packet.DATAGRAM_HEADER - if readonly: - packet.on_the_fly.append((packet.size, ECCmd.FPWR)) - packet.append(ECCmd.NOP, b"\0" * self.pdo_out_sz, 0, - self.position, self.pdo_out_off) - else: - packet.append(ECCmd.FPWR, b"\0" * self.pdo_out_sz, 0, - self.position, self.pdo_out_off) + packet.on_the_fly.append((packet.size, ECCmd.FPWR)) + packet.append(ECCmd.FPWR, b"\0" * self.pdo_out_sz, 0, + self.position, self.pdo_out_off) return bases def update(self, data): @@ -316,7 +318,8 @@ class EtherXDP(XDP): self.exit(XDPExitCode.PASS) -class SimpleEtherCat(EtherCat): +class EtherCatBase: + # this class exists only to allow for testing def __init__(self, network, terminals): super().__init__(network) self.terminals = terminals @@ -325,7 +328,12 @@ class SimpleEtherCat(EtherCat): async def scan_bus(self): await gather(*[t.initialize(-i, i + 1) - for (i, t) in enumerate(self.terminals)]) + for (i, t) in enumerate(self.terminals)]) + + +class SimpleEtherCat(EtherCatBase, EtherCat): + pass + class FastEtherCat(SimpleEtherCat): MAX_PROGS = 64 @@ -378,13 +386,15 @@ class SyncGroupBase: self.ec = ec self.devices = devices - terminals = set() + terminals = defaultdict(lambda: False) for dev in self.devices: - terminals.update(dev.get_terminals()) + for t, rw in dev.get_terminals().items(): + terminals[t] |= rw dev.sync_group = self # sorting is only necessary for test stability - self.terminals = {t: None for t in - sorted(terminals, key=lambda t: t.position)} + self.terminals = {t: rw for t, rw in + sorted(terminals.items(), + key=lambda item: item[0].position)} async def to_operational(self): await gather(*[t.to_operational() for t in self.terminals]) @@ -395,13 +405,19 @@ class SyncGroupBase: self.ec.send_packet(data) try: data = await wait_for(self.ec.receive_index(self.packet_index), - timeout=0.01) + timeout=0.1) except TimeoutError: self.missed_counter += 1 print("didn't receive in time", self.missed_counter) continue data = self.update_devices(data) + def allocate(self): + self.packet = Packet() + self.packet.on_the_fly = [] + self.terminals = {t: t.allocate(self.packet, rw) + for t, rw in self.terminals.items()} + class SyncGroup(SyncGroupBase): """A group of devices communicating at the same time""" @@ -423,11 +439,6 @@ class SyncGroup(SyncGroupBase): ensure_future(self.to_operational()) return ret - def allocate(self): - self.packet = Packet() - self.terminals = {t: t.allocate(self.packet, False) - for t in self.terminals} - class FastSyncGroup(SyncGroupBase, XDP): license = "GPL" @@ -447,7 +458,11 @@ class FastSyncGroup(SyncGroupBase, XDP): async def run(self): with self.ec.register_sync_group(self) as self.packet_index: - self.asm_packet = self.packet.assemble(self.packet_index) + self.asm_packet = bytearray( + self.packet.assemble(self.packet_index)) + for pos, cmd in self.packet.on_the_fly: + self.asm_packet[pos] = ECCmd.NOP.value + # prime the pump: two packets to get things going self.ec.send_packet(self.asm_packet) self.ec.send_packet(self.asm_packet) await super().run() @@ -469,9 +484,3 @@ class FastSyncGroup(SyncGroupBase, XDP): def cancel(self): self.task.cancel() - - def allocate(self): - self.packet = Packet() - self.packet.on_the_fly = [] - self.terminals = {t: t.allocate(self.packet, True) - for t in self.terminals} diff --git a/ebpfcat/ethercat_test.py b/ebpfcat/ethercat_test.py index f091bc59438c2553f4c659616117c707c1ec8fe0..79fe8fe00090cf07f1670e9990c7ea13690d2f56 100644 --- a/ebpfcat/ethercat_test.py +++ b/ebpfcat/ethercat_test.py @@ -16,30 +16,36 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. from ast import literal_eval -from asyncio import CancelledError, Future, get_event_loop, sleep, gather +from asyncio import as_completed, CancelledError, Event, Future, get_event_loop, sleep, gather +from contextlib import contextmanager from functools import wraps from itertools import count from struct import pack from unittest import TestCase, main, skip from .devices import AnalogInput, AnalogOutput, Motor -from .terminals import EL4104, EL3164, EK1814 +from .terminals import EL4104, EL3164, EK1814, Skip from .ethercat import ECCmd, Terminal from .ebpfcat import ( - FastSyncGroup, SyncGroup, TerminalVar, Device, EBPFTerminal, PacketDesc) + FastSyncGroup, SyncGroup, TerminalVar, Device, EBPFTerminal, PacketDesc, + EtherCatBase) from .ebpf import Instruction, Opcode as O H = bytes.fromhex -class MockEtherCat: +class MockEtherCatBase: def __init__(self, test): self.test = test with open(__file__.rsplit("/", 1)[0] + "/testdata.py", "r") as fin: self.test_data = literal_eval(fin.read()) + +class MockEtherCat(EtherCatBase, MockEtherCatBase): async def roundtrip(self, *args, data=None): + if self.expected is None: + return if data is not None: args += data, if not self.expected: @@ -53,14 +59,15 @@ class MockEtherCat: async def receive_index(self, index): self.test.assertEqual(index, self.expected.pop(0)) if not self.expected: - self.test.future.cancel() - for i in range(10): - await sleep(0) + self.test.data_needed.set() + await sleep(0) return self.results.pop(0) - def register_sync_group(self, sg, packet): + @contextmanager + def register_sync_group(self, sg): self.rsg = sg - return 0x33 + yield 0x33 + return class MockTerminal(Terminal): @@ -70,6 +77,7 @@ class MockTerminal(Terminal): data = self.ec.test_data[-relative] self.test_eeprom = data["eeprom"] self.test_sdo = data["sdo"] + await self.apply_eeprom() async def to_operational(self, state=8): @@ -101,7 +109,9 @@ class MockTerminal(Terminal): def mockAsync(f): @wraps(f) def wrapper(self): + self.data_needed = Event() get_event_loop().run_until_complete(f(self)) + self.assertTrue(self.data_needed.is_set()) return wrapper @@ -112,24 +122,28 @@ def mockTerminal(cls): class Tests(TestCase): + async def new_data(self): + for f in as_completed((self.data_needed.wait(), self.task)): + await f + return + @mockAsync async def test_input(self): ti = mockTerminal(EL3164) - ec = MockEtherCat(self) - ti.ec = ec + ec = MockEtherCat(self, [Skip(), ti]) ec.expected = [ - (ECCmd.FPWR, 4, 0x800, 0x80), - (ECCmd.FPWR, 4, 0x800, H('00108000260001018010800022000102' + (ECCmd.FPWR, 2, 0x800, 0x80), + (ECCmd.FPWR, 2, 0x800, H('00108000260001018010800022000102' '00110000040000038011100020000104')), - (ECCmd.FPWR, 4, 2070, 'B', 0), # disable sync manager - (ECCmd.FPWR, 4, 2066, 'H', 0), # set sync manager size - (ECCmd.FPWR, 4, 2070, 'B', False), # disable 0-length sync manager - (ECCmd.FPWR, 4, 2078, 'B', 0), # disable other sync manager - (ECCmd.FPWR, 4, 2074, 'H', 16), # set sync manager size - (ECCmd.FPWR, 4, 2078, 'B', True), # enable sync manager + (ECCmd.FPWR, 2, 2070, 'B', 0), # disable sync manager + (ECCmd.FPWR, 2, 2066, 'H', 0), # set sync manager size + (ECCmd.FPWR, 2, 2070, 'B', False), # disable 0-length sync manager + (ECCmd.FPWR, 2, 2078, 'B', 0), # disable other sync manager + (ECCmd.FPWR, 2, 2074, 'H', 16), # set sync manager size + (ECCmd.FPWR, 2, 2078, 'B', True), # enable sync manager ] ec.results = [None, None, None, None, None, None, None, None] - await ti.initialize(-1, 4) + await ec.scan_bus() ai = AnalogInput(ti.channel1.value) SyncGroup.packet_index = 0x66554433 sg = SyncGroup(ec, [ai]) @@ -138,7 +152,7 @@ class Tests(TestCase): H("2a10" # EtherCAT Header, length & type "0000334455660280000000000000" # ID datagram # in datagram - "04000400801110000000000000000000000000000000000000000000" + "04000200801110000000000000000000000000000000000000000000" "3333"), # padding 0x66554433, # index H("2a10" # EtherCAT Header, length & type @@ -160,9 +174,7 @@ class Tests(TestCase): "04000400801110000000123456780000000000000000000000000000" "3333"), # padding ] - self.future = Future() - with self.assertRaises(CancelledError): - await gather(self.future, self.task) + await self.new_data() self.assertEqual(ai.value, 0x7856) self.task.cancel() with self.assertRaises(CancelledError): @@ -171,21 +183,20 @@ class Tests(TestCase): @mockAsync async def test_output(self): ti = mockTerminal(EL4104) - ec = MockEtherCat(self) - ti.ec = ec + ec = MockEtherCat(self, [Skip(), Skip(), ti]) ec.expected = [ - (ECCmd.FPWR, 7, 0x800, 0x80), - (ECCmd.FPWR, 7, 0x800, H('0010800026000101801080002200010' + (ECCmd.FPWR, 3, 0x800, 0x80), + (ECCmd.FPWR, 3, 0x800, H('0010800026000101801080002200010' '200110800240001038011000000000004')), - (ECCmd.FPWR, 7, 2070, 'B', 0), # disable sync manager - (ECCmd.FPWR, 7, 2066, 'H', 8), # set sync manager size - (ECCmd.FPWR, 7, 2070, 'B', True), # enable sync manager - (ECCmd.FPWR, 7, 2078, 'B', 0), # disable other sync manager - (ECCmd.FPWR, 7, 2074, 'H', 0), # set sync manager size - (ECCmd.FPWR, 7, 2078, 'B', False), # disable 0-length sync manager + (ECCmd.FPWR, 3, 2070, 'B', 0), # disable sync manager + (ECCmd.FPWR, 3, 2066, 'H', 8), # set sync manager size + (ECCmd.FPWR, 3, 2070, 'B', True), # enable sync manager + (ECCmd.FPWR, 3, 2078, 'B', 0), # disable other sync manager + (ECCmd.FPWR, 3, 2074, 'H', 0), # set sync manager size + (ECCmd.FPWR, 3, 2078, 'B', False), # disable 0-length sync manager ] ec.results = [None, None, None, None, None, None, None, None] - await ti.initialize(-2, 7) + await ec.scan_bus() ao = AnalogOutput(ti.ch1_value) SyncGroup.packet_index = 0x55443322 sg = SyncGroup(ec, [ao]) @@ -193,17 +204,15 @@ class Tests(TestCase): ec.expected = [ H("2210" # EtherCAT Header, length & type "0000223344550280000000000000" # ID datagram - "0500070000110800000000000000000000000000" # out datagram + "0500030000110800000000000000000000000000" # out datagram "33333333333333333333"), # padding 0x55443322, # index ] ec.results = [ (8, 0), # return state 8, no error ] - self.future = Future() ao.value = 0x9876 - with self.assertRaises(CancelledError): - await gather(self.future, self.task) + await self.new_data() ec.expected = [ H("2210" # EtherCAT Header, length & type "0000223344550280000000000000" # ID datagram @@ -217,15 +226,12 @@ class Tests(TestCase): "0500070000110800000076980000000000000000" # out datagram "33333333333333333333"), # padding ] - self.future = Future() - with self.assertRaises(CancelledError): - await gather(self.future, self.task) self.task.cancel() with self.assertRaises(CancelledError): await self.task - @skip - def test_ebpf(self): + @mockAsync + async def test_ebpf(self): ti = mockTerminal(EL3164) to = mockTerminal(EL4104) td = mockTerminal(EK1814) @@ -245,26 +251,30 @@ class Tests(TestCase): self.ao = self.ai d = D() + + ec = MockEtherCat(self, [td, ti, to]) + ec.expected = None + await ec.scan_bus() d.ai = ti.channel1.value d.ao = to.ch1_value - d.di = td.ch1 - d.do = td.ch5 - - ec = MockEtherCat(self) + d.di = td.channel1 + d.do = td.channel5 sg = FastSyncGroup(ec, [d]) ec.expected = [ - bytes.fromhex("4610" # EtherCAT Header, length & type - "000033000000008000000000" # ID datagram - "04004400777701800000000000" # digi in - "05004400888801800000000000" # digi out - "0500550078560280000000000000" # ana out - "04007700cdab04000000000000000000") # ana in + bytes.fromhex("5810" # EtherCAT Header, length & type + "0000330000000280000000000000" # ID datagram + "04000100001001800000000000" # digi in + "00000100010f01800000000000" # digi out + "04000200801110800000000000000000000" + "000000000000000000000" # ana in + "0000030000110800000000000000000000000000" # ana out + ) + ] * 3 + [51] + ec.results = [#ec.expected[0], ec.expected[0], ] - task = sg.start() + self.task = sg.start() + await self.new_data() self.assertEqual(ec.rsg, sg) - task.cancel() - with self.assertRaises(CancelledError): - get_event_loop().run_until_complete(task) sg.program() self.maxDiff = None self.assertEqual(sg.opcodes, [ @@ -356,9 +366,9 @@ class Tests(TestCase): @skip def test_motor(self): class T(EBPFTerminal): - v = PacketDesc((0, 2), "H") - e = PacketDesc((1, 0), "H") - q = PacketDesc((0, 0), 0) + v = PacketDesc(2, 2, "H") + e = PacketDesc(3, 0, "H") + q = PacketDesc(2, 0, 0) t = T() t.pdo_in_sz = 2 t.pdo_in_off = 0x1234 @@ -370,6 +380,7 @@ class Tests(TestCase): m.encoder = t.e m.low_switch = m.high_switch = t.q me = MockEtherCat(self) + t.ec = me me.expected = [ bytes.fromhex("2c10" "000033000000008000000000"