in datahub/client/consumer/consumer_heartbeat.py [0:0]
def __heartbeat_once(self):
if not self._closed:
release_shards = self._curr_shards
read_end_shards = list(self._sync_group_meta.read_end_shards)
try:
heartbeat_result = self._coordinator.meta_data.datahub_client.heart_beat(
self._coordinator.project_name,
self._coordinator.topic_name,
self._coordinator.sub_id,
self._consumer_id,
self._version_id,
release_shards,
read_end_shards
)
plan_version = heartbeat_result.plan_version
new_shards = heartbeat_result.shard_list
add_shards = [shard for shard in new_shards if shard not in self._curr_shards]
del_shards = [shard for shard in self._curr_shards if shard not in new_shards]
if len(add_shards) != 0 or len(del_shards) != 0:
self._logger.info("Consumer heartbeat with plan change. key:{}, version:{}, planVersion:{}, oldShards:{}, newShards:{}"
.format(self._coordinator.uniq_key, self._version_id, plan_version, self._curr_shards, new_shards))
self._coordinator.on_shard_change(add_shards, del_shards)
self._curr_shards = new_shards
self._sync_group_meta.on_heartbeat_done(new_shards)
self._logger.debug("Heartbeat success. key:{},version:{}, planVersion:{}, newShards:{}"
.format(self._coordinator.uniq_key, self._version_id, plan_version, new_shards))
except OffsetResetException as e:
self._logger.warning("Consumer heartbeat fail, offset reset. key:{}. {}".format(self._coordinator.uniq_key, e))
self._offset_reset = True
self._coordinator.on_offset_reset()
except DatahubException as e:
if "NoSuchSubscription" == e.error_code:
self._logger.warning("Consumer heartbeat fail, subscription deleted. key:{}. {}".format(self._coordinator.uniq_key, e))
self._coordinator.on_sub_deleted()
elif "NoSuchConsumer" == e.error_code:
self._logger.warning("Consumer heartbeat fail, consumer not in group. key:{}. {}".format(self._coordinator.uniq_key, e))
self._need_rejoin = True
else:
self._logger.warning("Consumer heartbeat fail in DatahubException. key:{}. {}".format(self._coordinator.uniq_key, e))
except Exception as e:
self._logger.warning("Consumer heartbeat fail. key:{}. {}".format(self._coordinator.uniq_key, e))
raise e