internal/remoting/heartbeat.go (154 lines of code) (raw):
/*
* Copyright (c) 2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package remoting
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"runtime"
"strconv"
"syscall"
"time"
"github.com/asynkron/protoactor-go/actor"
"github.com/shirou/gopsutil/v4/disk"
"github.com/shirou/gopsutil/v4/load"
"github.com/shirou/gopsutil/v4/mem"
"google.golang.org/protobuf/proto"
"github.com/alibaba/schedulerx-worker-go/config"
"github.com/alibaba/schedulerx-worker-go/internal/discovery"
"github.com/alibaba/schedulerx-worker-go/internal/masterpool"
"github.com/alibaba/schedulerx-worker-go/internal/proto/akka"
"github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
"github.com/alibaba/schedulerx-worker-go/internal/remoting/codec"
"github.com/alibaba/schedulerx-worker-go/internal/remoting/pool"
"github.com/alibaba/schedulerx-worker-go/internal/remoting/trans"
"github.com/alibaba/schedulerx-worker-go/internal/utils"
"github.com/alibaba/schedulerx-worker-go/internal/version"
"github.com/alibaba/schedulerx-worker-go/logger"
)
// heartbeatInterval is the period of the ticker that drives periodic
// heartbeats in KeepHeartbeat.
var heartbeatInterval = 5 * time.Second

// waitHeartbeatRespTimeout is not referenced in this file.
// NOTE(review): presumably the maximum time to wait for the server's
// heartbeat response — confirm against its users elsewhere in the package.
var waitHeartbeatRespTimeout = 5 * time.Second
// KeepHeartbeat reports this worker's state to the server until a signal
// arrives on stopChan.
//
// An online heartbeat is sent immediately and then once per heartbeatInterval,
// one request for every entry returned by the group manager's
// GroupId2AppGroupIdMap. When stopChan fires, a final heartbeat with
// online=false is sent and the function returns. The call blocks for that
// whole time.
//
// Send failures are logged and never abort the loop. A broken pipe
// (syscall.EPIPE) or an I/O deadline error (os.ErrDeadlineExceeded)
// additionally signals the connection pool's reconnect trigger.
//
// Parameters:
//   - ctx: passed to sendHeartbeat; also unblocks a pending reconnect signal.
//   - actorSystem: queried for the port advertised in each heartbeat.
//   - appKey: forwarded unchanged into each heartbeat request.
//   - stopChan: receiving from it ends the loop after the offline heartbeat.
func KeepHeartbeat(ctx context.Context, actorSystem *actor.ActorSystem, appKey string, stopChan chan os.Signal) {
	var (
		taskMasterPool = masterpool.GetTaskMasterPool()
		groupManager   = discovery.GetGroupManager()
	)

	// heartbeat sends one request per group and reports whether every one of
	// them was written without error.
	heartbeat := func(online bool) bool {
		_, actorSystemPort, err := actorSystem.GetHostPort()
		if err != nil {
			logger.Errorf("Write heartbeat to remote failed due to get actorSystem port failed, err=%s", err.Error())
			return false
		}

		allSent := true
		for groupId, appGroupId := range groupManager.GroupId2AppGroupIdMap() {
			jobInstanceIds := taskMasterPool.GetInstanceIds(appGroupId)
			heartbeatReq := genHeartBeatRequest(groupId, appGroupId, jobInstanceIds, actorSystemPort, online, appKey)
			if err := sendHeartbeat(ctx, heartbeatReq); err != nil {
				allSent = false

				reconnectTriggered := errors.Is(err, syscall.EPIPE) || errors.Is(err, os.ErrDeadlineExceeded)
				if reconnectTriggered {
					// Do not wait on the trigger forever once ctx is
					// cancelled; otherwise the loop could never reach the
					// stopChan branch again.
					select {
					case pool.GetConnPool().ReconnectTrigger() <- struct{}{}:
					case <-ctx.Done():
						reconnectTriggered = false
					}
				}
				// Report whether a reconnect was really requested instead of
				// claiming it for every kind of failure.
				logger.Warnf("Write heartbeat to server failed, reconnectTriggered=%t, reason=%s", reconnectTriggered, err.Error())
				continue
			}
			logger.Debugf("Write heartbeat to remote succeed.")
		}
		return allSent
	}

	heartbeat(true)

	ticker := time.NewTicker(heartbeatInterval)
	defer ticker.Stop()

	for {
		select {
		case <-stopChan:
			// Only announce success when the offline heartbeat was actually
			// written for every group.
			if heartbeat(false) {
				logger.Infof("Write shutdown heartbeat to remote succeed.")
			} else {
				logger.Warnf("Write shutdown heartbeat to remote failed.")
			}
			return
		case <-ticker.C:
			heartbeat(true)
		}
	}
}
// sendHeartbeat encodes req as an Akka message and writes it on a connection
// obtained from the connection pool.
//
// The message is wrapped in a selection envelope whose path is the child
// names "user" followed by "heartbeat". The two address arguments are built
// from the connection's remote and local addresses respectively.
//
// It returns the error from acquiring the connection, from encoding, or from
// the write, whichever occurs first; nil on success.
func sendHeartbeat(ctx context.Context, req *schedulerx.WorkerHeartBeatRequest) error {
	conn, err := pool.GetConnPool().Get(ctx)
	if err != nil {
		return err
	}

	remotePath := fmt.Sprintf("akka.tcp://server@%s/", conn.RemoteAddr().String())
	localPath := fmt.Sprintf("akka.tcp://%s@%s/temp/%s", utils.GetWorkerId(), conn.LocalAddr().String(), utils.GenPathTpl())

	// Selection path: child "user", then child "heartbeat".
	childNames := []string{"user", "heartbeat"}
	selections := make([]*akka.Selection, 0, len(childNames))
	for _, childName := range childNames {
		selections = append(selections, &akka.Selection{
			Type:    akka.PatternType_CHILD_NAME.Enum(),
			Matcher: proto.String(childName),
		})
	}

	encoded, err := codec.EncodeAkkaMessage(
		req,
		remotePath,
		localPath,
		"com.alibaba.schedulerx.protocol.Worker$WorkerHeartBeatRequest",
		codec.WithMessageContainerSerializer(),
		codec.WithSelectionEnvelopePattern(selections),
	)
	if err != nil {
		return err
	}

	return trans.WriteAkkaMsg(encoded, conn)
}
// metricsJsonStr collects host metrics and returns them as a JSON object
// string for the heartbeat request.
//
// Keys always present on success: cpuLoad1, cpuLoad5, cpuProcessors,
// heap1Usage, heap1Used, heap5Usage, heapMax (the "heap" values are derived
// from system virtual memory; sizes are in MiB, usages are fractions).
// When disk usage of "/" can be read, diskUsed and diskMax (MiB) are added,
// plus diskUsage (fraction) when diskMax is non-zero.
//
// It returns "{}" when the load average or memory statistics cannot be read,
// or when the metrics cannot be marshalled. A disk-usage failure is only
// logged; the remaining metrics are still returned.
func metricsJsonStr() string {
	loadAvg, err := load.Avg()
	if err != nil {
		// The error text is passed as an argument, not as part of the format
		// string, so a '%' inside it cannot be misread as a verb.
		logger.Warnf("Failed to get system load average:%s", err.Error())
		return "{}"
	}

	// The Atoi error is deliberately ignored: an unset or malformed
	// SIGMA_MAX_PROCESSORS_LIMIT yields 0 and falls back to runtime.NumCPU.
	cpus, _ := strconv.Atoi(os.Getenv("SIGMA_MAX_PROCESSORS_LIMIT"))
	if cpus <= 0 {
		cpus = runtime.NumCPU()
	}

	ms, err := mem.VirtualMemory()
	if err != nil {
		logger.Warnf("Failed to get system mem info:%s", err.Error())
		return "{}"
	}

	metricsJson := map[string]float64{
		"cpuLoad1":      loadAvg.Load1,
		"cpuLoad5":      loadAvg.Load5,
		"cpuProcessors": float64(cpus),
		"heap1Usage":    ms.UsedPercent / 100,
		"heap1Used":     float64(ms.Used) / 1024 / 1024,
		"heap5Usage":    ms.UsedPercent / 100,
		"heapMax":       float64(ms.Available+ms.Used) / 1024 / 1024,
	}

	if diskStat, err := disk.Usage("/"); err != nil {
		logger.Warnf("Failed to get system disk usage info:%s", err.Error())
	} else {
		diskUsed := diskStat.Used / 1024 / 1024
		diskFree := diskStat.Free / 1024 / 1024
		diskMax := float64(diskUsed + diskFree)
		metricsJson["diskUsed"] = float64(diskUsed)
		metricsJson["diskMax"] = diskMax
		// 0/0 would be NaN, which json.Marshal rejects and which would turn
		// the whole payload into "{}"; omit the ratio in that case.
		if diskMax > 0 {
			metricsJson["diskUsage"] = float64(diskUsed) / diskMax
		}
	}

	ret, err := json.Marshal(metricsJson)
	if err != nil {
		logger.Warnf("Get metric json failed for heartbeat, err=%s", err.Error())
		return "{}"
	}
	return string(ret)
}
// genHeartBeatRequest assembles the heartbeat request for one group.
//
// The caller supplies the group identifiers, the job instance ids, the actor
// system port (narrowed to int32 for RpcPort), the online flag and the app
// key. The worker id, version, host metrics JSON and worker label are looked
// up here; Starter is fixed to "go" and Source to "unknown".
func genHeartBeatRequest(groupId string, appGroupId int64, jobInstanceIds []int64, actorSystemPort int, online bool, appKey string) *schedulerx.WorkerHeartBeatRequest {
	req := new(schedulerx.WorkerHeartBeatRequest)

	req.GroupId = proto.String(groupId)
	req.WorkerId = proto.String(utils.GetWorkerId())
	req.Version = proto.String(version.Version())
	req.MetricsJson = proto.String(metricsJsonStr())
	req.JobInstanceId = jobInstanceIds
	req.Starter = proto.String("go")
	req.AppGroupId = proto.Int64(appGroupId)
	req.Source = proto.String("unknown")
	req.Label = proto.String(config.GetWorkerConfig().Label())
	req.Online = proto.Bool(online)
	req.AppKey = proto.String(appKey)
	req.RpcPort = proto.Int32(int32(actorSystemPort))

	return req
}