Skip to content
Snippets Groups Projects
Commit d74a2f29 authored by Martin Teichmann's avatar Martin Teichmann
Browse files

factor out a read_object_entry

this way we can read individual object entries without reading
the entire list.
parent 100a0f2d
No related branches found
No related tags found
No related merge requests found
......@@ -65,6 +65,7 @@ class ECDataType(Enum):
obj._value_ = value
obj.fmt = fmt
return obj
INVALID = 0, None
BOOLEAN = 0x1, "?"
INTEGER8 = 0x2, "b"
INTEGER16 = 0x3, "h"
......@@ -166,11 +167,12 @@ class ObjectDescription:
class ObjectEntry:
name = None
def __init__(self, desc):
self.desc = desc
def __init__(self, terminal, index):
self.terminal = terminal
self.index = index
async def read(self):
ret = await self.desc.terminal.sdo_read(self.desc.index, self.valueInfo)
ret = await self.terminal.sdo_read(self.index, self.valueInfo)
if self.dataType in (ECDataType.VISIBLE_STRING,
ECDataType.UNICODE_STRING):
return ret.decode("utf8")
......@@ -188,8 +190,7 @@ class ObjectEntry:
else:
d = pack("<" + self.dataType.fmt, data)
return await self.desc.terminal.sdo_write(d, self.desc.index,
self.valueInfo)
return await self.terminal.sdo_write(d, self.index, self.valueInfo)
def __repr__(self):
if self.name is None:
......@@ -871,6 +872,21 @@ class Terminal:
raise EtherCatError(f"requested index {index}")
toggle ^= 0x10
async def read_object_entry(self, index, subidx):
"""read a object entry from the CoE self description"""
data = await self.coe_request(CoECmd.SDOINFO, ODCmd.OE_REQ,
"HBB", index, subidx, 7)
oe = ObjectEntry(self, index)
oe.valueInfo, dataType, oe.bitLength, oe.objectAccess = \
unpack_from("<BxHHH", data)
assert subidx == oe.valueInfo
if dataType < 2048:
oe.dataType = ECDataType(dataType)
else:
oe.dataType = dataType
oe.name = data[8:].decode("utf8")
return oe
async def read_ODlist(self):
idxes = await self.coe_request(CoECmd.SDOINFO, ODCmd.LIST_REQ, "H", 1)
idxes = unpack("<" + "H" * int(len(idxes) // 2), idxes)
......@@ -893,22 +909,12 @@ class Terminal:
od.entries = {}
for i in range(1 if od.maxSub > 0 else 0, od.maxSub + 1):
try:
data = await self.coe_request(CoECmd.SDOINFO, ODCmd.OE_REQ,
"HBB", od.index, i, 7)
oe = await self.read_object_entry(od.index, i)
except EtherCatError as e:
logging.info(f"problems reading SDO {od.index:x}:{i:x}:")
continue
oe = ObjectEntry(od)
oe.valueInfo, dataType, oe.bitLength, oe.objectAccess = \
unpack("<BxHHH", data[:8])
if dataType == 0:
if oe.dataType is ECDataType.INVALID:
continue
assert i == oe.valueInfo
if dataType < 2048:
oe.dataType = ECDataType(dataType)
else:
oe.dataType = dataType
oe.name = data[8:].decode("utf8")
od.entries[i] = oe
return ret
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment