syncer/service/admin/health.go (140 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package admin
import (
"context"
"errors"
"fmt"
"time"
"github.com/go-chassis/go-chassis/v2/server/restful"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"github.com/apache/servicecomb-service-center/client"
"github.com/apache/servicecomb-service-center/pkg/log"
pkgrpc "github.com/apache/servicecomb-service-center/pkg/rpc"
"github.com/apache/servicecomb-service-center/server/plugin/security/cipher"
v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
syncerclient "github.com/apache/servicecomb-service-center/syncer/client"
"github.com/apache/servicecomb-service-center/syncer/config"
"github.com/apache/servicecomb-service-center/syncer/metrics"
"github.com/apache/servicecomb-service-center/syncer/rpc"
)
const (
scheme = "grpc"
serviceName = "syncer"
)
var (
peerInfos []*PeerInfo
ErrConfigIsEmpty = errors.New("sync config is empty")
)
type Resp struct {
Peers []*Peer `json:"peers"`
}
type PeerInfo struct {
Peer *Peer
ClientConn *grpc.ClientConn
}
type Peer struct {
Name string `json:"name"`
Kind string `json:"kind"`
Mode []string `json:"mode"`
Endpoints []string `json:"endpoints"`
Status string `json:"status"`
Token string `json:"-"`
}
func Init() {
cfg := config.GetConfig()
peerInfos = make([]*PeerInfo, 0, len(cfg.Sync.Peers))
for _, c := range cfg.Sync.Peers {
if len(c.Endpoints) <= 0 {
log.Warn("no endpoints of peer: " + c.Name)
continue
}
p := &Peer{
Name: c.Name,
Kind: c.Kind,
Mode: c.Mode,
Endpoints: c.Endpoints,
}
if config.GetConfig().Sync.RbacEnabled {
plainToken, err := cipher.Decrypt(c.Token)
if err != nil {
log.Error(fmt.Sprintf("decrypt token of peer %s failed, use original content", c.Name), err)
plainToken = c.Token
}
p.Token = plainToken
}
conn, err := newRPCConn(p.Endpoints)
if err != nil {
log.Error(fmt.Sprintf("new client failed for peer: %s", c.Name), err)
continue
}
peerInfos = append(peerInfos, &PeerInfo{Peer: p, ClientConn: conn})
}
}
func Health() (*Resp, error) {
if len(peerInfos) <= 0 {
return nil, ErrConfigIsEmpty
}
resp := &Resp{Peers: make([]*Peer, 0, len(peerInfos))}
for _, peerInfo := range peerInfos {
if len(peerInfo.Peer.Endpoints) <= 0 {
continue
}
status := getPeerStatus(peerInfo)
resp.Peers = append(resp.Peers, &Peer{
Name: peerInfo.Peer.Name,
Kind: peerInfo.Peer.Kind,
Mode: peerInfo.Peer.Mode,
Endpoints: peerInfo.Peer.Endpoints,
Status: status,
})
}
reportMetrics(resp.Peers)
return resp, nil
}
func getPeerStatus(peerInfo *PeerInfo) string {
if peerInfo.ClientConn == nil {
log.Warn("clientConn is nil")
return rpc.HealthStatusAbnormal
}
local := time.Now().UnixNano()
set := client.NewSet(peerInfo.ClientConn)
ctx := context.Background()
if config.GetConfig().Sync.RbacEnabled {
ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{
restful.HeaderAuth: "Bearer " + peerInfo.Peer.Token,
}))
}
reply, err := set.EventServiceClient.Health(ctx, &v1sync.HealthRequest{})
if err != nil || reply == nil {
log.Error("get peer health failed", err)
return rpc.HealthStatusAbnormal
}
reportClockDiff(peerInfo.Peer.Name, local, reply.LocalTimestamp)
return reply.Status
}
func reportClockDiff(peerName string, local int64, resp int64) {
curr := time.Now().UnixNano()
spent := (curr - local) / 2
metrics.PeersClockDiffSet(peerName, local+spent-resp)
}
func reportMetrics(peers []*Peer) {
var connectPeersCount int64
for _, peer := range peers {
if peer.Status == rpc.HealthStatusConnected {
connectPeersCount++
}
}
metrics.PeersTotalSet(int64(len(peers)))
metrics.ConnectedPeersSet(connectPeersCount)
}
func newRPCConn(endpoints []string) (*grpc.ClientConn, error) {
return pkgrpc.GetRoundRobinLbConn(&pkgrpc.Config{
Addrs: endpoints,
Scheme: scheme,
ServiceName: serviceName,
TLSConfig: syncerclient.RPClientConfig(),
})
}
func Peers() []*PeerInfo {
return peerInfos
}