in datahub/client/consumer/offset_coordinator.py [0:0]
def init_and_get_offset(self, shard_ids):
    """Initialise the subscription offsets for ``shard_ids`` and return them.

    Calls ``init_and_get_subscription_offset`` on the datahub client with
    this object's project name, topic name and subscription id, converts
    every returned offset into a ``ConsumeOffset``, hands the resulting map
    to ``self._offset_manager.set_offset_meta`` and returns that same map.

    Args:
        shard_ids: Shard ids, forwarded unchanged to the client call.

    Returns:
        dict: Maps each shard id in the client's result to the
        ``ConsumeOffset`` built for it.

    Raises:
        DatahubException: Re-raised after a warning has been logged.
    """
    datahub_client = self._meta_data.datahub_client
    try:
        response = datahub_client.init_and_get_subscription_offset(
            self._project_name, self._topic_name, self._sub_id, shard_ids
        )

        offsets_by_shard = {}
        for shard_id, sub_offset in response.offsets.items():
            # A negative sequence is passed through untouched; any other
            # value is advanced by one.
            # NOTE(review): presumably negative means "nothing committed yet"
            # and the stored sequence is the last consumed record, so
            # consumption resumes at the following one - confirm against the
            # service contract.
            if sub_offset.sequence < 0:
                start_sequence = sub_offset.sequence
            else:
                start_sequence = sub_offset.sequence + 1

            offsets_by_shard[shard_id] = ConsumeOffset(
                sequence=start_sequence,
                timestamp=sub_offset.timestamp,
                batch_index=sub_offset.batch_index,
                version_id=sub_offset.version,
                session_id=sub_offset.session_id,
            )
            # The log line reports the offset as returned by the client, not
            # the adjusted ConsumeOffset stored in the map.
            self._logger.info(
                "Init and get offset once success. key: {}, shard_id: {}, offset: {}".format(
                    self._uniq_key, shard_id, sub_offset
                )
            )

        self._offset_manager.set_offset_meta(offsets_by_shard)
        return offsets_by_shard
    except DatahubException as exc:
        # Log for diagnosis, then let the caller handle the failure.
        self._logger.warning(
            "Init and get subscription offset fail. key: {}, {}".format(
                self._uniq_key, exc
            )
        )
        raise exc