From ab4756dc8cd303dbb4ee6da145ee4bef02311389 Mon Sep 17 00:00:00 2001
From: Martin Teichmann <martin.teichmann@gmail.com>
Date: Sat, 4 Feb 2023 14:29:10 +0000
Subject: [PATCH] allow cyclic updates also for fast ethercat

even when using fast ethercat, some information may be perfectly
valid slow as well. This way we have the best of both worlds.
---
 ebpfcat/devices.py |   7 ++-
 ebpfcat/ebpfcat.py | 114 ++++++++++++++++++++++++++-------------------
 2 files changed, 72 insertions(+), 49 deletions(-)

diff --git a/ebpfcat/devices.py b/ebpfcat/devices.py
index c61fcb2..faec3e0 100644
--- a/ebpfcat/devices.py
+++ b/ebpfcat/devices.py
@@ -40,11 +40,16 @@ class AnalogInput(Device):
         self.data = data
 
     def program(self):
-        self.value = self.data
+        # it does not make much sense to copy data faster than
+        # we can process
+        return
 
     def update(self):
         self.value = self.data
 
+    def fast_update(self):
+        self.value = self.data
+
 
 class AnalogOutput(Device):
     """Generic analog output device
diff --git a/ebpfcat/ebpfcat.py b/ebpfcat/ebpfcat.py
index df21ca4..570f621 100644
--- a/ebpfcat/ebpfcat.py
+++ b/ebpfcat/ebpfcat.py
@@ -16,7 +16,7 @@
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 
 """The high-level API for EtherCAT loops"""
-from asyncio import ensure_future, gather, sleep
+from asyncio import ensure_future, gather, sleep, wait_for, TimeoutError
 from struct import pack, unpack, calcsize, pack_into, unpack_from
 from time import time
 from .arraymap import ArrayMap, ArrayGlobalVarDesc
@@ -201,13 +201,13 @@ class DeviceVar(ArrayGlobalVarDesc):
     def __get__(self, instance, owner):
         if instance is None:
             return self
-        elif instance.sync_group.current_data is None:
+        elif isinstance(instance.sync_group, FastSyncGroup):
             return super().__get__(instance, owner)
         else:
             return instance.__dict__.get(self.name, 0)
 
     def __set__(self, instance, value):
-        if instance.sync_group.current_data is None:
+        if isinstance(instance.sync_group, FastSyncGroup):
             super().__set__(instance, value)
         else:
             instance.__dict__[self.name] = value
@@ -226,6 +226,9 @@ class Device(SubProgram):
                 ret.add(pv.terminal)
         return ret
 
+    def fast_update(self):
+        pass
+
 
 class EBPFTerminal(Terminal):
     compatibility = None
@@ -272,35 +275,42 @@ class EtherXDP(XDP):
     license = "GPL"
 
     variables = ArrayMap()
+    dropcounter = variables.globalVar("I")
     counters = variables.globalVar("64I")
 
     rate = 0
 
     def program(self):
+        ETHERTYPE = 12
+        CMD0 = 16
+        IDX0 = 17
+        ADDR0 = 18
+
         with prandom(self.ebpf) & 0xffff < self.rate:
             self.dropcounter += 1
             self.ebpf.exit(XDPExitCode.DROP)
-        with self.packetSize > 24 as p, p.pH[12] == 0xA488, p.pB[16] == 0:
-            self.r3 = p.pI[18]
+        with self.packetSize > 24 as p, p.pH[ETHERTYPE] == 0xA488, \
+                p.pB[CMD0] == 0:
+            self.r3 = p.pI[ADDR0]  # use r3 for tail_call
             with self.counters.get_address(None, False, False) as (dst, _), \
                     self.r3 < FastEtherCat.MAX_PROGS:
-                self.mH[self.r[dst] + 4 * self.r3] += 1
-                p.pB[17] += 2
-                with p.pB[17] & 1 as is_regular:
-                    self.mB[self.r[dst] + 4 * self.r3 + 3] += 1
-                    self.mB[self.r[dst] + 4 * self.r3 + 2] = 0
-                with is_regular.Else():
-                    self.mB[self.r[dst] + 4 * self.r3 + 2] += 1
-                    self.mB[self.r[dst] + 4 * self.r3 + 3] = 0
-                    with self.mB[self.r[dst] + 4 * self.r3 + 2] > 3 as exceed:
-                        p.pB[17] += 1  # turn into regular package
-                    with exceed.Else():
+                self.r[dst] += 4 * self.r3
+                self.r4 = self.mB[self.r[dst]]
+                # we lost a packet
+                with p.pB[IDX0] == self.r4 as cond:
+                    self.mI[self.r[dst]] += 1 + (self.r4 & 1)
+                # normal case: two packets on the wire
+                with cond.Else(), ((p.pB[IDX0] + 1 & 0xff) == self.r4) \
+                                  | (p.pB[IDX0] == 0) as c2:
+                    self.mI[self.r[dst]] += 1
+                    with self.r4 & 1:  # last one was active
+                        p.pB[IDX0] = self.mB[self.r[dst]]
                         self.exit(XDPExitCode.TX)
-            self.r2 = self.get_fd(self.programs)
-            for i, o in enumerate(self.opcodes):
-                print(i, o)
-            self.call(FuncId.tail_call)
-
+                with c2.Else():
+                    self.exit(XDPExitCode.PASS)
+                p.pB[IDX0] = self.mB[self.r[dst]]
+                self.r2 = self.get_fd(self.programs)
+                self.call(FuncId.tail_call)
         self.exit(XDPExitCode.PASS)
 
 
@@ -330,31 +340,18 @@ class FastEtherCat(SimpleEtherCat):
         fd, _ = sg.load(log_level=1)
         update_elem(self.programs, pack("<I", index), pack("<I", fd), 0)
         self.sync_groups[index] = sg
-        sg.assembled = packet.assemble(index)
         return index
 
-    async def watchdog(self):
-        lastcounts = [0] * 64
-        while True:
-            t0 = time()
-            counts = self.ebpf.counters
-            for i, sg in self.sync_groups.items():
-                if ((counts[i] ^ lastcounts[i]) & 0xffff == 0
-                        or (counts[i] >> 24) > 3):
-                    self.send_packet(sg.assembled)
-                    print("sent", i)
-                lastcounts[i] = counts[i]
-            await sleep(0.001)
-
     async def connect(self):
         await super().connect()
         self.ebpf = EtherXDP()
         self.ebpf.programs = self.programs
         self.fd = await self.ebpf.attach(self.addr[0])
-        ensure_future(self.watchdog())
 
 
 class SyncGroupBase:
+    missed_counter = 0
+
     def __init__(self, ec, devices, **kwargs):
         super().__init__(**kwargs)
         self.ec = ec
@@ -368,23 +365,31 @@ class SyncGroupBase:
         self.terminals = {t: None for t in
                           sorted(terminals, key=lambda t: t.position)}
 
+    async def run(self):
+        data = self.asm_packet
+        while True:
+            self.ec.send_packet(data)
+            try:
+                data = await wait_for(self.ec.receive_index(self.packet_index),
+                                      timeout=0.01)
+            except TimeoutError:
+                self.missed_counter += 1
+                print("didn't receive in time", self.missed_counter)
+                continue
+            data = self.update_devices(data)
+            await sleep(0)
+
 
 class SyncGroup(SyncGroupBase):
     """A group of devices communicating at the same time"""
 
     packet_index = 1000
 
-    current_data = False  # None is used to indicate FastSyncGroup
-
-    async def run(self):
-        self.current_data = self.asm_packet
-        while True:
-            self.ec.send_packet(self.current_data)
-            data = await self.ec.receive_index(self.packet_index)
-            self.current_data = bytearray(data)
-            for dev in self.devices:
-                dev.update()
-            await sleep(0)
+    def update_devices(self, data):
+        self.current_data = bytearray(data)
+        for dev in self.devices:
+            dev.update()
+        return self.current_data
 
     def start(self):
         self.allocate()
@@ -422,9 +427,22 @@ class FastSyncGroup(SyncGroupBase, XDP):
                 print("JMP")
         self.exit(XDPExitCode.TX)
 
+    async def run(self):
+        self.ec.send_packet(self.asm_packet)
+        self.ec.send_packet(self.asm_packet)
+        await super().run()
+
+    def update_devices(self, data):
+        self.current_data = data
+        for dev in self.devices:
+            dev.fast_update()
+        return self.asm_packet
+
     def start(self):
         self.allocate()
-        self.ec.register_sync_group(self, self.packet)
+        self.packet_index = self.ec.register_sync_group(self, self.packet)
+        self.asm_packet = self.packet.assemble(self.packet_index)
+        ensure_future(self.run())
 
     def allocate(self):
         self.packet = Packet()
-- 
GitLab