internal/master/second_job_update_instance_status_handler.go
/*
* Copyright (c) 2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package master
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"reflect"
"strconv"
"strings"
"time"
"github.com/asynkron/protoactor-go/actor"
"google.golang.org/protobuf/proto"
"github.com/alibaba/schedulerx-worker-go/config"
"github.com/alibaba/schedulerx-worker-go/internal/actor/common"
"github.com/alibaba/schedulerx-worker-go/internal/common"
"github.com/alibaba/schedulerx-worker-go/internal/constants"
"github.com/alibaba/schedulerx-worker-go/internal/master/taskmaster"
"github.com/alibaba/schedulerx-worker-go/internal/openapi"
"github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
"github.com/alibaba/schedulerx-worker-go/internal/utils"
"github.com/alibaba/schedulerx-worker-go/logger"
"github.com/alibaba/schedulerx-worker-go/processor"
)
const missServerKillTime = 30 // seconds
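// secondJobUpdateInstanceStatusHandler handles status updates for second-delay (cycle) jobs:
// it periodically reports progress to the server, kills the instance when the server or the
// worker list is lost, and schedules the next execution cycle once the current one finishes.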
type secondJobUpdateInstanceStatusHandler struct {
*baseUpdateInstanceStatusHandler
actorCtx actor.Context
secondProgressDetail *common.SecondProgressDetail
cycleStartTime int64
triggerTimes int32
triggerCus int32
enableCycleIntervalMs bool
recentProgressHistory *utils.LimitedQueue
}
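// NewSecondJobUpdateInstanceStatusHandler creates the status handler for a second-delay job
// instance and starts its background progress reporting.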
func NewSecondJobUpdateInstanceStatusHandler(actorCtx actor.Context, taskMaster taskmaster.TaskMaster, jobInstanceInfo *common.JobInstanceInfo) UpdateInstanceStatusHandler {
h := &secondJobUpdateInstanceStatusHandler{
baseUpdateInstanceStatusHandler: NewBaseUpdateInstanceStatusHandler(jobInstanceInfo, taskMaster),
actorCtx: actorCtx,
cycleStartTime: time.Now().UnixMilli(),
enableCycleIntervalMs: config.GetWorkerConfig().IsSecondDelayIntervalMS(),
secondProgressDetail: common.NewSecondProgressDetail(),
recentProgressHistory: utils.NewLimitedQueue(10),
}
h.init()
return h
}
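// init initializes the shared time scheduler and starts the progress reporting goroutine.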
func (h *secondJobUpdateInstanceStatusHandler) init() {
GetTimeScheduler().init()
	// Start the job instance progress reporting goroutine.
go h.reportJobInstanceProgress()
}
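// reportJobInstanceProgress loops once per second until the task master is killed. Roughly
// every 10 seconds it collects the instance progress and sends a
// WorkerReportJobInstanceProgressRequest to the server; on every tick it also checks whether
// the instance needs to kill itself (see need2KillSelf).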
func (h *secondJobUpdateInstanceStatusHandler) reportJobInstanceProgress() {
intervalTimes := 0
jobIdAndInstanceId := utils.GetUniqueIdWithoutTaskId(h.jobInstanceInfo.GetJobId(), h.jobInstanceInfo.GetJobInstanceId())
for !h.taskMaster.IsKilled() {
time.Sleep(1 * time.Second)
intervalTimes++
if intervalTimes > 10 {
progress, err := h.getJobInstanceProgress()
if err != nil {
logger.Errorf("report status failed due to getJobInstanceProgress failed, err=%v, jobIdAndInstanceId=%v.", err, jobIdAndInstanceId)
continue
}
req := &schedulerx.WorkerReportJobInstanceProgressRequest{
JobId: proto.Int64(h.jobInstanceInfo.GetJobId()),
JobInstanceId: proto.Int64(h.jobInstanceInfo.GetJobInstanceId()),
Progress: proto.String(progress),
TriggerTimes: proto.Int32(h.triggerTimes),
}
// FIXME ServerDiscovery serverDiscovery = ServerDiscoveryFactory.GetDiscovery(jobInstanceInfo.GetGroupId());
actorcomm.TaskMasterMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
Msg: req,
}
intervalTimes = 0
h.triggerTimes = 0
h.triggerCus = 0
}
if err := h.need2KillSelf(); err != nil {
logger.Errorf("report status error, err=%v, jobIdAndInstanceId=%v.", err, jobIdAndInstanceId)
}
}
}
// The job instance needs to kill itself if any of the following conditions are met:
// 1. Lost contact with the server for more than 30 seconds
// 2. The grid task has no available worker
func (h *secondJobUpdateInstanceStatusHandler) need2KillSelf() error {
if !h.taskMaster.IsInited() {
return nil
}
jobIdAndInstanceId := utils.GetUniqueIdWithoutTaskId(h.jobInstanceInfo.GetJobId(), h.jobInstanceInfo.GetJobInstanceId())
if utils.GetHealthTimeHolder().IsServerHeartbeatHealthTimeout(missServerKillTime) {
if err := h.taskMaster.KillInstance("killed, because of worker missed active server."); err != nil {
return err
}
logger.Warnf("Missed server timeout=%vms, kill jobIdAndInstanceId=%v.", utils.GetHealthTimeHolder().GetServerHeartbeatMsInterval(), jobIdAndInstanceId)
}
if h.taskMaster.GetAliveCheckWorkerSet().Len() == 0 && len(h.taskMaster.GetJobInstanceInfo().GetAllWorkers()) == 0 {
if err := h.taskMaster.KillInstance("killed, because of missed useful worker list."); err != nil {
return err
}
logger.Warnf("Missed useful worker list, kill jobIdAndInstanceId=%v.", jobIdAndInstanceId)
}
return nil
}
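// getJobInstanceProgress wraps the current running progress, cycle start time and recent
// progress history into a SecondProgressDetail and returns it as a JSON string.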
func (h *secondJobUpdateInstanceStatusHandler) getJobInstanceProgress() (string, error) {
	// Delegate to the task master for the running progress; calling h.getJobInstanceProgress()
	// here would recurse infinitely.
	progress, err := h.taskMaster.GetJobInstanceProgress()
	if err != nil {
		return "", err
	}
h.secondProgressDetail.SetRunningProgress(progress)
h.secondProgressDetail.SetRunningStartTime(h.cycleStartTime)
h.secondProgressDetail.SetRecentProgressHistory(h.recentProgressHistory.Convert2Slice())
data, err := json.Marshal(h.secondProgressDetail)
if err != nil {
return "", err
}
h.secondProgressDetail.SetRunningProgress("")
return string(data), nil
}
// Get the latest worker list
func (h *secondJobUpdateInstanceStatusHandler) getAllWorkers(appGroupId, jobId int64) (*utils.Set, error) {
url := fmt.Sprintf("http://%s/app/getAllUsefulWorkerList.json?appGroupId=%d&jobId=%d", openapi.GetOpenAPIClient().Domain(), appGroupId, jobId)
resp, err := openapi.GetOpenAPIClient().HttpClient().Get(url)
if err != nil {
return nil, fmt.Errorf("HTTP request getAllWorkers failed, appGroupId:%d, err:%s", appGroupId, err.Error())
}
defer resp.Body.Close()
respData, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("Read http http response failed, err=%s ", err.Error())
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Read http post response failed, statusCode=%s ", resp.Status)
}
jsonResp := new(openapi.JSONResult)
if err := json.Unmarshal(respData, jsonResp); err != nil {
return nil, fmt.Errorf("Unmarshal response body failed, respData=%+v ", respData)
}
set := new(utils.Set)
if jsonResp.Data != nil && reflect.TypeOf(jsonResp.Data).Kind() == reflect.Slice {
sliceValue := reflect.ValueOf(jsonResp.Data)
for i := 0; i < sliceValue.Len(); i++ {
item := sliceValue.Index(i).Interface()
set.Add(item)
}
}
return set, nil
}
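// Handle processes an instance status update for the given cycle (serialNum): it kills the
// instance when initialization failed, reports a killed instance back to the server, and
// triggers the next cycle once the current one has finished.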
func (h *secondJobUpdateInstanceStatusHandler) Handle(serialNum int64, instanceStatus processor.InstanceStatus, result string) error {
cycleId := utils.GetUniqueId(h.jobInstanceInfo.GetJobId(), h.jobInstanceInfo.GetJobInstanceId(), h.taskMaster.GetSerialNum())
logger.Infof("cycleId: %v instanceStatus=%v cycle update status.", cycleId, instanceStatus)
	// If init failed, the instance status is finished and the master has not been killed, so the second-delay job kills itself
if !h.taskMaster.IsInited() && instanceStatus.IsFinished() && !h.taskMaster.IsKilled() {
h.taskMaster.KillInstance("killed, because of worker init failed.")
logger.Warnf("Init failed need to kill self, cycleId=%v", cycleId)
return nil
}
	// If the instance is killed, it needs to be reported to the server.
	// Logically, only whether the master has been killed needs to be checked; there should be
	// no need to check whether the result contains the specific text.
	// However, for historical reasons it should not be removed in the short term.
if h.taskMaster.IsKilled() &&
(strings.Contains(result, "killed") || strings.Contains(result, "Worker master shutdown")) {
h.taskMaster.SetInstanceStatus(processor.InstanceStatusFailed)
h.taskMaster.Stop()
h.masterPool.Remove(h.jobInstanceInfo.GetJobInstanceId())
if result != "killed from server" {
// There is no status feedback for the server-side forced stop operation.
req := &schedulerx.WorkerReportJobInstanceStatusRequest{
JobId: proto.Int64(h.jobInstanceInfo.GetJobId()),
JobInstanceId: proto.Int64(h.jobInstanceInfo.GetJobInstanceId()),
Status: proto.Int32(h.jobInstanceInfo.GetStatus()),
GroupId: proto.String(h.jobInstanceInfo.GetGroupId()),
}
if result != "" {
req.Result = proto.String(result)
}
progress, err := h.getJobInstanceProgress()
if err != nil {
return fmt.Errorf("cycleId: %v instanceStatus=%v cycle update status failed due to getJobInstanceProgress failed, err=%s", cycleId, instanceStatus, err.Error())
}
if progress != "" {
req.Progress = proto.String(progress)
}
actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
Msg: req,
}
logger.Infof("report cycleId=%v, status=%v to AtLeastDeliveryRoutingActor", cycleId, instanceStatus)
}
		// If the instance terminates, no further action is required
return nil
}
// If job instance is finished, remove from TaskMasterPool
if instanceStatus.IsFinished() {
h.triggerNextCycle(cycleId, serialNum, instanceStatus)
}
return nil
}
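// triggerNextCycle finishes the current cycle (post-processing, history recording, resource
// cleanup) and, if the instance has not been killed, computes the next fire time from the
// job's time expression and registers it with the time scheduler.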
func (h *secondJobUpdateInstanceStatusHandler) triggerNextCycle(cycleId string, serialNum int64, instanceStatus processor.InstanceStatus) {
if serialNum != h.taskMaster.GetSerialNum() {
logger.Infof("triggerNextCycle=%v ignore, current serialNum=%v, but trigger serialNum=%v, status=%v, killed=%v.",
cycleId, h.taskMaster.GetSerialNum(), serialNum, instanceStatus, h.taskMaster.IsKilled())
return
}
postResult := h.taskMaster.PostFinish(h.jobInstanceInfo.GetJobInstanceId())
if postResult != nil {
logger.Infof("cycleId: %v cycle post status, result=%v.", cycleId, postResult.Status(), postResult.Result())
}
logger.Infof("cycleId: %v cycle end.", cycleId)
h.setHistory(h.taskMaster.GetSerialNum(), h.cycleStartTime, instanceStatus)
if !h.taskMaster.IsKilled() {
		// TODO: Clear this iteration's resources first; in the future this could be optimized so a full clear is not needed every time
h.taskMaster.Clear(h.taskMaster)
// The current node is offline
// FIXME implement it
// if (!SchedulerxWorker.INITED) {
// LOGGER.info("Current worker is not running. To shutdown this master JobInstanceId={}", jobInstanceInfo.getJobInstanceId());
// taskMaster.killInstance(true,"Worker master shutdown.");
// return;
// }
// Calculate the next scheduling time and add it to the time scheduler
delayTime, err := strconv.Atoi(h.jobInstanceInfo.GetTimeExpression())
if err != nil {
h.taskMaster.KillInstance("killed, because of cycle submit failed.")
logger.Errorf("cycleId=%v cycle submit failed, need to kill, err=%s", cycleId, err.Error())
return
}
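		// The time expression is interpreted as seconds by default; convert it to
		// milliseconds unless millisecond-level second-delay intervals are enabled.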
if !h.enableCycleIntervalMs {
delayTime = 1000 * delayTime
}
h.cycleStartTime = time.Now().UnixMilli() + int64(delayTime)
planEntry := NewTimePlanEntry(h.jobInstanceInfo.GetJobInstanceId(), h.cycleStartTime, h)
GetTimeScheduler().add(planEntry)
} else {
h.taskMaster.AcquireSerialNum()
}
}
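// setHistory updates today's (and, after a day rollover, yesterday's) progress counters,
// snapshots the per-task progress of this cycle according to the task master type, and
// enqueues a ProgressHistory entry into the recent progress history.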
func (h *secondJobUpdateInstanceStatusHandler) setHistory(serialNum int64, loopStartTime int64, status processor.InstanceStatus) {
if status == processor.InstanceStatusSucceed {
h.secondProgressDetail.GetTodayProgressCounter().IncrementOneSuccess()
} else {
h.secondProgressDetail.GetTodayProgressCounter().IncrementOneFailed()
}
if !h.taskMaster.IsKilled() {
h.secondProgressDetail.GetTodayProgressCounter().IncrementRunning()
h.secondProgressDetail.GetTodayProgressCounter().IncrementOneTotal()
}
// reset today progress counter
todayBeginTime, err := time.Parse(constants.TimeFormat, h.secondProgressDetail.GetTodayBeginTime())
if err != nil {
logger.Errorf("setHistory failed, getTodayBeginTime from secondProgressDetail failed, todayBeginTime=%s", todayBeginTime)
return
}
if time.Now().Day() != todayBeginTime.Day() {
h.secondProgressDetail.SetYesterdayProgressCounter(h.secondProgressDetail.GetTodayProgressCounter())
h.secondProgressDetail.SetTodayBeginTime(time.Now().Format(constants.TimeFormat))
h.secondProgressDetail.SetTodayProgressCounter(common.NewTaskProgressCounter(h.secondProgressDetail.GetTodayBeginTime()))
}
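	// Snapshot per-task progress for this cycle: keyed by task name for a map task master,
	// and by worker address for broadcast and standalone task masters.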
taskProgressMap := make(map[string]*common.TaskProgressCounter)
switch h.taskMaster.(type) {
case taskmaster.MapTaskMaster:
h.taskMaster.(*MapTaskMaster).GetTaskProgressMap().Range(func(key, value any) bool {
taskProgressMap[key.(string)] = value.(*common.TaskProgressCounter)
return true
})
case *BroadcastTaskMaster:
workerProgressCounterMap := h.taskMaster.(*BroadcastTaskMaster).GetWorkerProgressMap()
if utils.SyncMapLen(workerProgressCounterMap) == 0 {
return
}
workerProgressCounterMap.Range(func(key, value any) bool {
counter := value.(*common.WorkerProgressCounter)
newCounter := new(common.TaskProgressCounter)
newCounter.IncrementSuccess(counter.GetSuccess())
newCounter.IncrementFailed(counter.GetFailed())
newCounter.IncrementTotal(counter.GetTotal())
taskProgressMap[counter.GetWorkerAddr()] = newCounter
return true
})
case *StandaloneTaskMaster:
ipAndPort := h.taskMaster.(*StandaloneTaskMaster).GetCurrentSelection()
counter := common.NewTaskProgressCounter(ipAndPort)
counter.IncrementOneTotal()
if status == processor.InstanceStatusSucceed {
counter.IncrementOneSuccess()
} else {
counter.IncrementOneFailed()
}
taskProgressMap[ipAndPort] = counter
}
if len(taskProgressMap) == 0 {
return
}
history := common.NewProgressHistory()
history.SetSerialNum(serialNum)
history.SetStartTime(loopStartTime)
history.SetEndTime(time.Now().UnixMilli())
history.SetCostTime(history.EndTime() - history.StartTime())
history.SetTaskProgressMap(taskProgressMap)
if status == processor.InstanceStatusSucceed {
history.SetSuccess(true)
} else {
history.SetSuccess(false)
}
h.recentProgressHistory.Enqueue(history)
}
// Schedule a new iteration
func (h *secondJobUpdateInstanceStatusHandler) triggerNewCycle() {
cycleId := utils.GetUniqueId(h.jobInstanceInfo.GetJobId(), h.jobInstanceInfo.GetJobInstanceId(), h.taskMaster.GetSerialNum())
logger.Infof("cycleId: %v cycle begin.", cycleId)
h.cycleStartTime = time.Now().UnixMilli()
h.jobInstanceInfo.SetScheduleTime(time.Now())
	// If invalid worker nodes exist, re-obtain the latest worker list
if h.taskMaster.ExistInvalidWorker() {
freeWorkers, err := h.getAllWorkers(h.jobInstanceInfo.GetAppGroupId(), h.jobInstanceInfo.GetJobId())
		if err != nil {
			h.taskMaster.KillInstance("killed, because of cycle submit failed.")
			logger.Errorf("cycleId=%s cycle submit failed, err:%s, need to kill.", cycleId, err.Error())
			// Do not continue with a nil worker list once the instance has been killed.
			return
		}
h.taskMaster.RestJobInstanceWorkerList(freeWorkers)
}
h.taskMaster.SubmitInstance(context.Background(), h.jobInstanceInfo)
h.triggerTimes++
	// If it is a standalone task, increase triggerCus by 1;
	// if it is a distributed task, increase it by the number of workers.
if h.jobInstanceInfo.GetExecuteMode() == string(common.StandaloneExecuteMode) {
h.triggerCus++
} else {
h.triggerCus += int32(len(h.jobInstanceInfo.GetAllWorkers()))
}
}