// collector/aggregate/perf_client.go
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/apache/incubator-pegasus/go-client/idl/admin"
	"github.com/apache/incubator-pegasus/go-client/idl/base"
	"github.com/apache/incubator-pegasus/go-client/session"
	log "github.com/sirupsen/logrus"
	batchErr "k8s.io/apimachinery/pkg/util/errors"
)

// PerfClient manages sessions to all replica nodes.
type PerfClient struct {
	// session to the meta server, used to list tables, nodes and partition configurations.
	meta *session.MetaManager

	// one perf session per alive replica node, keyed by node address.
	nodes map[string]*PerfSession
}
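
// Overall flow: updateNodes keeps one PerfSession per alive replica node,
// GetNodeStats fans perf-counter RPCs out to those sessions, and
// GetPartitionStats joins the returned counters with the
// [partition -> primary] table built from the meta server.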

// GetPartitionStats retrieves all the partition stats from replica nodes.
// NOTE: only the primary replicas are counted, so each partition is reported
// exactly once.
func (m *PerfClient) GetPartitionStats() ([]*PartitionStats, error) {
	m.updateNodes()

	partitions, err := m.preparePrimariesStats()
	if err != nil {
		return nil, err
	}
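
	// decodePartitionPerfCounter parses the "@<appID>.<partitionIndex>" suffix
	// that partition-level counters carry in their names, so the "@" filter
	// below selects exactly those counters. A sample name such as
	// "replica*app.pegasus*get_qps@1.2" is illustrative, not taken from this file.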
	nodeStats, err := m.GetNodeStats("@")
	if err != nil {
		return nil, err
	}

	for _, n := range nodeStats {
		for name, value := range n.Stats {
			perfCounter := decodePartitionPerfCounter(name, value)
			if perfCounter == nil {
				continue
			}
			if !aggregatable(perfCounter) {
				continue
			}
			part := partitions[perfCounter.gpid]
			if part == nil || part.Addr != n.Addr {
				// this node is not the primary of this partition; skip it
				continue
			}
			part.Stats[perfCounter.name] = perfCounter.value
		}
	}

	var ret []*PartitionStats
	for _, part := range partitions {
		extendStats(&part.Stats)
		ret = append(ret, part)
	}
	return ret, nil
}

// getPrimaries returns the mapping of [partition -> primary address].
func (m *PerfClient) getPrimaries() (map[base.Gpid]string, error) {
	tables, err := m.listTables()
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()

	result := make(map[base.Gpid]string)
	var mu sync.Mutex
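
	// One QueryConfig RPC is issued per table, all in parallel. Per its
	// documentation, batchErr.AggregateGoroutines runs each func in its own
	// goroutine and aggregates the non-nil errors into a single error, so one
	// failing table does not hide results from the others.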
	var funcs []func() error
	for _, table := range tables {
		tb := table // capture the loop variable for the closure below
		funcs = append(funcs, func() error {
			tableCfg, err := m.meta.QueryConfig(ctx, tb.AppName)
			if err != nil {
				return fmt.Errorf("failed on table(%s): %s", tb.AppName, err)
			}
			mu.Lock()
			for _, p := range tableCfg.Partitions {
				result[*p.Pid] = p.Primary.GetAddress()
			}
			mu.Unlock()
			return nil
		})
	}
	return result, batchErr.AggregateGoroutines(funcs...)
}

// preparePrimariesStats initializes an empty PartitionStats entry, keyed by
// gpid, for every primary partition in the cluster.
func (m *PerfClient) preparePrimariesStats() (map[base.Gpid]*PartitionStats, error) {
	primaries, err := m.getPrimaries()
	if err != nil {
		return nil, err
	}
	partitions := make(map[base.Gpid]*PartitionStats)
	for p, addr := range primaries {
		partitions[p] = &PartitionStats{
			Gpid:  p,
			Stats: make(map[string]float64),
			Addr:  addr,
		}
	}
	return partitions, nil
}

// NodeStat contains the stats of a replica node.
type NodeStat struct {
	// Address of the replica node.
	Addr string

	// perf-counter name -> value.
	Stats map[string]float64
}

// GetNodeStats retrieves all the stats matching `filter` from replica nodes.
func (m *PerfClient) GetNodeStats(filter string) ([]*NodeStat, error) {
	m.updateNodes()

	// Concurrently send RPCs to collect perf-counters from every node.
	var results []*NodeStat
	var funcs []func() error
	var mu sync.Mutex
	for _, node := range m.nodes {
		n := node // capture the loop variable for the closure below
		funcs = append(funcs, func() error {
			stat := &NodeStat{
				Addr:  n.Address,
				Stats: make(map[string]float64),
			}
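			// The filter appears to be matched as a substring of the counter
			// names (which is why GetPartitionStats can pass "@" to select all
			// partition-level counters); this is an assumption based on how the
			// filter is used here, not a documented contract of PerfSession.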
			perfCounters, err := n.GetPerfCounters(filter)
			if err != nil {
				return fmt.Errorf("failed on node(%s): %s", n.Address, err)
			}
			for _, p := range perfCounters {
				stat.Stats[p.Name] = p.Value
			}
			mu.Lock()
			results = append(results, stat)
			mu.Unlock()
			return nil
		})
	}
	return results, batchErr.AggregateGoroutines(funcs...)
}

// listNodes lists all alive replica nodes from the meta server.
func (m *PerfClient) listNodes() ([]*admin.NodeInfo, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	resp, err := m.meta.ListNodes(ctx, &admin.ListNodesRequest{
		Status: admin.NodeStatus_NS_ALIVE,
	})
	if err != nil {
		return nil, err
	}
	return resp.Infos, nil
}

// listTables lists all available tables from the meta server.
func (m *PerfClient) listTables() ([]*admin.AppInfo, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	resp, err := m.meta.ListApps(ctx, &admin.ListAppsRequest{
		Status: admin.AppStatus_AS_AVAILABLE,
	})
	if err != nil {
		return nil, err
	}
	return resp.Infos, nil
}

// updateNodes refreshes the sessions to the currently alive replica nodes:
// it reuses existing connections, creates sessions for newly seen nodes, and
// closes connections to nodes that are no longer alive.
func (m *PerfClient) updateNodes() {
	nodeInfos, err := m.listNodes()
	if err != nil {
		log.Error("skip updating nodes due to list-nodes RPC failure: ", err)
		return
	}
	newNodes := make(map[string]*PerfSession)
	for _, n := range nodeInfos {
		addr := n.Address.GetAddress()
		node, found := m.nodes[addr]
		if !found {
			newNodes[addr] = NewPerfSession(addr)
		} else {
			newNodes[addr] = node
		}
	}
	for n, client := range m.nodes {
		// close connections to nodes that dropped out of the alive list
		if _, found := newNodes[n]; !found {
			client.Close()
		}
	}
	m.nodes = newNodes
}

// Close releases the resources held by the client.
func (m *PerfClient) Close() {
	m.meta.Close()
	for _, n := range m.nodes {
		n.Close()
	}
}

// NewPerfClient returns an instance of PerfClient.
func NewPerfClient(metaAddrs []string) *PerfClient {
	return &PerfClient{
		meta:  session.NewMetaManager(metaAddrs, session.NewNodeSession),
		nodes: make(map[string]*PerfSession),
	}
}
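
// For illustration only: a minimal usage sketch of PerfClient. The meta-server
// addresses are hypothetical placeholders.
//
//	client := NewPerfClient([]string{"127.0.0.1:34601", "127.0.0.1:34602"})
//	defer client.Close()
//
//	stats, err := client.GetPartitionStats()
//	if err != nil {
//		log.Fatal(err)
//	}
//	for _, part := range stats {
//		fmt.Printf("%v => %v\n", part.Gpid, part.Stats)
//	}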