From ee21ca65c7a05756e0162e9c221458bd9a63e3b2 Mon Sep 17 00:00:00 2001 From: Martin Teichmann <martin.teichmann@xfel.eu> Date: Tue, 2 Jan 2024 11:17:00 +0000 Subject: [PATCH] don't wait for a response before sending next until now we only had one asynchronous packet on the wire. This allows to send them as fast as possible, waiting for them in bulk. --- ebpfcat/ethercat.py | 59 ++++++++++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/ebpfcat/ethercat.py b/ebpfcat/ethercat.py index 397cd56..647e4e6 100644 --- a/ebpfcat/ethercat.py +++ b/ebpfcat/ethercat.py @@ -27,8 +27,8 @@ Low-level access to EtherCAT this modules contains the code to actually talk to EtherCAT terminals. """ from asyncio import ( - ensure_future, Event, Future, gather, get_event_loop, Protocol, Queue, - Lock) + CancelledError, ensure_future, Event, Future, gather, get_event_loop, + Protocol, Queue, Lock) from contextlib import asynccontextmanager from enum import Enum, IntEnum from itertools import count @@ -305,26 +305,41 @@ class EtherCat(Protocol): This method runs while we are connected, takes the datagrams to be sent from a queue, packs them in a packet and ships them out. """ - packet = Packet() - dgrams = [] - while True: - *dgram, future = await self.send_queue.get() - lastsize = packet.size - packet.append(*dgram) - dgrams.append((lastsize + 10, packet.size - 2, future)) - if packet.full() or self.send_queue.empty(): - data = await self.roundtrip_packet(packet) - for start, stop, future in dgrams: - wkc, = unpack("<H", data[stop:stop+2]) - if wkc == 0: - future.set_exception( - EtherCatError("datagram was not processed")) - elif not future.done(): - future.set_result(data[start:stop]) - else: - logging.info("future already done, dropped datagram") - dgrams = [] - packet = Packet() + try: + packet = Packet() + dgrams = [] + while True: + *dgram, future = await self.send_queue.get() + lastsize = packet.size + packet.append(*dgram) + dgrams.append((lastsize + 10, packet.size - 2, future)) + if packet.full() or self.send_queue.empty(): + ensure_future(self.process_packet(dgrams, packet)) + dgrams = [] + packet = Packet() + except CancelledError: + raise + except Exception: + logging.exception("sendloop failed") + raise + + async def process_packet(self, dgrams, packet): + try: + data = await self.roundtrip_packet(packet) + for start, stop, future in dgrams: + wkc, = unpack("<H", data[stop:stop+2]) + if wkc == 0: + future.set_exception( + EtherCatError("datagram was not processed")) + elif not future.done(): + future.set_result(data[start:stop]) + else: + logging.info("future already done, dropped datagram") + except CancelledError: + raise + except Exception: + logging.exception("process_packet failed") + raise async def roundtrip_packet(self, packet): """Send a packet and return the response -- GitLab