Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • teichman/ebpfcat
1 result
Show changes
Commits on Source (2)
......@@ -467,7 +467,7 @@ class SterilePacket(Packet):
def append(self, cmd, *args, counter=1):
super().append(cmd, *args)
self.counters[self.size - 2] = counter
self.counters[self.size - 2] = {counter}
def sterile(self, index):
ret = bytearray(self.assemble(index))
......@@ -560,6 +560,9 @@ class SyncGroupBase:
continue
data = self.update_devices(data)
newtime = monotonic()
if newtime - lasttime > self.cycletime:
logging.warning('cycletime exceeded (%f ms)',
(newtime - lasttime) * 1000)
await sleep(self.cycletime - (newtime - lasttime))
lasttime = monotonic()
finally:
......@@ -594,11 +597,12 @@ class SyncGroup(SyncGroupBase):
def update_devices(self, data):
self.current_data = bytearray(data)
for pos, count in self.packet.counters.items():
if data[pos] != count:
for pos, counts in self.packet.counters.items():
if data[pos] not in counts:
logging.warning(
'EtherCAT datagram was processe %i times, should be %i',
data[pos], count)
'EtherCAT datagram processed %i times, should be in %s',
data[pos], counts)
counts.add(data[pos])
self.current_data[pos] = 0
for dev in self.devices:
dev.update()
......
......@@ -24,6 +24,7 @@ class Serial(Device):
receive_request = TerminalVar()
init_accept = TerminalVar()
in_string = TerminalVar()
transmit_request = TerminalVar()
receive_accept = TerminalVar()
init_request = TerminalVar()
......@@ -40,78 +41,65 @@ class Serial(Device):
self.out_string = channel.out_string
self.buffer = Queue()
self.data_arrived = Event()
def write(self, data):
self.buffer.put_nowait(data)
if data:
self.buffer.put_nowait(data)
connected = None
async def connect(self):
connected = Future()
self.task = ensure_future(self.run(connected))
self.connected = Future()
self.reader = StreamReader()
await connected
await self.connected
return self.reader, self
async def run(self, connected):
while not self.init_accept:
self.init_request = True
await self.data_arrived.wait()
self.init_request = False
while self.init_accept:
await self.data_arrived.wait()
connected.set_result(None)
await gather(self.receive(), self.transmit())
async def receive(self):
ra = self.receive_accept
while True:
rr = self.receive_request
while rr == self.receive_request:
self.receive_accept = ra
await self.data_arrived.wait()
self.reader.feed_data(self.in_string)
ra = not ra
async def transmit(self):
remainder = b""
async def inner():
nonlocal remainder
s = remainder
size = len(remainder)
while True:
remainder = s[22-size:]
yield s[:22-size]
size += len(s)
if (self.buffer.empty() and size > 0) or size > 22:
return
s = await self.buffer.get()
while True:
ta = self.transmit_accept
tr = self.transmit_request
chunk = b"".join([s async for s in inner()])
while ta == self.transmit_accept:
self.out_string = chunk
self.transmit_request = not tr
await self.data_arrived.wait()
last_transmit_request = False
last_receive_accept = False
current_transmit = None
remainder = b''
def update(self):
self.data_arrived.set()
self.data_arrived.clear()
def get_chunk(self):
def inner():
size = 0
while size < 22 and len(self.buffer):
s = self.buffer.popleft()
if size + len(s) > 22:
self.buffer.appendleft(s[22-size:])
yield s[:22-size]
self.init_request = False
if self.connected is None:
return
if not self.connected.done():
if self.init_accept:
self.connected.set_result(None)
self.last_transmit_accept = self.transmit_accept
self.last_receive_request = self.receive_request
else:
self.init_request = True
return
if self.last_receive_request != self.receive_request:
self.reader.feed_data(self.in_string)
self.last_receive_accept = not self.last_receive_accept
self.receive_accept = self.last_receive_accept
self.last_receive_request = self.receive_request
if self.last_transmit_accept != self.transmit_accept:
self.current_transmit = None
self.last_transmit_accept = self.transmit_accept
if self.current_transmit is None \
and (self.remainder or not self.buffer.empty()):
n = len(self.remainder)
ret = [self.remainder]
while not self.buffer.empty():
nxt = self.buffer.get_nowait()
n += len(nxt)
if n > 22:
ret.append(nxt[:22-n])
self.remainder = nxt[22-n:]
break
else:
yield
l += len(s)
return b"".join(inner())
ret.append(nxt)
else:
self.remainder = b''
self.current_transmit = b''.join(ret)
self.last_transmit_request = not self.last_transmit_request
self.transmit_request = self.last_transmit_request
if self.current_transmit is not None:
self.out_string = self.current_transmit