Skip to content
Snippets Groups Projects
Commit 46cccb83 authored by Martin Teichmann's avatar Martin Teichmann
Browse files

add proper command line tools

parent 4a2ff553
No related branches found
No related tags found
2 merge requests!8Draft: move the index field to the end of packet,!6support higher-level EtherCat features
......@@ -128,8 +128,13 @@ class ObjectDescription:
def __getitem__(self, idx):
return self.entries[idx]
def __repr__(self):
return " ".join(f"[{k:X}: {v}]" for k, v in self.entries.items())
class ObjectEntry:
name = None
def __init__(self, desc):
self.desc = desc
......@@ -155,6 +160,13 @@ class ObjectEntry:
return await self.desc.terminal.sdo_write(d, self.desc.index,
self.valueInfo)
def __repr__(self):
if self.name is None:
return "[unread ObjectEntry]"
return f'"{self.name}" {self.dataType}:{self.bitLength} ' \
f'{self.objectAccess:X}'
def datasize(args, data):
out = calcsize("<" + "".join(arg for arg in args if isinstance(arg, str)))
......@@ -221,11 +233,11 @@ class Packet:
pos = 14 + self.DATAGRAM_HEADER
ret = []
for cmd, bits, *dgram in self.data:
ret.append((data[pos-self.DATAGRAM_HEADER],
data[pos:pos+len(bits)],
ret.append(unpack("<Bxh6x", data[pos-self.DATAGRAM_HEADER:pos])
+ (data[pos:pos+len(bits)],
unpack("<H", data[pos+len(bits):pos+len(bits)+2])[0]))
pos += self.DATAGRAM_HEADER + self.DATAGRAM_TAIL
return ''.join(f"{i}: {c} {f} {d}\n" for i, (c, d, f) in enumerate(ret))
return ''.join(f"{i}: {c} {a} {f} {d}\n" for i, (c, a, d, f) in enumerate(ret))
def full(self):
"""Is the data limit reached?"""
......@@ -322,6 +334,7 @@ class EtherCat(Protocol):
out += b"\0" * data
elif data is not None:
out += data
assert isinstance(pos, int) and isinstance(offset, int)
self.send_queue.put_nowait((cmd, out, idx, pos, offset, future))
ret = await future
if data is None:
......@@ -333,6 +346,14 @@ class EtherCat(Protocol):
else:
return ret
async def count(self):
"""Count the number of terminals on the bus"""
p = Packet()
p.append(ECCmd.APRD, b"\0\0", 0, 0, 0x10)
ret = await self.roundtrip_packet(p)
no, = unpack("<h", ret[16:18]) # number of terminals
return no
def connection_made(self, transport):
"""start the send loop once the connection is made"""
transport.get_extra_info("socket").bind(self.addr)
......@@ -471,7 +492,7 @@ class Terminal:
start, *args, **kwargs))
async def write(self, start, *args, **kwargs):
"""write data from the terminal at offset `start`
"""write data to the terminal at offset `start`
see `EtherCat.roundtrip` for details on more parameters"""
return (await self.ec.roundtrip(ECCmd.FPWR, self.position,
......@@ -689,42 +710,3 @@ class Terminal:
oe.name = data[8:].decode("utf8")
od.entries[i] = oe
return ret
async def main():
from .ebpfcat import install_ebpf
from .bpf import lookup_elem
ec = await EtherCat("eth0")
map_fd = await install_ebpf("eth0")
tin = Terminal(ec)
tout = Terminal(ec)
tdigi = Terminal(ec)
await gather(
tin.initialize(-1, 5),
tout.initialize(-2, 6),
tdigi.initialize(0, 22),
)
print("tin")
await tin.to_operational(),
print("tout")
await tout.to_operational(),
odlist = await tin.read_ODlist()
for o in odlist:
print(hex(o.index), o.name, o.maxSub)
for i, p in o.entries.items():
print(" ", i, p.name, "|", p.dataType, p.bitLength, p.objectAccess)
#sdo = await tin.sdo_read(o.index, i)
sdo = await p.read()
print(" ", sdo)
print("tdigi")
print("bla", lookup_elem(map_fd, b"AAAA", 4))
await tdigi.to_operational(),
print(tout.eeprom[10])
print(await tout.write(0x1100, "HHHH", 10000, 20000, 30000, 40000))
print(await tin.read(0x1180, "HHHHHHHH"))
if __name__ == "__main__":
loop = get_event_loop()
loop.run_until_complete(main())
from argparse import ArgumentParser
import asyncio
from functools import wraps
from struct import unpack
import sys
from .ethercat import EtherCat, Terminal, ECCmd
def entrypoint(func):
@wraps(func)
def wrapper():
asyncio.run(func())
return wrapper
@entrypoint
async def scanbus():
ec = EtherCat(sys.argv[1])
await ec.connect()
no = await ec.count()
for i in range(no):
r, = await ec.roundtrip(ECCmd.APRD, -i, 0x10, "H", 44)
print(i, r)
@entrypoint
async def info():
parser = ArgumentParser(
prog = "ec-info",
description = "Retrieve information from an EtherCat bus")
parser.add_argument("interface")
parser.add_argument("-t", "--terminal", type=int)
parser.add_argument("-i", "--ids", action="store_true")
parser.add_argument("-n", "--names", action="store_true")
parser.add_argument("-s", "--sdo", action="store_true")
parser.add_argument("-v", "--values", action="store_true")
args = parser.parse_args()
ec = EtherCat(args.interface)
await ec.connect()
if args.terminal is None:
terminals = range(await ec.count())
else:
# print(await ec.roundtrip(ECCmd.FPRW, 7, 0x10, "H", 0))
terminals = [args.terminal]
terms = [Terminal() for t in terminals]
for t in terms:
t.ec = ec
await asyncio.gather(*(t.initialize(-i, i + 7)
for i, t in zip(terminals, terms)))
for i, t in enumerate(terms):
print(f"terminal no {i}")
if args.ids:
print(f"{t.vendorId:X}:{t.productCode:X} "
f"revision {t.revisionNo:X} serial {t.serialNo}")
if args.names:
infos = t.eeprom[10]
i = 1
while i < len(infos):
print(infos[i+1 : i+infos[i]+1].decode("ascii"))
i += infos[i] + 1
if args.sdo:
await t.to_operational()
ret = await t.read_ODlist()
for k, v in ret.items():
print(f"{k:X}:")
for kk, vv in v.entries.items():
print(f" {kk:X}: {vv}")
if args.values:
r = await vv.read()
if isinstance(r, int):
print(f" {r:10} {r:8X}")
else:
print(f" {r}")
[project]
name = "ebpfcat"
version = "0.0.1"
dependencies = []
[project.scripts]
ec-scanbus = "ebpfcat.scripts:scanbus"
ec-info = "ebpfcat.scripts:info"
import setuptools
setuptools.setup(
packages=setuptools.find_packages()
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment