in consumer/push_consumer.go [442:492]
// GetConsumerRunningInfo builds a snapshot of this push consumer's runtime
// state and returns it as a freshly allocated ConsumerRunningInfo.
//
// The snapshot contains:
//   - every entry of subscriptionDataTable, plus a ConsumeStatus per topic
//     filled from pc.stat;
//   - for every entry of processQueueTable, the queue's currentInfo() with
//     CommitOffset set from pc.storage;
//   - when stack is true, a dump of the "goroutine" pprof profile in JStack;
//   - the name server address list, consume type, orderly flag, thread pool
//     core size (always reported as "-1") and consumer start timestamp in
//     Properties.
//
// Failures while collecting the commit offset or the stack dump are logged
// and do not abort the snapshot.
func (pc *pushConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRunningInfo {
	info := internal.NewConsumerRunningInfo()

	// Subscriptions and per-topic statistics.
	pc.subscriptionDataTable.Range(func(key, value interface{}) bool {
		topic := key.(string)
		info.SubscriptionData[value.(*internal.SubscriptionData)] = true
		status := internal.ConsumeStatus{
			PullRT:            pc.stat.getPullRT(pc.consumerGroup, topic).avgpt,
			PullTPS:           pc.stat.getPullTPS(pc.consumerGroup, topic).tps,
			ConsumeRT:         pc.stat.getConsumeRT(pc.consumerGroup, topic).avgpt,
			ConsumeOKTPS:      pc.stat.getConsumeOKTPS(pc.consumerGroup, topic).tps,
			ConsumeFailedTPS:  pc.stat.getConsumeFailedTPS(pc.consumerGroup, topic).tps,
			ConsumeFailedMsgs: pc.stat.topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + pc.consumerGroup).sum,
		}
		info.StatusTable[topic] = status
		return true
	})

	// Per-queue processing state.
	pc.processQueueTable.Range(func(key, value interface{}) bool {
		mq := key.(primitive.MessageQueue)
		pq := value.(*processQueue)
		pInfo := pq.currentInfo()
		offset, err := pc.storage.readWithException(&mq, _ReadMemoryThenStore)
		if err != nil {
			// Best effort: report the failure but keep whatever value the
			// store returned so the remaining queues are still collected.
			rlog.Error("error when read commit offset ", map[string]interface{}{
				"mq":    mq,
				"error": err,
			})
		}
		pInfo.CommitOffset = offset
		info.MQTable[mq] = pInfo
		return true
	})

	if stack {
		var buffer strings.Builder
		// debug=2 asks pprof for full goroutine stack traces.
		if err := pprof.Lookup("goroutine").WriteTo(&buffer, 2); err != nil {
			rlog.Error("error when get stack ", map[string]interface{}{
				"error": err,
			})
		} else {
			info.JStack = buffer.String()
		}
	}

	// Name server addresses are joined as "addr1;addr2;" (each entry is
	// followed by a semicolon, including the last one).
	var nsAddr strings.Builder
	for _, value := range pc.client.GetNameSrv().AddrList() {
		fmt.Fprintf(&nsAddr, "%s;", value)
	}

	info.Properties[internal.PropNameServerAddr] = nsAddr.String()
	info.Properties[internal.PropConsumeType] = string(pc.cType)
	info.Properties[internal.PropConsumeOrderly] = strconv.FormatBool(pc.consumeOrderly)
	info.Properties[internal.PropThreadPoolCoreSize] = "-1"
	info.Properties[internal.PropConsumerStartTimestamp] = strconv.FormatInt(pc.consumerStartTimestamp, 10)
	return info
}