in datahub/client/consumer/offset_manager.py [0:0]
def __sync_offsets(self):
for shard_id, request_queue in self._offset_request_queue_map.items():
request = None
while len(request_queue) > 0 and request_queue[0].is_ready():
request = request_queue[0]
request_queue.popleft()
if request:
meta = self._offset_meta_map.get(shard_id)
if not meta:
self._logger.warning("OffsetMeta not found. key:{}, shard_id:{}".format(self._uniq_key, shard_id))
raise DatahubException("OffsetMeta not found")
consume_offset = request.message_key.offset
self._last_offset_map[shard_id] = OffsetWithBatchIndex(
consume_offset.sequence,
consume_offset.timestamp,
meta.version_id,
meta.session_id,
consume_offset.batch_index
)
self._logger.debug("Sync offset once success. key: {}, shard_id: {}".format(self._uniq_key, shard_id))
else:
if len(request_queue) > 0: # 最先入队列的Request依然没有Ready
curr_timeout = int(time.time())
diff = curr_timeout - request_queue[0].timestamp
if diff > Constant.NOT_ACK_WARNING_TIMEOUT:
self._logger.warning("Record not ack for {} s. key:{}, shard_id:{}, currTs:{}, offset:{}"
.format(diff, self._uniq_key, shard_id, curr_timeout, request_queue[0].message_key.to_string()))
if diff > Constant.NOT_ACK_WARNING_TIMEOUT * 10:
self._coordinator.on_offset_not_ack()