Skip to content
Snippets Groups Projects
Commit 0a452dc5 authored by Martin Teichmann's avatar Martin Teichmann
Browse files

don't wait for a response before sending next

until now we only had one asynchronous packet on the wire.
This allows to send them as fast as possible, waiting for them
in bulk.
parent 5f40f0d7
No related branches found
No related tags found
No related merge requests found
......@@ -27,8 +27,8 @@ Low-level access to EtherCAT
this modules contains the code to actually talk to EtherCAT terminals.
"""
from asyncio import (
ensure_future, Event, Future, gather, get_event_loop, Protocol, Queue,
Lock)
CancelledError, ensure_future, Event, Future, gather, get_event_loop,
Protocol, Queue, Lock)
from contextlib import asynccontextmanager
from enum import Enum, IntEnum
from itertools import count
......@@ -305,26 +305,41 @@ class EtherCat(Protocol):
This method runs while we are connected, takes the datagrams
to be sent from a queue, packs them in a packet and ships them
out. """
packet = Packet()
dgrams = []
while True:
*dgram, future = await self.send_queue.get()
lastsize = packet.size
packet.append(*dgram)
dgrams.append((lastsize + 10, packet.size - 2, future))
if packet.full() or self.send_queue.empty():
data = await self.roundtrip_packet(packet)
for start, stop, future in dgrams:
wkc, = unpack("<H", data[stop:stop+2])
if wkc == 0:
future.set_exception(
EtherCatError("datagram was not processed"))
elif not future.done():
future.set_result(data[start:stop])
else:
logging.info("future already done, dropped datagram")
dgrams = []
packet = Packet()
try:
packet = Packet()
dgrams = []
while True:
*dgram, future = await self.send_queue.get()
lastsize = packet.size
packet.append(*dgram)
dgrams.append((lastsize + 10, packet.size - 2, future))
if packet.full() or self.send_queue.empty():
ensure_future(self.process_packet(dgrams, packet))
dgrams = []
packet = Packet()
except CancelledError:
raise
except Exception:
logging.exception("sendloop failed")
raise
async def process_packet(self, dgrams, packet):
try:
data = await self.roundtrip_packet(packet)
for start, stop, future in dgrams:
wkc, = unpack("<H", data[stop:stop+2])
if wkc == 0:
future.set_exception(
EtherCatError("datagram was not processed"))
elif not future.done():
future.set_result(data[start:stop])
else:
logging.info("future already done, dropped datagram")
except CancelledError:
raise
except Exception:
logging.exception("process_packet failed")
raise
async def roundtrip_packet(self, packet):
"""Send a packet and return the response
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment