in groups/arm/ggd/servo/servode.py [0:0]
def bulk_read(self, read_blocks):
    """
    Read one register from each of several servos in a single bulk
    transaction.

    :param read_blocks: a dict whose "blocks" entry is a list of dicts
        describing which register to read from which servo.
        { "blocks": [
            { "servo_id": sid, "register": register },
            { "servo_id": sid, "register": register },
            ...
          ]
        }
    :return: a new dict of the same shape, now with values included. All
        blocks share one timestamp, taken right after the bulk
        transaction completes.
        { "blocks": [
            { "servo_id": sid, "register": register,
              "value": value, "ts": timestamp },
            { "servo_id": sid, "register": register,
              "value": value, "ts": timestamp },
            ...
          ]
        }
        An empty "blocks" list yields {"blocks": []} without any bus
        traffic.
    :raises NotImplementedError: when the servo type is AX_12_TYPE and the
        protocol version equals PROTOCOL_V.
    :raises KeyError: when a block lacks "servo_id"/"register" or names a
        register that is not in dxl_control. Raised before any SDK call.
    :raises IOError: when a read parameter cannot be added to the group or
        a requested value is not available after the transaction.
    """
    if self.servo_type == AX_12_TYPE and \
            self.protocol_version == PROTOCOL_V:
        raise NotImplementedError("AX-12 Servos do not support bulk_read.")

    requested = read_blocks['blocks']
    if not requested:
        # Nothing to read: skip allocating a group and sending an empty
        # packet; the result is the same empty response.
        return {"blocks": []}

    # Resolve every block up front so a malformed request fails before
    # anything is registered with the SDK, and so the control-table lookup
    # happens once per block instead of once per pass.
    params = []
    for block in requested:
        sid = block['servo_id']
        register = block['register']
        control = dxl_control[register]
        params.append(
            (sid, register, control['address'], control['comm_bytes'])
        )

    # NOTE(review): the group handle obtained here is never released or
    # cleared in this method -- confirm whether the SDK binding expects a
    # clear-param call once the read is finished.
    group_num = groupBulkRead(self.port_num, self.protocol_version)
    log.info("[bulk_read] read group_num:{0}".format(group_num))

    # Register each (servo, address, length) with the bulk_read group.
    for sid, _register, address, length in params:
        if groupBulkReadAddParam(group_num, sid, address, length) != 1:
            err = "[bulk_read] add read param fail on servo_id:{0}".format(
                sid
            )
            log.error(err)
            raise IOError(err)

    groupBulkReadTxRxPacket(group_num)
    ts = datetime.datetime.now().isoformat()
    last_result = getLastTxRxResult(self.port_num,
                                    self.protocol_version)
    if last_result != COMM_SUCCESS:
        printTxRxResult(self.protocol_version, last_result)
        # Also record the failure in the log; previously it only reached
        # stdout. Execution continues so the availability check below
        # decides whether the data is usable.
        log.error(
            "[bulk_read] tx/rx failed with result:{0}".format(last_result)
        )

    # Verify every requested value arrived before reading any of them.
    for sid, _register, address, length in params:
        if groupBulkReadIsAvailable(group_num, sid, address, length) != 1:
            err = "[bulk_read] bulk_read not available servo_id:{0}".format(
                sid
            )
            log.error(err)
            raise IOError(err)

    return {
        "blocks": [
            {
                "servo_id": sid, "register": register,
                "value": groupBulkReadGetData(
                    group_num, sid, address, length
                ),
                "ts": ts
            }
            for sid, register, address, length in params
        ]
    }