def __heartbeat_once()

in datahub/client/consumer/consumer_heartbeat.py [0:0]


    def __heartbeat_once(self):
        if not self._closed:
            release_shards = self._curr_shards
            read_end_shards = list(self._sync_group_meta.read_end_shards)
            try:
                heartbeat_result = self._coordinator.meta_data.datahub_client.heart_beat(
                    self._coordinator.project_name,
                    self._coordinator.topic_name,
                    self._coordinator.sub_id,
                    self._consumer_id,
                    self._version_id,
                    release_shards,
                    read_end_shards
                )
                plan_version = heartbeat_result.plan_version
                new_shards = heartbeat_result.shard_list

                add_shards = [shard for shard in new_shards if shard not in self._curr_shards]
                del_shards = [shard for shard in self._curr_shards if shard not in new_shards]
                if len(add_shards) != 0 or len(del_shards) != 0:
                    self._logger.info("Consumer heartbeat with plan change. key:{}, version:{}, planVersion:{}, oldShards:{}, newShards:{}"
                                      .format(self._coordinator.uniq_key, self._version_id, plan_version, self._curr_shards, new_shards))
                    self._coordinator.on_shard_change(add_shards, del_shards)
                    self._curr_shards = new_shards
                    self._sync_group_meta.on_heartbeat_done(new_shards)
                self._logger.debug("Heartbeat success. key:{},version:{}, planVersion:{}, newShards:{}"
                                   .format(self._coordinator.uniq_key, self._version_id, plan_version, new_shards))
            except OffsetResetException as e:
                self._logger.warning("Consumer heartbeat fail, offset reset. key:{}. {}".format(self._coordinator.uniq_key, e))
                self._offset_reset = True
                self._coordinator.on_offset_reset()
            except DatahubException as e:
                if "NoSuchSubscription" == e.error_code:
                    self._logger.warning("Consumer heartbeat fail, subscription deleted. key:{}. {}".format(self._coordinator.uniq_key, e))
                    self._coordinator.on_sub_deleted()
                elif "NoSuchConsumer" == e.error_code:
                    self._logger.warning("Consumer heartbeat fail, consumer not in group. key:{}. {}".format(self._coordinator.uniq_key, e))
                    self._need_rejoin = True
                else:
                    self._logger.warning("Consumer heartbeat fail in DatahubException. key:{}. {}".format(self._coordinator.uniq_key, e))
            except Exception as e:
                self._logger.warning("Consumer heartbeat fail. key:{}. {}".format(self._coordinator.uniq_key, e))
                raise e