datahub/client/consumer/consumer_heartbeat.py (105 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import threading
from datahub.exceptions import DatahubException, OffsetResetException
from ..common.timer import Timer
from ..common.constant import Constant
class ConsumerHeartbeat:
    """Background heartbeat for one consumer of a consumer group.

    Constructing an instance immediately starts a thread named "Heartbeat".
    That thread repeatedly calls ``datahub_client.heart_beat`` (reached through
    the coordinator), forwards shard plan changes to the coordinator and to
    ``sync_group_meta``, and records the conditions under which the consumer
    has to rejoin the group (see :meth:`need_rejoin`).

    Call :meth:`close` to stop the thread.
    """

    def __init__(self, coordinator, sync_group_meta, consumer_id, version_id, session_timeout):
        """Store collaborators and start the heartbeat thread.

        Args:
            coordinator: Object exposing ``uniq_key``, ``project_name``,
                ``topic_name``, ``sub_id``, ``meta_data.datahub_client`` and
                the callbacks ``on_shard_change``, ``on_offset_reset`` and
                ``on_sub_deleted``.
            sync_group_meta: Object exposing ``get_valid_shards()``,
                ``read_end_shards`` and ``on_heartbeat_done()``.
            consumer_id: Passed through to ``heart_beat``.
            version_id: Passed through to ``heart_beat``.
            session_timeout: Threshold compared against the timer's elapsed
                value in :meth:`need_rejoin`; the regular heartbeat interval
                is one sixth of it. The log messages label it "Ms", so it is
                presumably milliseconds -- confirm against ``Timer``.
        """
        super().__init__()
        self._logger = logging.getLogger(ConsumerHeartbeat.__name__)
        self._closed = False
        self._offset_reset = False
        self._need_rejoin = False
        self._coordinator = coordinator
        self._sync_group_meta = sync_group_meta
        self._consumer_id = consumer_id
        self._version_id = version_id
        self._session_timeout = session_timeout
        # Shard list returned by the most recent successful heartbeat.
        self._curr_shards = []
        self._timer = Timer(Constant.MIN_HEARTBEAT_INTERVAL_TIMEOUT)
        self._heartbeat_timeout = session_timeout / 6
        # Must stay last: the thread reads the attributes assigned above.
        self.__start()

    def close(self):
        """Stop the heartbeat loop and wait for its thread to finish.

        The coordinator callbacks are invoked on the heartbeat thread itself,
        so ``close()`` can be reached from that thread. ``Thread.join()``
        raises ``RuntimeError`` when a thread joins itself; in that case the
        join is skipped and the loop exits on its next ``_closed`` check.
        """
        self._closed = True
        # Wake the loop if it is blocked in ``wait_expire``.
        self._timer.notify_all()
        if threading.current_thread() is not self._heart_beat_task:
            self._heart_beat_task.join()

    def waiting_shard_assign(self):
        """Return True while ``sync_group_meta`` reports no valid shards."""
        return not self._sync_group_meta.get_valid_shards()

    def need_rejoin(self):
        """Tell whether the consumer has to rejoin the group.

        A pending "consumer not in group" or "offset reset" condition is
        reported once: its flag is cleared and True is returned. Otherwise the
        result is whether the timer's elapsed value exceeds the session
        timeout, in which case a warning is logged.
        """
        if self._need_rejoin:
            self._need_rejoin = False
            return True
        if self._offset_reset:
            self._offset_reset = False
            return True

        elapse = self._timer.elapse()
        is_expire = elapse > self._session_timeout
        if is_expire:
            self._logger.warning("ConsumerHeartbeat timeout. key:{}, elapsedMs:{}, sessionTimeoutMs:{}"
                                 .format(self._coordinator.uniq_key, elapse, self._session_timeout))
        return is_expire

    def __start(self):
        """Create and start the thread that runs the heartbeat loop."""
        # ``name=`` replaces ``Thread.setName()``, deprecated since Python 3.10.
        self._heart_beat_task = threading.Thread(target=self.__keep_heartbeat, name="Heartbeat")
        self._heart_beat_task.start()

    def __keep_heartbeat(self):
        """Thread body: send a heartbeat whenever the timer expires.

        After each heartbeat the timer is re-armed with the regular interval
        when shards are assigned, or with the minimum interval while no shard
        has been assigned yet. An exception escaping ``__heartbeat_once``
        propagates out of this method and ends the thread.
        """
        self._logger.info("ConsumerHeartbeat task start. key: {}, session timeout: {}, heartbeat timeout: {}"
                          .format(self._coordinator.uniq_key, self._session_timeout, self._heartbeat_timeout))
        while not self._closed:
            if self._timer.is_expired():
                self.__heartbeat_once()
                if self._sync_group_meta.get_valid_shards():
                    self._timer.reset(self._heartbeat_timeout)
                else:
                    self._logger.warning("Heartbeat has not assign consumer plan, please wait. key:{}".format(self._coordinator.uniq_key))
                    self._timer.reset(Constant.MIN_HEARTBEAT_INTERVAL_TIMEOUT)
            else:
                try:
                    self._timer.wait_expire()
                except Exception as e:
                    self._logger.warning("ConsumerHeartbeat stop. {}".format(e))
                    break
        self._logger.info("ConsumerHeartbeat task stop. key:{}, sessionTimeoutMs:{}, heartbeatTimeoutMs:{}"
                          .format(self._coordinator.uniq_key, self._session_timeout, self._timer.timeout))

    def __heartbeat_once(self):
        """Send one heartbeat and apply its result.

        On success the returned shard list is diffed against the current one;
        any difference is reported through ``coordinator.on_shard_change``,
        then the new list is stored and handed to
        ``sync_group_meta.on_heartbeat_done``.

        Failure handling:
            * ``OffsetResetException``: flag an offset reset and notify the
              coordinator.
            * ``DatahubException`` with error code "NoSuchSubscription":
              notify the coordinator that the subscription was deleted.
            * ``DatahubException`` with error code "NoSuchConsumer": flag that
              a rejoin is needed.
            * Any other ``DatahubException``: log only.
            * Any other exception: log and re-raise.

        Does nothing once the heartbeat has been closed.
        """
        if self._closed:
            return

        # NOTE(review): the currently held shard list is what gets sent in the
        # argument named ``release_shards`` here -- confirm against the
        # ``heart_beat`` signature that this is the intended meaning.
        release_shards = self._curr_shards
        read_end_shards = list(self._sync_group_meta.read_end_shards)
        try:
            heartbeat_result = self._coordinator.meta_data.datahub_client.heart_beat(
                self._coordinator.project_name,
                self._coordinator.topic_name,
                self._coordinator.sub_id,
                self._consumer_id,
                self._version_id,
                release_shards,
                read_end_shards
            )
            plan_version = heartbeat_result.plan_version
            new_shards = heartbeat_result.shard_list

            add_shards = [shard for shard in new_shards if shard not in self._curr_shards]
            del_shards = [shard for shard in self._curr_shards if shard not in new_shards]
            if add_shards or del_shards:
                self._logger.info("Consumer heartbeat with plan change. key:{}, version:{}, planVersion:{}, oldShards:{}, newShards:{}"
                                  .format(self._coordinator.uniq_key, self._version_id, plan_version, self._curr_shards, new_shards))
                self._coordinator.on_shard_change(add_shards, del_shards)

            self._curr_shards = new_shards
            self._sync_group_meta.on_heartbeat_done(new_shards)
            self._logger.debug("Heartbeat success. key:{},version:{}, planVersion:{}, newShards:{}"
                               .format(self._coordinator.uniq_key, self._version_id, plan_version, new_shards))
        except OffsetResetException as e:
            self._logger.warning("Consumer heartbeat fail, offset reset. key:{}. {}".format(self._coordinator.uniq_key, e))
            self._offset_reset = True
            self._coordinator.on_offset_reset()
        except DatahubException as e:
            if "NoSuchSubscription" == e.error_code:
                self._logger.warning("Consumer heartbeat fail, subscription deleted. key:{}. {}".format(self._coordinator.uniq_key, e))
                self._coordinator.on_sub_deleted()
            elif "NoSuchConsumer" == e.error_code:
                self._logger.warning("Consumer heartbeat fail, consumer not in group. key:{}. {}".format(self._coordinator.uniq_key, e))
                self._need_rejoin = True
            else:
                self._logger.warning("Consumer heartbeat fail in DatahubException. key:{}. {}".format(self._coordinator.uniq_key, e))
        except Exception as e:
            self._logger.warning("Consumer heartbeat fail. key:{}. {}".format(self._coordinator.uniq_key, e))
            # Bare ``raise`` re-raises the active exception unchanged.
            raise