# datahub/client/consumer/offset_coordinator.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import atomic

from datahub.exceptions import DatahubException

from .offset_manager import OffsetManager
from ..common.offset_meta import ConsumeOffset
from ..common.shard_coordinator import ShardCoordinator


class OffsetCoordinator(ShardCoordinator):
    """Shard coordinator that also tracks and commits subscription offsets.

    Extends :class:`ShardCoordinator` with an :class:`OffsetManager` that
    persists consume offsets for the subscription identified by
    ``(project_name, topic_name, sub_id)``.  Subscription-level failure
    conditions (deleted / offline / session changed / offsets not acked)
    are latched via the ``on_*`` callbacks and surfaced as
    :class:`DatahubException` on the next :meth:`update_shard_info` call.

    NOTE(review): the ``on_*`` callbacks appear to be invoked from a
    different (heartbeat/IO) thread than the consumer loop — the plain
    boolean flags rely on assignment atomicity; confirm against callers.
    """

    def __init__(self, project_name, topic_name, sub_id, consumer_config):
        """Create the coordinator and its offset manager.

        :param project_name: DataHub project name.
        :param topic_name: DataHub topic name.
        :param sub_id: subscription id whose offsets are managed.
        :param consumer_config: config object providing ``auto_ack_offset``,
            ``max_record_buffer_size`` and ``fetch_limit``.
        """
        super().__init__(project_name, topic_name, sub_id, consumer_config)
        # Latched subscription-failure flags; once set they make
        # update_shard_info() raise until the consumer is rebuilt.
        self._sub_session_changed = False
        self._sub_offline = False
        self._sub_deleted = False
        self._offset_not_ack = False
        # CAS guard so only one offset-reset handler runs at a time.
        self._offset_reset = atomic.AtomicLong(0)
        self._auto_ack_offset = consumer_config.auto_ack_offset
        self._max_record_buffer_size = consumer_config.max_record_buffer_size
        self._fetch_limit = consumer_config.fetch_limit
        self._offset_manager = OffsetManager(self)

    def close(self):
        """Close the base coordinator, then the offset manager."""
        super().close()
        self._offset_manager.close()
        self._logger.info("OffsetCoordinator close success. key: %s", self._uniq_key)

    def update_shard_info(self):
        """Refresh shard info, failing fast on latched subscription errors.

        :raises DatahubException: if the subscription was deleted, its
            session changed, it went offline, or offsets were not acked
            in time.
        """
        if self._sub_deleted:
            raise DatahubException("Subscription has been deleted. key: {}".format(self._uniq_key))
        if self._sub_session_changed:
            raise DatahubException("Subscription session has changed. key: {}".format(self._uniq_key))
        if self._sub_offline:
            raise DatahubException("Subscription offline. key: {}".format(self._uniq_key))
        if self._offset_not_ack:
            raise DatahubException("Offset has not been updated for a long time. key: {}".format(self._uniq_key))
        super().update_shard_info()

    def on_shard_read_end(self, shard_ids):
        """Release shards that have been fully read (no new assignment)."""
        self._do_shard_change(None, shard_ids)

    def on_offset_reset(self):
        """Re-assign the user's shards after a server-side offset reset.

        A compare-and-set on ``_offset_reset`` (0 -> 1) ensures only one
        reset is processed at a time; concurrent calls are dropped.
        """
        if self._offset_reset.compare_and_set(0, 1):
            # FIX: release the guard in a finally block — previously an
            # exception from _do_shard_change left the flag stuck at 1,
            # silently disabling all future offset resets.
            try:
                if self.is_user_shard_assign():
                    self._do_shard_change(self._assign_shard_list, self._assign_shard_list)
            finally:
                self._offset_reset.compare_and_set(1, 0)

    def waiting_shard_assign(self):
        """Offset coordination never blocks waiting for shard assignment."""
        return False

    def init_and_get_offset(self, shard_ids):
        """Open offset sessions for ``shard_ids`` and return their offsets.

        :param shard_ids: iterable of shard ids to initialize.
        :return: dict mapping shard_id -> :class:`ConsumeOffset`.
        :raises DatahubException: if the server call fails.
        """
        client = self._meta_data.datahub_client
        try:
            init_result = client.init_and_get_subscription_offset(
                self._project_name, self._topic_name, self._sub_id, shard_ids)
            consume_offset_map = {}
            for shard_id, offset in init_result.offsets.items():
                consume_offset_map[shard_id] = ConsumeOffset(
                    # A negative sequence means no offset was committed yet;
                    # otherwise resume one past the last committed record.
                    sequence=offset.sequence if offset.sequence < 0 else offset.sequence + 1,
                    timestamp=offset.timestamp,
                    batch_index=offset.batch_index,
                    version_id=offset.version,
                    session_id=offset.session_id
                )
                self._logger.info(
                    "Init and get offset once success. key: %s, shard_id: %s, offset: %s",
                    self._uniq_key, shard_id, offset)
            self._offset_manager.set_offset_meta(consume_offset_map)
            return consume_offset_map
        except DatahubException:
            self._logger.warning("Init and get subscription offset fail. key: %s", self._uniq_key, exc_info=True)
            # Bare raise preserves the original traceback (was `raise e`).
            raise

    def send_record_offset(self, message_key):
        """Forward an ack'd record's offset to the offset manager."""
        self._offset_manager.send_record_offset(message_key)

    def on_sub_offline(self):
        """Latch: the subscription went offline."""
        self._sub_offline = True

    def on_sub_session_changed(self):
        """Latch: another client took over the subscription session."""
        self._sub_session_changed = True

    def on_sub_deleted(self):
        """Latch: the subscription was deleted on the server."""
        self._sub_deleted = True

    def on_offset_not_ack(self):
        """Latch: offsets were not acknowledged within the allowed window."""
        self._offset_not_ack = True

    @property
    def auto_ack_offset(self):
        # Whether offsets are acknowledged automatically after fetch.
        return self._auto_ack_offset

    @property
    def fetch_limit(self):
        # Max records fetched per request.
        return self._fetch_limit

    @property
    def max_record_buffer_size(self):
        # Upper bound on buffered, not-yet-consumed records.
        return self._max_record_buffer_size