def __sync_offsets()

in datahub/client/consumer/offset_manager.py [0:0]


    def __sync_offsets(self):
        for shard_id, request_queue in self._offset_request_queue_map.items():
            request = None
            while len(request_queue) > 0 and request_queue[0].is_ready():
                request = request_queue[0]
                request_queue.popleft()

            if request:
                meta = self._offset_meta_map.get(shard_id)
                if not meta:
                    self._logger.warning("OffsetMeta not found. key:{}, shard_id:{}".format(self._uniq_key, shard_id))
                    raise DatahubException("OffsetMeta not found")
                consume_offset = request.message_key.offset
                self._last_offset_map[shard_id] = OffsetWithBatchIndex(
                    consume_offset.sequence,
                    consume_offset.timestamp,
                    meta.version_id,
                    meta.session_id,
                    consume_offset.batch_index
                )
                self._logger.debug("Sync offset once success. key: {}, shard_id: {}".format(self._uniq_key, shard_id))
            else:
                if len(request_queue) > 0:      # 最先入队列的Request依然没有Ready
                    curr_timeout = int(time.time())
                    diff = curr_timeout - request_queue[0].timestamp
                    if diff > Constant.NOT_ACK_WARNING_TIMEOUT:
                        self._logger.warning("Record not ack for {} s. key:{}, shard_id:{}, currTs:{}, offset:{}"
                                             .format(diff, self._uniq_key, shard_id, curr_timeout, request_queue[0].message_key.to_string()))
                        if diff > Constant.NOT_ACK_WARNING_TIMEOUT * 10:
                            self._coordinator.on_offset_not_ack()