diff --git a/devices.py b/devices.py new file mode 100644 index 0000000000000000000000000000000000000000..07f8734a97c48fd0630fdde367b26a95e4fe5827 --- /dev/null +++ b/devices.py @@ -0,0 +1,15 @@ +from .ebpfcat import Device, FastSyncGroup, TerminalVar, DeviceVar + + +class AnalogInput(Device): + value = DeviceVar() + data = TerminalVar() + + def __init__(self, data): + self.data = data + + def program(self): + self.value = self.data + + def update(self): + self.value = self.data diff --git a/ebpfcat.py b/ebpfcat.py index fdc00c442f5d9769bc88b328633c7b5f5b28c9f4..10ae6a185b6efaa9e4dfaef1aaebe2075445d8f4 100644 --- a/ebpfcat.py +++ b/ebpfcat.py @@ -1,9 +1,257 @@ -from asyncio import ensure_future, sleep -from struct import unpack +from asyncio import ensure_future, gather, sleep +from struct import pack, unpack, calcsize from time import time -from .xdp import set_link_xdp_fd -from .ebpf import EBPF -from .bpf import ProgType, create_map, update_elem, prog_test_run, lookup_elem +from .arraymap import ArrayMap, ArrayGlobalVarDesc +from .ethercat import ECCmd, EtherCat, Packet, Terminal +from .ebpf import FuncId, MemoryDesc, SubProgram +from .xdp import XDP +from .hashmap import HashMap +from .bpf import ( + ProgType, MapType, create_map, update_elem, prog_test_run, lookup_elem) + + +class PacketDesc: + def __init__(self, position, size): + self.position = position + self.size = size + + def __get__(self, instance, owner): + if instance is None: + return self + return PacketVar(instance, self) + + +class PacketVar: + def __init__(self, terminal, desc): + self.terminal = terminal + self.desc = desc + + +class TerminalVar(MemoryDesc): + base_register = 9 + + def __init__(self): + super().__init__(bits=16) + + def __set__(self, instance, value): + if isinstance(value, PacketVar): + self.terminal = value.terminal + self.position = value.desc.position + instance.__dict__[self.name] = value + else: + super().__set__(instance.sync_group, value) + + def __get__(self, instance, owner): + if instance is None: + return self + pv = instance.__dict__.get(self.name) + if pv is None: + return None + elif instance.sync_group.current_data is None: + return super().__get__(instance.sync_group, owner) + else: + data = instance.sync_group.current_data + start = self.terminal.bases[self.position[0]] + self.position[1] + fmt = "<" + pv.desc.size + return unpack(fmt, data[start:start+calcsize(fmt)])[0] + + def __set_name__(self, name, owner): + self.name = name + + def addr(self, instance): + # 14 is Ethernet header + return self.terminal.bases[self.position[0]] + self.position[1] + 14 + + +class DeviceVar(ArrayGlobalVarDesc): + def __init__(self, size=4, signed=False): + super().__init__(FastSyncGroup.properties, size, signed) + + def __get__(self, instance, owner): + if instance is None: + return self + elif instance.sync_group.current_data is None: + return super().__get__(instance, owner) + else: + return instance.__dict__[self.name] + + def __set__(self, instance, value): + if instance.sync_group.current_data is None: + super().__set__(instance, value) + else: + instance.__dict__[self.name] = value + + +class Device(SubProgram): + def get_terminals(self): + ret = set() + for pv in self.__dict__.values(): + if isinstance(pv, PacketVar): + ret.add(pv.terminal) + return ret + + +class EBPFTerminal(Terminal): + compatibility = None + + def __init_subclass__(cls): + cls.pdo = {} + for c in cls.__mro__[::-1]: + for k, v in c.__dict__.items(): + if isinstance(v, PacketDesc): + cls.pdo[k] = v + + async def initialize(self, relative, absolute): + await super().initialize(relative, absolute) + if (self.compatibility is not None and + (self.vendorId, self.productCode) not in self.compatibility): + raise RuntimeError("Incompatible Terminal") + + def allocate(self, packet): + if self.pdo_in_sz: + self.bases = [packet.size + packet.DATAGRAM_HEADER] + packet.append(ECCmd.FPRD, b"\0" * self.pdo_in_sz, 0, + self.position, self.pdo_in_off) + else: + self.bases = [None] + if self.pdo_out_sz: + self.bases.append(packet.size + packet.DATAGRAM_HEADER) + packet.append(ECCmd.FPWR, b"\0" * self.pdo_out_sz, 0, + self.position, self.pdo_out_off) + + def update(self, data): + pass + + +class EBPFCat(XDP): + vars = HashMap() + + count = vars.globalVar() + ptype = vars.globalVar() + + def program(self): + #with self.If(self.packet16[12] != 0xA488): + # self.exit(2) + self.count += 1 + #self.ptype = self.packet32[18] + self.exit(2) + + +class EtherXDP(XDP): + license = "GPL" + + variables = HashMap() + count = variables.globalVar() + allcount = variables.globalVar() + + def program(self): + e = self + with e.packetSize > 24 as p, e.If(p.H[12] == 0xA488), \ + e.If(p.B[16] == 0): + e.count += 1 + e.r2 = e.get_fd(self.programs) + e.r3 = p.W[18] + e.call(FuncId.tail_call) + e.allcount += 1 + e.exit(2) + + +class FastEtherCat(EtherCat): + MAX_PROGS = 64 + + def __init__(self, network, terminals): + super().__init__(network) + self.terminals = terminals + for t in terminals: + t.ec = self + self.programs = create_map(MapType.PROG_ARRAY, 4, 4, self.MAX_PROGS) + self.sync_groups = {} + + def register_sync_group(self, sg): + index = len(self.sync_groups) + while index in self.sync_groups: + index = (index + 1) % self.MAX_PROGS + fd, _ = sg.load(log_level=1) + update_elem(self.programs, pack("<I", index), pack("<I", fd), 0) + self.sync_groups[index] = sg + return index + + async def scan_bus(self): + await gather(*[t.initialize(-i, i + 1) + for (i, t) in enumerate(self.terminals)]) + + async def connect(self): + await super().connect() + self.ebpf = EtherXDP() + self.ebpf.programs = self.programs + self.fd = await self.ebpf.attach(self.addr[0]) + + +class SyncGroup: + packet_index = 1000 + + current_data = False # None is used to indicate FastSyncGroup + + def __init__(self, ec, devices, **kwargs): + self.ec = ec + self.devices = devices + + self.terminals = set() + for dev in self.devices: + self.terminals.update(dev.get_terminals()) + dev.sync_group = self + + async def run(self): + await gather(*[t.to_operational() for t in self.terminals]) + while True: + self.ec.send_packet(self.asm_packet) + self.current_data = await self.ec.receive_index(self.packet_index) + for dev in self.devices: + dev.update() + + def start(self): + self.packet = Packet() + for term in self.terminals: + term.allocate(self.packet) + print(self.packet) + self.packet_index = SyncGroup.packet_index + SyncGroup.packet_index += 1 + self.asm_packet = self.packet.assemble(self.packet_index) + ensure_future(self.run()) + + +class FastSyncGroup(XDP): + license = "GPL" + + current_data = None + + properties = ArrayMap() + + def __init__(self, ec, devices, **kwargs): + super().__init__(subprograms=devices, **kwargs) + self.ec = ec + self.devices = devices + + self.terminals = set() + for dev in self.devices: + self.terminals.update(dev.get_terminals()) + dev.sync_group = self + + def program(self): + with self.packetSize >= self.packet.size + 14 as p: + for dev in self.devices: + dev.program() + self.exit(3) + + def start(self): + self.packet = Packet() + for term in self.terminals: + term.allocate(self.packet) + index = self.ec.register_sync_group(self) + self.ec.send_packet(self.packet.assemble(index)) + self.monitor = ensure_future(gather(*[t.to_operational() + for t in self.terminals])) + def script(): fd = create_map(MapType.HASH, 4, 4, 7) diff --git a/ethercat.py b/ethercat.py index 802b9bccde2482942e7f9d6b5452bccff1d99eab..791c33d88e8caf1a6131784bc22f9d4cb547a917 100644 --- a/ethercat.py +++ b/ethercat.py @@ -1,5 +1,6 @@ from asyncio import ensure_future, Event, Future, gather, get_event_loop, Protocol, Queue from enum import Enum +from random import randint from socket import socket, AF_PACKET, SOCK_DGRAM from struct import pack, unpack, calcsize @@ -115,24 +116,42 @@ def datasize(args, data): class Packet: MAXSIZE = 1000 # maximum size we use for an EtherCAT packet + ETHERNET_HEADER = 14 + DATAGRAM_HEADER = 10 + DATAGRAM_TAIL = 2 def __init__(self): self.data = [] - self.size = 2 + self.size = 14 def append(self, cmd, data, idx, *address): self.data.append((cmd, data, idx) + address) - self.size += len(data) + 12 + self.size += len(data) + self.DATAGRAM_HEADER + self.DATAGRAM_TAIL - def assemble(self): - ret = [pack("<H", self.size | 0x1000)] + def assemble(self, index): + ret = [pack("<HBBiHHH", self.size | 0x1000, 0, 0, index, 1 << 15, 0, 0)] for i, (cmd, data, *dgram) in enumerate(self.data, start=1): - ret.append(pack("<BBhHHH" if len(dgram) == 3 else "<BBhIH", + ret.append(pack("<BBhHHH" if len(dgram) == 3 else "<BBiHH", cmd.value, *dgram, len(data) | ((i < len(self.data)) << 15), 0)) ret.append(data) ret.append(b"\0\0") - return b"".join(ret) + return b''.join(ret) + + def __str__(self): + ret = "\n".join(f"{cmd} {data} {idx} {addr}" + for cmd, data, idx, *addr in self.data) + return "Packet([" + ret + "]" + + def disassemble(self, data): + pos = 14 + self.DATAGRAM_HEADER + ret = [] + for cmd, bits, *dgram in self.data: + ret.append((data[pos-self.DATAGRAM_HEADER], + data[pos:pos+len(bits)], + unpack("<H", data[pos+len(bits):pos+len(bits)+2])[0])) + pos += self.DATAGRAM_HEADER + self.DATAGRAM_TAIL + return ''.join(f"{i}: {c} {f} {d}\n" for i, (c, d, f) in enumerate(ret)) def full(self): return self.size > self.MAXSIZE @@ -145,28 +164,49 @@ class AsyncBase: return ret -class EtherCat(Protocol, AsyncBase): - async def __init__(self, network): +class EtherCat(Protocol): + def __init__(self, network): self.addr = (network, 0x88A4, 0, 0, b"\xff\xff\xff\xff\xff\xff") self.send_queue = Queue() - self.idle = Event() + self.wait_futures = {} + + async def connect(self): await get_event_loop().create_datagram_endpoint( lambda: self, family=AF_PACKET, proto=0xA488) async def sendloop(self): packet = Packet() + dgrams = [] while True: *dgram, future = await self.send_queue.get() lastsize = packet.size packet.append(*dgram) - self.dgrams.append((lastsize + 10, packet.size - 2, future)) + dgrams.append((lastsize + 10, packet.size - 2, future)) if packet.full() or self.send_queue.empty(): - self.idle.clear() - self.transport.sendto(packet.assemble(), self.addr) - await self.idle.wait() - assert len(self.dgrams) == 0 + data = await self.roundtrip_packet(packet) + for start, stop, future in dgrams: + future.set_result(data[start:stop]) + dgrams = [] packet = Packet() + async def roundtrip_packet(self, packet): + index = randint(2000, 1000000000) + while index in self.wait_futures: + index = randint(2000, 1000000000) + self.send_packet(packet.assemble(index)) + return await self.receive_index(index) + + async def receive_index(self, index): + future = Future() + self.wait_futures[index] = future + try: + return await future + finally: + del self.wait_futures[index] + + def send_packet(self, packet): + self.transport.sendto(packet, self.addr) + async def roundtrip(self, cmd, pos, offset, *args, data=None, idx=0): future = Future() fmt = "<" + "".join(arg for arg in args[:-1] if isinstance(arg, str)) @@ -192,25 +232,20 @@ class EtherCat(Protocol, AsyncBase): def connection_made(self, transport): transport.get_extra_info("socket").bind(self.addr) self.transport = transport - self.dgrams = [] - self.idle.set() ensure_future(self.sendloop()) def datagram_received(self, data, addr): - for start, stop, future in self.dgrams: - future.set_result(data[start:stop]) - self.dgrams = [] - self.idle.set() + index, = unpack("<I", data[4:8]) + self.wait_futures[index].set_result(data) class Terminal: - def __init__(self, ethercat): - self.ec = ethercat - async def initialize(self, relative, absolute): await self.ec.roundtrip(ECCmd.APWR, relative, 0x10, "H", absolute) self.position = absolute + await self.set_state(0x11) + await self.set_state(1) async def read_eeprom(no, fmt): return unpack(fmt, await self.eeprom_read_one(no)) @@ -269,9 +304,13 @@ class Terminal: ret, = await self.ec.roundtrip(ECCmd.FPRD, self.position, 0x0130, "H") return ret + async def get_state(self): + ret, = await self.ec.roundtrip(ECCmd.FPRD, self.position, 0x0130, "H") + return ret + async def to_operational(self): """try to bring the terminal to operational state""" - order = [1, 2, 4] #, 8] + order = [1, 2, 4, 8] ret, error = await self.ec.roundtrip( ECCmd.FPRD, self.position, 0x0130, "H2xH") if ret & 0x10: diff --git a/terminals.py b/terminals.py new file mode 100644 index 0000000000000000000000000000000000000000..4ed4df38e58aefe39b78209ee7b5ad9fc3cead96 --- /dev/null +++ b/terminals.py @@ -0,0 +1,16 @@ +from .ebpfcat import EBPFTerminal, PacketDesc + + +class Generic(EBPFTerminal): + pass + + +class EL3164(EBPFTerminal): + ch1_attrs = PacketDesc((0, 0), 'H') + ch2_attrs = PacketDesc((0, 4), 'H') + ch3_attrs = PacketDesc((0, 8), 'H') + ch4_attrs = PacketDesc((0, 12), 'H') + ch1_value = PacketDesc((0, 2), 'H') + ch2_value = PacketDesc((0, 6), 'H') + ch3_value = PacketDesc((0, 10), 'H') + ch4_value = PacketDesc((0, 14), 'H') diff --git a/testsystem.py b/testsystem.py new file mode 100644 index 0000000000000000000000000000000000000000..9772f021c1e2b0c8bea88ab368e7d7dd642050b1 --- /dev/null +++ b/testsystem.py @@ -0,0 +1,38 @@ +from asyncio import gather, sleep, ensure_future +from .terminals import EL3164, Generic +from .devices import AnalogInput +from .ebpfcat import FastEtherCat, FastSyncGroup, SyncGroup + +tdigi = Generic() +tout = Generic() +tin = EL3164() + +ec = FastEtherCat("eth0", [tdigi, tin, tout]) + + +async def monitor(ec): + while True: + print("M", ec.ebpf.count, ec.ebpf.allcount, await tin.get_state()) + await sleep(0.1) + + +async def main(): + await ec.connect() + await ec.scan_bus() + #ensure_future(monitor(ec)) + + ai = AnalogInput(tin.ch1_value) + fsg = FastSyncGroup(ec, [ai]) + #fsg = SyncGroup(ec, [ai]) + + fsg.start() + + for i in range(10): + await sleep(0.1) + fsg.properties.read() + print(i, ai.value, ec.ebpf.count, ec.ebpf.allcount) + +if __name__ == "__main__": + from asyncio import get_event_loop + loop = get_event_loop() + loop.run_until_complete(main()) diff --git a/xdp.py b/xdp.py index 0a9753a8e695a5433cc72a6d982e9c279c1077cc..138bc12e3ee12a2530b3d8f346eaa55904d0f987 100644 --- a/xdp.py +++ b/xdp.py @@ -121,7 +121,7 @@ class XDP(EBPF): async def attach(self, network): ifindex = if_nametoindex(network) - fd = self.load() + fd, _ = self.load(log_level=1) future = Future() transport, proto = await get_event_loop().create_datagram_endpoint( lambda: XDRFD(ifindex, fd, future),