Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • teichman/ebpfcat
1 result
Show changes
Commits on Source (2)
...@@ -467,7 +467,7 @@ class SterilePacket(Packet): ...@@ -467,7 +467,7 @@ class SterilePacket(Packet):
def append(self, cmd, *args, counter=1): def append(self, cmd, *args, counter=1):
super().append(cmd, *args) super().append(cmd, *args)
self.counters[self.size - 2] = counter self.counters[self.size - 2] = {counter}
def sterile(self, index): def sterile(self, index):
ret = bytearray(self.assemble(index)) ret = bytearray(self.assemble(index))
...@@ -560,6 +560,9 @@ class SyncGroupBase: ...@@ -560,6 +560,9 @@ class SyncGroupBase:
continue continue
data = self.update_devices(data) data = self.update_devices(data)
newtime = monotonic() newtime = monotonic()
if newtime - lasttime > self.cycletime:
logging.warning('cycletime exceeded (%f ms)',
(newtime - lasttime) * 1000)
await sleep(self.cycletime - (newtime - lasttime)) await sleep(self.cycletime - (newtime - lasttime))
lasttime = monotonic() lasttime = monotonic()
finally: finally:
...@@ -594,11 +597,12 @@ class SyncGroup(SyncGroupBase): ...@@ -594,11 +597,12 @@ class SyncGroup(SyncGroupBase):
def update_devices(self, data): def update_devices(self, data):
self.current_data = bytearray(data) self.current_data = bytearray(data)
for pos, count in self.packet.counters.items(): for pos, counts in self.packet.counters.items():
if data[pos] != count: if data[pos] not in counts:
logging.warning( logging.warning(
'EtherCAT datagram was processe %i times, should be %i', 'EtherCAT datagram processed %i times, should be in %s',
data[pos], count) data[pos], counts)
counts.add(data[pos])
self.current_data[pos] = 0 self.current_data[pos] = 0
for dev in self.devices: for dev in self.devices:
dev.update() dev.update()
......
...@@ -24,6 +24,7 @@ class Serial(Device): ...@@ -24,6 +24,7 @@ class Serial(Device):
receive_request = TerminalVar() receive_request = TerminalVar()
init_accept = TerminalVar() init_accept = TerminalVar()
in_string = TerminalVar() in_string = TerminalVar()
transmit_request = TerminalVar() transmit_request = TerminalVar()
receive_accept = TerminalVar() receive_accept = TerminalVar()
init_request = TerminalVar() init_request = TerminalVar()
...@@ -40,78 +41,65 @@ class Serial(Device): ...@@ -40,78 +41,65 @@ class Serial(Device):
self.out_string = channel.out_string self.out_string = channel.out_string
self.buffer = Queue() self.buffer = Queue()
self.data_arrived = Event()
def write(self, data): def write(self, data):
self.buffer.put_nowait(data) if data:
self.buffer.put_nowait(data)
connected = None
async def connect(self): async def connect(self):
connected = Future() self.connected = Future()
self.task = ensure_future(self.run(connected))
self.reader = StreamReader() self.reader = StreamReader()
await connected await self.connected
return self.reader, self return self.reader, self
async def run(self, connected): last_transmit_request = False
while not self.init_accept: last_receive_accept = False
self.init_request = True current_transmit = None
await self.data_arrived.wait() remainder = b''
self.init_request = False
while self.init_accept:
await self.data_arrived.wait()
connected.set_result(None)
await gather(self.receive(), self.transmit())
async def receive(self):
ra = self.receive_accept
while True:
rr = self.receive_request
while rr == self.receive_request:
self.receive_accept = ra
await self.data_arrived.wait()
self.reader.feed_data(self.in_string)
ra = not ra
async def transmit(self):
remainder = b""
async def inner():
nonlocal remainder
s = remainder
size = len(remainder)
while True:
remainder = s[22-size:]
yield s[:22-size]
size += len(s)
if (self.buffer.empty() and size > 0) or size > 22:
return
s = await self.buffer.get()
while True:
ta = self.transmit_accept
tr = self.transmit_request
chunk = b"".join([s async for s in inner()])
while ta == self.transmit_accept:
self.out_string = chunk
self.transmit_request = not tr
await self.data_arrived.wait()
def update(self): def update(self):
self.data_arrived.set() self.init_request = False
self.data_arrived.clear() if self.connected is None:
return
def get_chunk(self): if not self.connected.done():
def inner(): if self.init_accept:
size = 0 self.connected.set_result(None)
while size < 22 and len(self.buffer): self.last_transmit_accept = self.transmit_accept
s = self.buffer.popleft() self.last_receive_request = self.receive_request
if size + len(s) > 22: else:
self.buffer.appendleft(s[22-size:]) self.init_request = True
yield s[:22-size] return
if self.last_receive_request != self.receive_request:
self.reader.feed_data(self.in_string)
self.last_receive_accept = not self.last_receive_accept
self.receive_accept = self.last_receive_accept
self.last_receive_request = self.receive_request
if self.last_transmit_accept != self.transmit_accept:
self.current_transmit = None
self.last_transmit_accept = self.transmit_accept
if self.current_transmit is None \
and (self.remainder or not self.buffer.empty()):
n = len(self.remainder)
ret = [self.remainder]
while not self.buffer.empty():
nxt = self.buffer.get_nowait()
n += len(nxt)
if n > 22:
ret.append(nxt[:22-n])
self.remainder = nxt[22-n:]
break
else: else:
yield ret.append(nxt)
l += len(s) else:
return b"".join(inner()) self.remainder = b''
self.current_transmit = b''.join(ret)
self.last_transmit_request = not self.last_transmit_request
self.transmit_request = self.last_transmit_request
if self.current_transmit is not None:
self.out_string = self.current_transmit