Skip to content
Snippets Groups Projects

General overhaul

Merged Martin Teichmann requested to merge docs into master
1 file
+ 5
5
Compare changes
  • Side-by-side
  • Inline
+ 142
65
@@ -26,34 +26,41 @@ class ECCmd(Enum):
FRMW = 14 # Configured Read Multiple Write
class ECDatatype(Enum):
BOOLEAN = 0x1
INTEGER8 = 0x2
INTEGER16 = 0x3
INTEGER32 = 0x4
UNSIGNED8 = 0x5
UNSIGNED16 = 0x6
UNSIGNED32 = 0x7
REAL32 = 0x8
VISIBLE_STRING = 0x9
OCTET_STRING = 0xA
UNICODE_STRING = 0xB
TIME_OF_DAY = 0xC
TIME_DIFFERENCE = 0xD
DOMAIN = 0xF
INTEGER24 = 0x10
REAL64 = 0x11
INTEGER64 = 0x15
UNSIGNED24 = 0x16
UNSIGNED64 = 0x1B
BIT1 = 0x30
BIT2 = 0x31
BIT3 = 0x32
BIT4 = 0x33
BIT5 = 0x34
BIT6 = 0x35
BIT7 = 0x36
BIT8 = 0x37
class ECDataType(Enum):
def __new__(cls, value, fmt):
obj = object.__new__(cls)
obj._value_ = value
obj.fmt = fmt
return obj
BOOLEAN = 0x1, "?"
INTEGER8 = 0x2, "b"
INTEGER16 = 0x3, "h"
INTEGER32 = 0x4, "i"
UNSIGNED8 = 0x5, "B"
UNSIGNED16 = 0x6, "H"
UNSIGNED32 = 0x7, "I"
REAL32 = 0x8, "f"
VISIBLE_STRING = 0x9, None
OCTET_STRING = 0xA, None
UNICODE_STRING = 0xB, None
TIME_OF_DAY = 0xC, "I"
TIME_DIFFERENCE = 0xD, "i"
DOMAIN = 0xF, "i"
INTEGER24 = 0x10, "i"
REAL64 = 0x11, "d"
INTEGER64 = 0x15, "q"
UNSIGNED24 = 0x16, "i"
UNSIGNED64 = 0x1B, "Q"
BIT1 = 0x30, "B"
BIT2 = 0x31, "B"
BIT3 = 0x32, "B"
BIT4 = 0x33, "B"
BIT5 = 0x34, "B"
BIT6 = 0x35, "B"
BIT7 = 0x36, "B"
BIT8 = 0x37, "B"
UNKNOWN1 = 2048, None
UNKNOWN2 = 2049, None
class MBXType(Enum):
ERR = 0 # Error
@@ -74,15 +81,6 @@ class CoECmd(Enum):
RXPDO_RR = 7
SDOINFO = 8
class SDOCmd(Enum):
DOWN_INIT = 0x21
DOWN_EXP = 0x23
DOWN_INIT_CA = 0x31
UP_REQ = 0x40
UP_REQ_CA = 0x50
SEG_UP_REQ = 0x60
ABORT = 0x80
class ODCmd(Enum):
LIST_REQ = 1
LIST_RES = 2
@@ -102,11 +100,38 @@ class ODCmd(Enum):
class ObjectDescription:
pass
def __init__(self, terminal):
self.terminal = terminal
def __getitem__(self, idx):
return self.entries[idx]
class ObjectEntry:
pass
def __init__(self, desc):
self.desc = desc
async def read(self):
ret = await self.desc.terminal.sdo_read(self.desc.index, self.valueInfo)
if self.dataType in (ECDataType.VISIBLE_STRING,
ECDataType.UNICODE_STRING):
return ret.decode("utf8")
elif isinstance(self.dataType, int) or self.dataType.fmt is None:
return ret
else:
return unpack("<" + self.dataType.fmt, ret)[0]
async def write(self, data):
if self.dataType in (ECDataType.VISIBLE_STRING,
ECDataType.UNICODE_STRING):
d = data.encode("utf8")
elif isinstance(self.dataType, int) or self.dataType.fmt is None:
d = data
else:
d = pack("<" + self.dataType.fmt, data)
return await self.desc.terminal.sdo_write(d, self.desc.index,
self.valueInfo)
def datasize(args, data):
@@ -141,7 +166,7 @@ class Packet:
:type cmd: ECCmd
:param data: the data in the datagram
:param idx: the datagram index, unchanged by terminals
Depending on the command, one or two more parameters represent the
address, either terminal and offset for position or node addressing,
or one value for logical addressing."""
@@ -397,17 +422,16 @@ class Terminal:
ret, error = await self.ec.roundtrip(ECCmd.FPRD, self.position,
0x0130, "H2xH")
pos = order.index(ret)
s = 0x11
for state in order[pos:]:
s = 0x11
for state in order[pos+1:]:
await self.ec.roundtrip(ECCmd.FPWR, self.position,
0x0120, "H", state)
while s != state:
s, error = await self.ec.roundtrip(ECCmd.FPRD, self.position,
0x0130, "H2xH")
print('State', self.position, s, error)
if error != 0:
raise RuntimeError(f"AL register {error}")
async def get_error(self):
"""read the error register"""
return (await self.ec.roundtrip(ECCmd.FPRD, self.position,
@@ -542,36 +566,91 @@ class Terminal:
raise RuntimeError(f"expected {size} bytes, got {retsize}")
return b"".join(ret)
async def sdo_write(self, data, index, subindex=None):
if len(data) <= 4 and subindex is not None:
await self.mbx_send(
MBXType.COE, "HBHB4s", CoECmd.SDOREQ.value << 12,
ODCmd.DOWN_EXP.value | (((4 - len(data)) << 2) & 0xc),
index, subindex, data)
type, data = await self.mbx_recv()
if type is not MBXType.COE:
raise RuntimeError(f"expected CoE, got {type}")
coecmd, sdocmd, idx, subidx = unpack("<HBHB", data[:6])
if idx != index or subindex != subidx:
raise RuntimeError(f"requested index {index}, got {idx}")
if coecmd >> 12 != CoECmd.SDORES.value:
raise RuntimeError(f"expected CoE SDORES, got {coecmd>>12:x}")
else:
stop = min(len(data), self.mbx_out_sz - 16)
await self.mbx_send(
MBXType.COE, "HBHB4x", CoECmd.SDOREQ.value << 12,
ODCmd.DOWN_INIT_CA.value if subindex is None
else ODCmd.DOWN_INIT.value,
index, 1 if subindex is None else subindex,
data=data[:stop])
type, data = await self.mbx_recv()
if type is not MBXType.COE:
raise RuntimeError(f"expected CoE, got {type}")
coecmd, sdocmd, idx, subidx = unpack("<HBHB", data[:6])
if coecmd >> 12 != CoECmd.SDORES.value:
raise RuntimeError(f"expected CoE SDORES, got {coecmd>>12:x}")
if idx != index or subindex != subidx:
raise RuntimeError(f"requested index {index}, got {idx}")
toggle = 0
while stop < len(data):
start = stop
stop = min(len(data), start + self.mbx_out_sz - 9)
if stop == len(data):
if stop - start < 7:
cmd = 1 + (7-stop+start << 1)
d = data[start:stop] + b"\0" * (7 - stop + start)
else:
cmd = 1
d = data[start:stop]
await self.mbx_send(
MBXType.COE, "HBHB4x", CoECmd.SDOREQ.value << 12,
cmd + toggle, index,
1 if subindex is None else subindex, data=d)
type, data = await self.mbx_recv()
if type is not MBXType.COE:
raise RuntimeError(f"expected CoE, got {type}")
coecmd, sdocmd, idx, subidx = unpack("<HBHB", data[:6])
if coecmd >> 12 != CoECmd.SDORES.value:
raise RuntimeError(f"expected CoE SDORES")
if idx != index or subindex != subidx:
raise RuntimeError(f"requested index {index}")
toggle ^= 0x10
async def read_ODlist(self):
idxes = await self.coe_request(CoECmd.SDOINFO, ODCmd.LIST_REQ, "H", 1)
idxes = unpack("<" + "H" * int(len(idxes) // 2), idxes)
ret = []
ret = {}
for index in idxes:
data = await self.coe_request(CoECmd.SDOINFO, ODCmd.OD_REQ,
"H", index)
dtype, oc, ms = unpack("<HBB", data[:4])
dtype, ms, oc = unpack("<HBB", data[:4])
od = ObjectDescription()
od = ObjectDescription(self)
od.index = index
od.dataType = dtype # ECDataType(dtype)
od.maxSub = ms
od.name = data[4:].decode("utf8")
ret.append(od)
ret[od.index] = od
for od in ret:
for od in ret.values():
od.entries = {}
for i in range(od.maxSub):
try:
data = await self.coe_request(CoECmd.SDOINFO, ODCmd.OE_REQ,
"HBB", od.index, i, 7)
except RuntimeError:
# many OEs just do not have more description
continue
oe = ObjectEntry()
oe.valueInfo, oe.dataType, oe.bitLength, oe.objectAccess = \
for i in range(1 if od.maxSub > 0 else 0, od.maxSub + 1):
data = await self.coe_request(CoECmd.SDOINFO, ODCmd.OE_REQ,
"HBB", od.index, i, 7)
oe = ObjectEntry(od)
oe.valueInfo, dataType, oe.bitLength, oe.objectAccess = \
unpack("<HHHH", data[:8])
if dataType == 0:
continue
assert i == oe.valueInfo
oe.dataType = ECDataType(dataType)
oe.name = data[8:].decode("utf8")
od.entries[i] = oe
return ret
@@ -597,14 +676,12 @@ async def main():
await tout.to_operational(),
odlist = await tin.read_ODlist()
for o in odlist:
print(o.index, o.name)
print(hex(o.index), o.name, o.maxSub)
for i, p in o.entries.items():
print(" ", i, p.name, p.valueInfo, p.dataType, p.bitLength, p.objectAccess)
try:
sdo = await tin.sdo_read(o.index, i)
print(" ", sdo)
except RuntimeError as e:
print(" ", e)
print(" ", i, p.name, "|", p.dataType, p.bitLength, p.objectAccess)
#sdo = await tin.sdo_read(o.index, i)
sdo = await p.read()
print(" ", sdo)
print("tdigi")
print("bla", lookup_elem(map_fd, b"AAAA", 4))
await tdigi.to_operational(),
Loading