#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from rwlock import RWLock
from datahub.exceptions import SubscriptionOfflineException, DatahubException, TimeoutException
from .sync_group_meta import SyncGroupMeta
from .consumer_heartbeat import ConsumerHeartbeat
from .offset_coordinator import OffsetCoordinator
from ..common.timer import Timer
from ..common.constant import Constant


class ConsumerCoordinator(OffsetCoordinator):
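    """Coordinates a collaborative DataHub consumer.

    On top of OffsetCoordinator this class manages consumer-group membership:
    it joins the group on construction, keeps the session alive through a
    heartbeat, and reports released / read-end shards back via SyncGroup.
    """
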
def __init__(self, project_name, topic_name, sub_id, consumer_config):
super(ConsumerCoordinator, self).__init__(project_name, topic_name, sub_id, consumer_config)
self._wr_lock = RWLock()
self._consumer_id = None
self._version_id = None
self._heart_beat = None
self._session_timeout = consumer_config.session_timeout
self._sync_group_meta = SyncGroupMeta()
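        # Join eagerly: construction fails if the group cannot be joined
        # (subscription offline or join timeout propagate to the caller).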
self.__join_group_and_start_heartbeat()

    def close(self):
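        """Close the base coordinator, then leave the group and stop the heartbeat."""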
super(ConsumerCoordinator, self).close()
with self._wr_lock.writer_lock:
self.__leave_group_and_stop_heartbeat()
self._logger.info("ConsumerCoordinator close success. key: {}".format(self._uniq_key))

    def on_shard_change(self, add_shards, del_shards):
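        """Apply a shard assignment change: start `add_shards`, stop `del_shards`.

        Released shards are also handed to the offset manager and queued for
        the next SyncGroup request.
        """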
self._do_shard_change(add_shards, del_shards)
if self._offset_manager:
self._offset_manager.on_shard_release(del_shards)
if self._sync_group_meta:
self._sync_group_meta.on_shard_release(del_shards)

    def on_shard_read_end(self, shard_ids):
super(ConsumerCoordinator, self).on_shard_read_end(shard_ids)
if self._sync_group_meta:
self._sync_group_meta.on_shard_read_end(shard_ids)

    def on_offset_reset(self):
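        # compare_and_set(0, 1) guards the reset so only one caller runs it at
        # a time; the flag is flipped back to 0 once the reset is done.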
if self._offset_reset.compare_and_set(0, 1):
self._do_remove_all_shards()
if self._offset_manager:
self._offset_manager.on_offset_reset()
self._offset_reset.compare_and_set(1, 0)

    def waiting_shard_assign(self):
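        """Return True while no shard assignment has arrived from the heartbeat yet.

        Raises DatahubException if the coordinator is already closed.
        """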
if self._closed:
self._logger.warning("ConsumerCoordinator closed. key: {}".format(self._uniq_key))
raise DatahubException("ConsumerCoordinator closed")
with self._wr_lock.reader_lock:
return self._heart_beat is None or self._heart_beat.waiting_shard_assign()

    def update_shard_info(self):
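        # Rejoin the group if the heartbeat is gone or asks for a rejoin,
        # then flush pending release / read-end shards via SyncGroup.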
super(ConsumerCoordinator, self).update_shard_info()
with self._wr_lock.writer_lock:
if self._heart_beat is None or self._heart_beat.need_rejoin():
self.__leave_group_and_stop_heartbeat()
self.__join_group_and_start_heartbeat()
self.__sync_group()

    def __join_group_and_start_heartbeat(self):
self.__join_group()
self.__start_heartbeat()
self._logger.info("Join group and start heartbeat success. key: {}".format(self.uniq_key))
def __leave_group_and_stop_heartbeat(self):
self.__leave_group()
self.__stop_heartbeat()
self._logger.info("Leave group and stop heartbeat success. key: {}".format(self.uniq_key))
def __sync_group(self):
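        # Report locally released and read-to-end shards to the server; the
        # pending shards are cleared only after the request succeeds.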
if self._sync_group_meta and self._sync_group_meta.need_sync_group():
release = list(self._sync_group_meta.release_shards)
read_end = list(self._sync_group_meta.read_end_shards)
try:
self._meta_data.datahub_client.sync_group(
self._project_name,
self._topic_name,
self._sub_id,
self._consumer_id,
self._version_id,
release,
read_end
)
self._sync_group_meta.clear_shard_release()
self._logger.debug("SyncGroup success. key: {}, release: {}, read end: {}"
.format(self._uniq_key, release, read_end))
        except DatahubException as e:
            self._logger.warning("SyncGroup fail. key: {}, release: {}, read end: {}. {}"
                                 .format(self._uniq_key, release, read_end, e))
            raise

    def __join_group(self):
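        # Retry JoinGroup for up to Constant.MAX_JOIN_GROUP_TIMEOUT, pausing
        # via timer.wait_expire(1) between attempts; an offline subscription
        # is not retryable and is raised immediately.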
timer = Timer(Constant.MAX_JOIN_GROUP_TIMEOUT)
while not timer.is_expired():
try:
join_result = self._meta_data.datahub_client.join_group(
self._project_name,
self._topic_name,
self._sub_id,
self._session_timeout
)
self._consumer_id = join_result.consumer_id
self._version_id = join_result.version_id
self._session_timeout = join_result.session_timeout
self._gen_uniq_key(self._consumer_id)
self._logger.info("JoinGroup success. key: {}, consumer id: {}, version id: {}, session timeout: {}"
.format(self._uniq_key, self._consumer_id, self._version_id, self._session_timeout))
return
            except SubscriptionOfflineException as e:
                self._logger.warning("JoinGroup fail, subscription offline. key: {}. {}".format(self._uniq_key, e))
                raise
            except DatahubException as e:
                self._logger.warning("JoinGroup fail, will retry. key: {}. {}".format(self._uniq_key, e))
            # Back off briefly before the next join attempt.
            timer.wait_expire(1)
raise TimeoutException("JoinGroup timeout. key: {}, elapsedMs: {}".format(self._uniq_key, timer.elapse()))

    def __leave_group(self):
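        # Best effort: a failed LeaveGroup is logged and swallowed so that
        # close() always completes.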
try:
self._meta_data.datahub_client.leave_group(
self._project_name,
self._topic_name,
self._sub_id,
self._consumer_id,
self._version_id
)
self._logger.info("LeaveGroup success. key:{}".format(self._uniq_key))
except DatahubException as e:
self._logger.warning("LeaveGroup fail. key:{}. {}".format(self._uniq_key, e))

    def __start_heartbeat(self):
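        # session_timeout is presumably in milliseconds given the / 1000
        # conversion to seconds for the heartbeat (note: integer division
        # under Python 2 without `from __future__ import division`).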
if not self._heart_beat:
self._heart_beat = ConsumerHeartbeat(self, self._sync_group_meta, self._consumer_id, self._version_id,
self._session_timeout / 1000)
self._logger.info("Start heartbeat success. key:{}".format(self._uniq_key))

    def __stop_heartbeat(self):
if self._heart_beat:
self._heart_beat.close()
self._heart_beat = None
self._logger.info("Stop heartbeat success. key:{}".format(self._uniq_key))