diff --git a/ebpfcat/serial.py b/ebpfcat/serial.py index 2044fd590cc76a1e32d9e2c779ad43164d107639..c0af99c7ebdf59261a4bd0907178badcd08135d4 100644 --- a/ebpfcat/serial.py +++ b/ebpfcat/serial.py @@ -24,6 +24,7 @@ class Serial(Device): receive_request = TerminalVar() init_accept = TerminalVar() in_string = TerminalVar() + transmit_request = TerminalVar() receive_accept = TerminalVar() init_request = TerminalVar() @@ -40,78 +41,65 @@ class Serial(Device): self.out_string = channel.out_string self.buffer = Queue() - self.data_arrived = Event() def write(self, data): - self.buffer.put_nowait(data) + if data: + self.buffer.put_nowait(data) + + connected = None async def connect(self): - connected = Future() - self.task = ensure_future(self.run(connected)) + self.connected = Future() self.reader = StreamReader() - await connected + await self.connected return self.reader, self - async def run(self, connected): - while not self.init_accept: - self.init_request = True - await self.data_arrived.wait() - self.init_request = False - while self.init_accept: - await self.data_arrived.wait() - - connected.set_result(None) - - await gather(self.receive(), self.transmit()) - - async def receive(self): - ra = self.receive_accept - while True: - rr = self.receive_request - while rr == self.receive_request: - self.receive_accept = ra - await self.data_arrived.wait() - self.reader.feed_data(self.in_string) - ra = not ra - - async def transmit(self): - remainder = b"" - - async def inner(): - nonlocal remainder - s = remainder - size = len(remainder) - - while True: - remainder = s[22-size:] - yield s[:22-size] - size += len(s) - if (self.buffer.empty() and size > 0) or size > 22: - return - s = await self.buffer.get() - - while True: - ta = self.transmit_accept - tr = self.transmit_request - chunk = b"".join([s async for s in inner()]) - while ta == self.transmit_accept: - self.out_string = chunk - self.transmit_request = not tr - await self.data_arrived.wait() + last_transmit_request = False + last_receive_accept = False + current_transmit = None + remainder = b'' def update(self): - self.data_arrived.set() - self.data_arrived.clear() - - def get_chunk(self): - def inner(): - size = 0 - while size < 22 and len(self.buffer): - s = self.buffer.popleft() - if size + len(s) > 22: - self.buffer.appendleft(s[22-size:]) - yield s[:22-size] + self.init_request = False + if self.connected is None: + return + if not self.connected.done(): + if self.init_accept: + self.connected.set_result(None) + self.last_transmit_accept = self.transmit_accept + self.last_receive_request = self.receive_request + else: + self.init_request = True + return + + if self.last_receive_request != self.receive_request: + self.reader.feed_data(self.in_string) + self.last_receive_accept = not self.last_receive_accept + self.receive_accept = self.last_receive_accept + self.last_receive_request = self.receive_request + + if self.last_transmit_accept != self.transmit_accept: + self.current_transmit = None + self.last_transmit_accept = self.transmit_accept + + if self.current_transmit is None \ + and (self.remainder or not self.buffer.empty()): + n = len(self.remainder) + ret = [self.remainder] + while not self.buffer.empty(): + nxt = self.buffer.get_nowait() + n += len(nxt) + if n > 22: + ret.append(nxt[:22-n]) + self.remainder = nxt[22-n:] + break else: - yield - l += len(s) - return b"".join(inner()) + ret.append(nxt) + else: + self.remainder = b'' + self.current_transmit = b''.join(ret) + self.last_transmit_request = not self.last_transmit_request + + self.transmit_request = self.last_transmit_request + if self.current_transmit is not None: + self.out_string = self.current_transmit