core/log/metric/aggregator.go (119 lines of code) (raw):
// Copyright 1999-2020 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"sort"
"sync"
"time"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/core/stat"
"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
)
// metricTimeMap groups metric items by timestamp: each key is a metric item
// timestamp and the value is the list of items aggregated under that timestamp.
type metricTimeMap = map[uint64][]*base.MetricItem
const (
// logFlushQueueSize is the buffer capacity of writeChan, i.e. how many aggregated
// metric maps may be queued for the writer before a send on writeChan blocks.
logFlushQueueSize = 60
)
var (
// The timestamp of the last fetching. The time unit is ms (= second * 1000).
// It starts at -1 and is overwritten by doAggregate after each aggregation round.
lastFetchTime int64 = -1
// writeChan carries aggregated metric maps from doAggregate to writeTaskLoop.
writeChan = make(chan metricTimeMap, logFlushQueueSize)
// stopChan is watched by the aggregating goroutine started in InitTask; once a
// receive on it succeeds, that goroutine stops its ticker and returns.
stopChan = make(chan struct{})
// metricWriter is the writer that writeTaskLoop writes to; it is assigned in InitTask.
metricWriter MetricLogWriter
// initOnce guards the body of InitTask so that it runs at most once.
initOnce sync.Once
)
// InitTask starts the metric log pipeline. The setup is guarded by initOnce, so it
// runs at most once; later calls skip the setup and return nil.
//
// If the configured flush interval is 0, nothing is started. Otherwise it creates
// the default metric log writer, launches writeTaskLoop in one goroutine, and
// launches a second goroutine that calls doAggregate on every tick of the flush
// interval (in seconds) until a receive on stopChan succeeds.
//
// The returned error is the one reported while creating the writer, if any; in that
// case the error is also logged and no goroutine is started.
func InitTask() (err error) {
	initOnce.Do(func() {
		intervalSec := config.MetricLogFlushIntervalSec()
		if intervalSec == 0 {
			return
		}

		maxFileSize := config.MetricLogSingleFileMaxSize()
		maxFileAmount := config.MetricLogMaxFileAmount()
		metricWriter, err = NewDefaultMetricLogWriter(maxFileSize, maxFileAmount)
		if err != nil {
			logging.Error(err, "Failed to initialize the MetricLogWriter in aggregator.InitTask()")
			return
		}

		// Consumer side: drains writeChan into the metric log writer.
		go util.RunWithRecover(writeTaskLoop)

		// Producer side: aggregates metrics once per flush interval.
		aggregateTicker := util.NewTicker(time.Duration(intervalSec) * time.Second)
		aggregateLoop := func() {
			for {
				select {
				case <-stopChan:
					aggregateTicker.Stop()
					return
				case <-aggregateTicker.C():
					doAggregate()
				}
			}
		}
		go util.RunWithRecover(aggregateLoop)
	})
	return err
}
// writeTaskLoop receives aggregated metric maps from writeChan and passes them to
// metricWriter, one timestamp at a time in ascending timestamp order.
//
// The loop blocks on writeChan and never returns, so it is meant to run in its own
// goroutine. A failed write is logged and the remaining timestamps of the same map
// are still attempted.
func writeTaskLoop() {
	for {
		// Only one channel is awaited, so a plain blocking receive is sufficient.
		m := <-writeChan

		keys := make([]uint64, 0, len(m))
		for t := range m {
			keys = append(keys, t)
		}
		// Map iteration order is unspecified; sort so items are written by increasing timestamp.
		sort.Slice(keys, func(i, j int) bool {
			return keys[i] < keys[j]
		})

		for _, t := range keys {
			if err := metricWriter.Write(t, m[t]); err != nil {
				logging.Error(err, "[MetricAggregatorTask] fail to write metric in aggregator.writeTaskLoop()")
			}
		}
	}
}
// doAggregate performs one aggregation round. It truncates the current time to the
// start of the current second and, unless that instant is not later than
// lastFetchTime, collects the metric items of every resource node and of the inbound
// node into a single metricTimeMap. lastFetchTime is then advanced to the truncated
// time, and the map is sent to writeChan when it is not empty.
func doAggregate() {
	now := util.CurrentTimeMillis()
	secStart := now - now%1000
	if int64(secStart) <= lastFetchTime {
		// This second (or a later one) has already been handled.
		return
	}

	batch := make(metricTimeMap)
	for _, resNode := range stat.ResourceNodeList() {
		aggregateIntoMap(batch, currentMetricItems(resNode, secStart), resNode)
	}

	// Aggregate for inbound entrance node.
	inboundItems := currentMetricItems(stat.InboundNode(), secStart)
	aggregateIntoMap(batch, inboundItems, stat.InboundNode())

	// Advance the fetch watermark only after every node has been read, because the
	// item selection above depends on the previous value.
	lastFetchTime = int64(secStart)

	if len(batch) == 0 {
		return
	}
	writeChan <- batch
}
// aggregateIntoMap stamps every item in metrics with the resource name and resource
// type of node, then appends it to the slice stored in mm under the item's map key.
// Both mm and the items themselves are modified in place.
func aggregateIntoMap(mm metricTimeMap, metrics map[uint64]*base.MetricItem, node *stat.ResourceNode) {
	for ts, metricItem := range metrics {
		metricItem.Resource = node.ResourceName()
		metricItem.Classification = int32(node.ResourceType())
		// Appending to a missing key's nil slice creates the one-element slice.
		mm[ts] = append(mm[ts], metricItem)
	}
}
// isActiveMetricItem reports whether item carries any non-zero measurement, i.e.
// whether at least one of PassQps, BlockQps, CompleteQps, ErrorQps, AvgRt or
// Concurrency is greater than zero.
func isActiveMetricItem(item *base.MetricItem) bool {
	switch {
	case item.PassQps > 0,
		item.BlockQps > 0,
		item.CompleteQps > 0,
		item.ErrorQps > 0,
		item.AvgRt > 0,
		item.Concurrency > 0:
		return true
	}
	return false
}
// isItemTimestampInTime reports whether ts lies in the half-open window
// [lastFetchTime, currentSecStart): at or after the last fetch time and strictly
// before currentSecStart.
func isItemTimestampInTime(ts uint64, currentSecStart uint64) bool {
	if ts >= currentSecStart {
		// At or past the start of the current second: outside the window.
		return false
	}
	return lastFetchTime <= int64(ts)
}
// currentMetricItems asks retriever for the metric items whose timestamp passes
// isItemTimestampInTime against currentTime, drops the items that
// isActiveMetricItem rejects, and returns the rest keyed by item timestamp.
// If several kept items share a timestamp, the last one returned by the retriever
// is the one stored.
func currentMetricItems(retriever base.MetricItemRetriever, currentTime uint64) map[uint64]*base.MetricItem {
	inWindow := func(ts uint64) bool {
		return isItemTimestampInTime(ts, currentTime)
	}
	candidates := retriever.MetricsOnCondition(inWindow)

	active := make(map[uint64]*base.MetricItem, len(candidates))
	for _, candidate := range candidates {
		if isActiveMetricItem(candidate) {
			active[candidate.Timestamp] = candidate
		}
	}
	return active
}