internal/master/broadcast_task_master.go
/*
* Copyright (c) 2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package master
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/asynkron/protoactor-go/actor"
"github.com/tidwall/gjson"
"google.golang.org/protobuf/proto"
"github.com/alibaba/schedulerx-worker-go/config"
"github.com/alibaba/schedulerx-worker-go/internal/actor/common"
"github.com/alibaba/schedulerx-worker-go/internal/common"
"github.com/alibaba/schedulerx-worker-go/internal/master/taskmaster"
"github.com/alibaba/schedulerx-worker-go/internal/masterpool"
"github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
"github.com/alibaba/schedulerx-worker-go/internal/utils"
"github.com/alibaba/schedulerx-worker-go/logger"
"github.com/alibaba/schedulerx-worker-go/processor"
"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
)
var _ taskmaster.TaskMaster = &BroadcastTaskMaster{}
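// BroadcastTaskMaster is the task master for broadcast jobs: it dispatches the same task to
// every worker of the job instance and aggregates the per-worker statuses and results.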
type BroadcastTaskMaster struct {
*TaskMaster
worker2uniqueIdMap *sync.Map // Map<String, String>
workerProgressMap *sync.Map // Map<String, WorkerProgressCounter>
running bool
monitor bool
taskIdResultMap *sync.Map // Map<Long, String>
taskIdStatusMap *sync.Map // Map<Long, TaskStatus>
allWorkers []string
lock sync.RWMutex
}
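// NewBroadcastTaskMaster creates a BroadcastTaskMaster for the given job instance.
// Second-level (second-delay) jobs get a dedicated instance status handler.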
func NewBroadcastTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx actor.Context) taskmaster.TaskMaster {
broadcastTaskMaster := &BroadcastTaskMaster{
worker2uniqueIdMap: new(sync.Map),
workerProgressMap: new(sync.Map),
running: false,
monitor: false,
taskIdResultMap: new(sync.Map),
taskIdStatusMap: new(sync.Map),
allWorkers: []string{},
}
statusHandler := NewCommonUpdateInstanceStatusHandler(actorCtx, broadcastTaskMaster, jobInstanceInfo)
if utils.IsSecondTypeJob(common.TimeType(jobInstanceInfo.GetTimeType())) {
statusHandler = NewSecondJobUpdateInstanceStatusHandler(actorCtx, broadcastTaskMaster, jobInstanceInfo)
}
broadcastTaskMaster.TaskMaster = NewTaskMaster(actorCtx, jobInstanceInfo, statusHandler)
return broadcastTaskMaster
}
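// SubmitInstance runs the optional PreProcess hook, pre-populates the status maps for every
// target worker, dispatches the broadcast task to each of them, and finally starts the
// monitoring goroutines.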
func (m *BroadcastTaskMaster) SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error {
if err := m.preProcess(jobInstanceInfo); err != nil {
logger.Errorf("BroadcastTaskMaster.preProcess failed, jobInstanceId=%d, err=%s", jobInstanceInfo.GetJobInstanceId(), err.Error())
if e := m.TaskMaster.updateNewInstanceStatus(m.GetSerialNum(), m.jobInstanceInfo.GetJobInstanceId(), processor.InstanceStatusFailed, "Preprocess failed. "+err.Error()); e != nil {
logger.Errorf("updateNewInstanceStatus failed after BroadcastTaskMaster.preProcess, jobInstanceId=%v, serialNum=%v, status=%v, err=%s", e.Error())
}
}
m.allWorkers = jobInstanceInfo.GetAllWorkers()
	// The master node itself does not execute the task
if !config.GetWorkerConfig().BroadcastMasterExecEnable() {
m.allWorkers = utils.RemoveSliceElem(jobInstanceInfo.GetAllWorkers(), m.GetLocalWorkerIdAddr())
}
	// Set the initialized state before dispatching: for second-level jobs, a single-machine
	// failure during the first broadcast round would otherwise force the whole job instance to stop.
m.Init()
	// Build every status map entry up front, so that the status map cannot be judged as
	// completed while the broadcast is still being dispatched.
taskIdMap := make(map[string]int64)
for _, workerIdAddr := range m.allWorkers {
workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr)
taskId := m.AcquireTaskId()
uniqueId := utils.GetUniqueId(jobInstanceInfo.GetJobId(), jobInstanceInfo.GetJobInstanceId(), taskId)
m.taskStatusMap.Store(uniqueId, taskstatus.TaskStatusInit)
counter, _ := m.workerProgressMap.LoadOrStore(workerAddr, common.NewWorkerProgressCounter(workerAddr))
counter.(*common.WorkerProgressCounter).IncrementTotal()
taskIdMap[workerIdAddr] = taskId
}
for _, workerIdAddr := range m.allWorkers {
m.dispatchTask(jobInstanceInfo, workerIdAddr, taskIdMap)
}
	// Start the broadcast monitoring goroutines only after dispatching has completed, to avoid
	// misjudging the processing status of individual nodes while the broadcast is still being dispatched.
m.startMonitorThreads()
return nil
}
// dispatchTask dispatches the broadcast task to a single worker, retrying up to the configured
// number of times; if every attempt fails, the task is reported as failed for that worker.
func (m *BroadcastTaskMaster) dispatchTask(jobInstanceInfo *common.JobInstanceInfo, workerIdAddr string, taskIdMap map[string]int64) {
var (
err error
workerAddr string
uniqueId string
taskId int64
workerId string
)
defer func() {
if err != nil {
logger.Errorf("broadcast taskMaster submitTask=%s to worker=%s error, errMsg=%s", uniqueId, workerAddr, err.Error())
m.existInvalidWorker = true
failedReq := &schedulerx.ContainerReportTaskStatusRequest{
JobId: proto.Int64(jobInstanceInfo.GetJobId()),
JobInstanceId: proto.Int64(jobInstanceInfo.GetJobInstanceId()),
TaskId: proto.Int64(taskId),
Status: proto.Int32(int32(taskstatus.TaskStatusFailed)),
WorkerId: proto.String(workerId),
WorkerAddr: proto.String(workerAddr),
SerialNum: proto.Int64(m.serialNum.Load()),
}
m.UpdateTaskStatus(failedReq)
}
}()
workerInfo := strings.Split(workerIdAddr, "@")
workerId = workerInfo[0]
workerAddr = actorcomm.GetRealWorkerAddr(workerIdAddr)
taskId = taskIdMap[workerIdAddr]
uniqueId = utils.GetUniqueId(jobInstanceInfo.GetJobId(), jobInstanceInfo.GetJobInstanceId(), taskId)
req, e := m.convert2StartContainerRequest(jobInstanceInfo, taskId, "", nil, false)
if e != nil {
err = fmt.Errorf("convert2StartContainerRequest failed, jobInstanceInfo=%+v, taskId=%+v, err=%s", jobInstanceInfo, taskId, e.Error())
return
}
req.ShardingNum = proto.Int32(int32(len(m.allWorkers)))
m.taskIdStatusMap.Store(taskId, taskstatus.TaskStatusInit)
maxRetryTimes := int(config.GetWorkerConfig().BroadcastDispatchRetryTimes())
for retryTimes := 0; retryTimes < maxRetryTimes; retryTimes++ {
response, e := m.actorContext.RequestFuture(actorcomm.GetContainerRouterPid(workerAddr), req, 5*time.Second).Result()
if e != nil {
err = fmt.Errorf("start container failed, worker=%v, uniqueId=%v, serialNum=%v, err=%v",
workerAddr, m.GetSerialNum(), uniqueId, e.Error())
continue
}
resp, ok := response.(*schedulerx.MasterStartContainerResponse)
if !ok {
err = fmt.Errorf("start container failed, worker=%v, uniqueId=%v, serialNum=%v, response is not MasterStartContainerResponse, resp=%+v",
workerAddr, m.GetSerialNum(), uniqueId, response)
continue
}
if resp.GetSuccess() {
m.worker2uniqueIdMap.Store(workerIdAddr, uniqueId)
logger.Infof("broadcast taskMaster init succeed, worker addr is %s, uniqueId=%s", workerIdAddr, uniqueId)
return
} else {
err = fmt.Errorf("broadcast submitTask=%v serialNum=%v to worker=%v failed, err=%v",
uniqueId, m.GetSerialNum(), workerAddr, resp.GetMessage())
time.Sleep(2 * time.Millisecond)
continue
}
}
return
}
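// KillInstance kills the whole job instance: a failed task-status request is sent for every
// worker that received a broadcast task, the instance status is updated to failed with the
// given reason, and the task status map is cleared so the instance ends immediately.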
func (m *BroadcastTaskMaster) KillInstance(reason string) error {
m.TaskMaster.KillInstance(reason)
for _, workerIdAddr := range m.allWorkers {
uniqueId, ok := m.worker2uniqueIdMap.Load(workerIdAddr)
		// FIXME: This is not aligned with the Java implementation, because leaving some required fields unset causes actor decoding to fail.
if ok {
workerInfo := strings.Split(workerIdAddr, "@")
workerId := workerInfo[0]
workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr)
tokens := strings.Split(uniqueId.(string), utils.SplitterToken)
taskId, _ := strconv.Atoi(tokens[2])
req := &schedulerx.ContainerReportTaskStatusRequest{
JobId: proto.Int64(m.jobInstanceInfo.GetJobId()),
JobInstanceId: proto.Int64(m.jobInstanceInfo.GetJobInstanceId()),
TaskId: proto.Int64(int64(taskId)),
Status: proto.Int32(int32(taskstatus.TaskStatusFailed)),
WorkerId: proto.String(workerId),
WorkerAddr: proto.String(workerAddr),
}
m.actorContext.Send(actorcomm.GetContainerRouterPid(workerAddr), req)
}
}
if err := m.TaskMaster.updateNewInstanceStatus(m.GetSerialNum(), m.jobInstanceInfo.GetJobInstanceId(), processor.InstanceStatusFailed, reason); err != nil {
return fmt.Errorf("updateNewInstanceStatus failed, err=%s", err.Error())
}
	// Clear the taskStatusMap and end the instance directly.
m.taskStatusMap = sync.Map{}
return nil
}
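// DestroyContainerPool asks every worker of the instance to destroy its container pool,
// using at-least-once delivery.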
func (m *BroadcastTaskMaster) DestroyContainerPool() {
for _, workerIdAddr := range m.allWorkers {
req := &schedulerx.MasterDestroyContainerPoolRequest{
JobInstanceId: proto.Int64(m.jobInstanceInfo.GetJobInstanceId()),
JobId: proto.Int64(m.jobInstanceInfo.GetJobId()),
WorkerIdAddr: proto.String(workerIdAddr),
SerialNum: proto.Int64(m.GetSerialNum()),
}
actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
Msg: req,
}
}
}
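// UpdateTaskStatus handles a task status report for a broadcast task. Reports whose serialNum
// does not match the current round are rejected and duplicates are ignored; otherwise the
// per-worker progress counters, the per-task result/status maps and the instance status are updated.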
func (m *BroadcastTaskMaster) UpdateTaskStatus(request *schedulerx.ContainerReportTaskStatusRequest) error {
	if request.GetSerialNum() != m.GetSerialNum() {
		return fmt.Errorf("ignore ContainerReportTaskStatusRequest, current serialNum=%v, but request serialNum=%v", m.GetSerialNum(), request.GetSerialNum())
	}
var (
jobId = request.GetJobId()
jobInstanceId = request.GetJobInstanceId()
taskId = request.GetTaskId()
workerAddr = request.GetWorkerAddr()
taskStatus = taskstatus.TaskStatus(request.GetStatus())
uniqueId = utils.GetUniqueId(jobId, jobInstanceId, taskId)
)
logger.Infof("update task status serialNum=%v, uniqueId=%v, status=%v, workerAddr=%v", request.GetSerialNum(), uniqueId, taskStatus.Descriptor(), workerAddr)
if val, ok := m.taskStatusMap.Load(uniqueId); ok {
if val.(taskstatus.TaskStatus) == taskStatus {
logger.Warnf("duplicated ContainerReportTaskStatusRequest, uniqueId=%v, taskStatus=%v", uniqueId, taskStatus)
} else {
			if taskStatus == taskstatus.TaskStatusSucceed {
				// A worker that finished successfully is removed from taskStatusMap directly.
				m.taskStatusMap.Delete(uniqueId)
			} else {
				// Otherwise record the latest reported status (e.g. running or failed).
				m.taskStatusMap.Store(uniqueId, taskStatus)
			}
if _, ok := m.workerProgressMap.Load(workerAddr); !ok {
m.workerProgressMap.Store(workerAddr, common.NewWorkerProgressCounter(workerAddr))
}
counter, _ := m.workerProgressMap.Load(workerAddr)
workerProgressCounter := counter.(*common.WorkerProgressCounter)
switch taskStatus {
case taskstatus.TaskStatusRunning:
workerProgressCounter.IncrementRunning()
case taskstatus.TaskStatusSucceed:
workerProgressCounter.IncrementSuccess()
case taskstatus.TaskStatusFailed:
workerProgressCounter.IncrementOneFailed()
}
// update taskResultMap and taskStatusMap
m.taskIdResultMap.Store(request.GetTaskId(), request.GetResult())
m.taskIdStatusMap.Store(request.GetTaskId(), taskStatus)
m.updateNewInstanceStatus(request.GetSerialNum(), jobInstanceId, request.GetResult())
}
}
return nil
}
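// updateNewInstanceStatus derives the new instance status (running/succeed/failed) from the
// kill flag and the remaining entries of taskStatusMap, then reports it through the embedded TaskMaster.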
func (m *BroadcastTaskMaster) updateNewInstanceStatus(serialNum int64, jobInstanceId int64, result string) {
newStatus := processor.InstanceStatusSucceed
if m.IsKilled() {
newStatus = processor.InstanceStatusFailed
}
if utils.SyncMapLen(&m.taskStatusMap) > 0 {
if !m.IsJobInstanceFinished() {
newStatus = processor.InstanceStatusRunning
} else {
newStatus = processor.InstanceStatusSucceed
			// If any subtask status is FAILED, the instance status is FAILED
m.taskStatusMap.Range(func(_, status interface{}) bool {
if status.(taskstatus.TaskStatus) == taskstatus.TaskStatusFailed {
newStatus = processor.InstanceStatusFailed
return false
}
return true
})
}
}
logger.Infof("update serialNum=%v, jobInstanceId=%v status=%v", serialNum, jobInstanceId, newStatus.Descriptor())
m.TaskMaster.updateNewInstanceStatus(serialNum, jobInstanceId, newStatus, result)
}
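// GetJobInstanceProgress returns the per-worker progress counters serialized as JSON.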
func (m *BroadcastTaskMaster) GetJobInstanceProgress() (string, error) {
detail := common.NewMapTaskProgress()
counters := make([]*common.WorkerProgressCounter, 0, utils.SyncMapLen(m.workerProgressMap))
m.workerProgressMap.Range(func(_, val interface{}) bool {
counters = append(counters, val.(*common.WorkerProgressCounter))
return true
})
detail.SetWorkerProgress(counters)
data, err := json.Marshal(detail)
if err != nil {
return "", fmt.Errorf("Marshal workerProgressDetail failed, err=%s ", err.Error())
}
return string(data), nil
}
// startMonitorThreads enables execution status monitoring for the current round. For second-level
// jobs, the monitor flag defers status detection until each round of broadcast dispatching has
// completed, preventing spurious task detection while dispatching is still in progress.
func (m *BroadcastTaskMaster) startMonitorThreads() {
m.setMonitor(true)
if m.running {
return
}
//jobIdAndInstanceId := fmt.Sprintf("%v_%v", m.jobInstanceInfo.GetJobId(), m.jobInstanceInfo.GetJobInstanceId())
// check if worker is alive
go m.checkWorkerAlive()
// report job instance progress
go m.reportJobInstanceProgress()
// check instance status
go m.checkInstanceStatus()
m.running = true
}
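// checkWorkerAlive periodically probes every worker with a heartbeat request; a worker that
// does not respond is marked invalid and its broadcast task is reported as failed.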
func (m *BroadcastTaskMaster) checkWorkerAlive() {
for !m.isInstanceStatusFinished() {
		if !m.isMonitor() {
			// Monitoring is disabled while dispatching is in progress; back off instead of busy-spinning.
			time.Sleep(10 * time.Second)
			continue
		}
for _, worker := range m.allWorkers {
m.aliveCheckWorkerSet.Add(worker)
}
for _, workerIdAddr := range m.aliveCheckWorkerSet.ToStringSlice() {
req := &schedulerx.MasterCheckWorkerAliveRequest{
JobInstanceId: proto.Int64(m.jobInstanceInfo.GetJobInstanceId()),
}
workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr)
_, err := m.actorContext.RequestFuture(actorcomm.GetHeartbeatActorPid(workerAddr), req, 10*time.Second).Result()
if err != nil {
m.existInvalidWorker = true
uniqueId, ok := m.worker2uniqueIdMap.Load(workerIdAddr)
if ok {
workerInfo := strings.Split(workerIdAddr, "@")
workerId := workerInfo[0]
workerAddr = actorcomm.GetRealWorkerAddr(workerIdAddr)
tokens := strings.Split(uniqueId.(string), utils.SplitterToken)
jobId, _ := strconv.Atoi(tokens[0])
jobInstanceId, _ := strconv.Atoi(tokens[1])
taskId, _ := strconv.Atoi(tokens[2])
req := &schedulerx.ContainerReportTaskStatusRequest{
JobId: proto.Int64(int64(jobId)),
JobInstanceId: proto.Int64(int64(jobInstanceId)),
TaskId: proto.Int64(int64(taskId)),
Status: proto.Int32(int32(taskstatus.TaskStatusFailed)),
WorkerId: proto.String(workerId),
WorkerAddr: proto.String(workerAddr),
SerialNum: proto.Int64(m.GetSerialNum()),
}
					if err := m.UpdateTaskStatus(req); err != nil {
						logger.Warnf("worker=%v is down, failed to set %v to failed status, err=%s", workerAddr, uniqueId, err.Error())
					} else {
						logger.Warnf("worker=%v is down, set %v to failed", workerAddr, uniqueId)
					}
} else {
logger.Errorf("can't found workerAddr of uniqueId=%v", uniqueId)
}
}
}
time.Sleep(10 * time.Second)
}
}
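// reportJobInstanceProgress periodically reports the aggregated per-worker progress to the
// server until the instance reaches a finished status.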
func (m *BroadcastTaskMaster) reportJobInstanceProgress() {
for !m.isInstanceStatusFinished() {
progress, err := m.GetJobInstanceProgress()
if err != nil {
logger.Errorf("reportJobInstanceProgress failed, err=%s", err.Error())
return
}
req := &schedulerx.WorkerReportJobInstanceProgressRequest{
JobId: proto.Int64(m.jobInstanceInfo.GetJobId()),
JobInstanceId: proto.Int64(m.jobInstanceInfo.GetJobInstanceId()),
Progress: proto.String(progress),
}
logger.Infof("BroadcastTaskMaster reportJobInstanceProgress, progress=%s", progress)
		// The master reports the progress to the server
actorcomm.TaskMasterMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
Msg: req,
}
time.Sleep(5 * time.Second)
}
}
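// checkInstanceStatus periodically re-evaluates the instance status while monitoring is enabled.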
func (m *BroadcastTaskMaster) checkInstanceStatus() {
for !m.isInstanceStatusFinished() {
time.Sleep(5 * time.Second)
if !m.isMonitor() {
continue
}
		if utils.SyncMapLen(&m.taskStatusMap) < 10 {
			// sync.Map cannot be formatted (or safely copied) directly; dump a small snapshot instead.
			snapshot := make(map[interface{}]interface{})
			m.taskStatusMap.Range(func(key, value interface{}) bool { snapshot[key] = value; return true })
			logger.Infof("taskStatusMap=%+v", snapshot)
		}
m.updateNewInstanceStatus(m.GetSerialNum(), m.jobInstanceInfo.GetJobInstanceId(), "")
}
}
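// GetWorkerProgressMap returns the per-worker progress counters keyed by worker address.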
func (m *BroadcastTaskMaster) GetWorkerProgressMap() *sync.Map {
return m.workerProgressMap
}
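// Clear resets the broadcast-specific state in addition to the embedded TaskMaster state.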
func (m *BroadcastTaskMaster) Clear(taskMaster taskmaster.TaskMaster) {
m.TaskMaster.Clear(taskMaster)
m.worker2uniqueIdMap = new(sync.Map)
m.workerProgressMap = new(sync.Map)
m.taskIdResultMap = new(sync.Map)
m.taskIdStatusMap = new(sync.Map)
m.monitor = false
}
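// preProcess invokes the PreProcess hook on the master node if the processor registered for
// this job implements processor.BroadcastProcessor.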
func (m *BroadcastTaskMaster) preProcess(jobInstanceInfo *common.JobInstanceInfo) error {
jobCtx := m.convertJobInstance2JobContext(jobInstanceInfo)
jobName := gjson.Get(jobCtx.Content(), "jobName").String()
// Compatible with the existing Java language configuration mechanism
if jobCtx.JobType() == "java" {
jobName = gjson.Get(jobCtx.Content(), "className").String()
}
task, ok := masterpool.GetTaskMasterPool().Tasks().Find(jobName)
if !ok {
return fmt.Errorf("preProcess broadcast task=%s failed, because it's unregistered. ", jobName)
}
if p, ok := task.(processor.BroadcastProcessor); ok {
startTime := time.Now().UnixMilli()
if err := p.PreProcess(jobCtx); err != nil {
return fmt.Errorf("preProcess broadcast task=%s failed, jobInstanceId=%v, taskName=%s, serialNum=%v, err=%s ", jobName, jobInstanceInfo.GetJobInstanceId(), jobCtx.TaskName(), jobCtx.SerialNum(), err.Error())
}
logger.Infof("preProcess broadcast task=%s finished, jobInstanceId=%v, taskName=%s, cost=%vms", jobName, jobInstanceInfo.GetJobInstanceId(), jobCtx.TaskName(), time.Now().UnixMilli()-startTime)
}
return nil
}
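// CheckProcessor is a no-op for broadcast jobs.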
func (m *BroadcastTaskMaster) CheckProcessor() {
// do nothing
}
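// PostFinish invokes the PostProcess hook on the master node once the instance finishes.
// If the processor is unregistered, does not implement BroadcastProcessor, or PostProcess
// returns an error, a default successful result is returned.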
func (m *BroadcastTaskMaster) PostFinish(jobInstanceId int64) *processor.ProcessResult {
jobCtx := m.convertJobInstance2JobContext(m.jobInstanceInfo)
defaultRet := processor.NewProcessResult(processor.WithSucceed())
jobName := gjson.Get(jobCtx.Content(), "jobName").String()
// Compatible with the existing Java language configuration mechanism
if jobCtx.JobType() == "java" {
jobName = gjson.Get(jobCtx.Content(), "className").String()
}
task, ok := masterpool.GetTaskMasterPool().Tasks().Find(jobName)
if !ok {
logger.Errorf("PostFinish broadcast task=%s failed, because it's unregistered. ", jobName)
return defaultRet
}
if p, ok := task.(processor.BroadcastProcessor); ok {
startTime := time.Now().UnixMilli()
result, err := p.PostProcess(jobCtx)
logger.Infof("PostFinish broadcast task=%s finished, jobInstanceId=%v, taskName=%s, cost=%vms", jobName, jobInstanceId, jobCtx.TaskName(), time.Now().UnixMilli()-startTime)
if err != nil {
logger.Errorf("PostFinish broadcast task=%s failed, jobInstanceId=%v, taskName=%s, serialNum=%v, err=%s ", jobName, jobInstanceId, jobCtx.TaskName(), jobCtx.SerialNum(), err.Error())
return defaultRet
}
return result
}
return defaultRet
}
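// convertJobInstance2JobContext builds a JobContext from the job instance info together with
// the collected per-task results and statuses.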
func (m *BroadcastTaskMaster) convertJobInstance2JobContext(jobInstanceInfo *common.JobInstanceInfo) *jobcontext.JobContext {
jobCtx := new(jobcontext.JobContext)
jobCtx.SetJobId(jobInstanceInfo.GetJobId())
jobCtx.SetJobInstanceId(jobInstanceInfo.GetJobInstanceId())
jobCtx.SetJobType(jobInstanceInfo.GetJobType())
jobCtx.SetContent(jobInstanceInfo.GetContent())
jobCtx.SetScheduleTime(jobInstanceInfo.GetScheduleTime())
jobCtx.SetDataTime(jobInstanceInfo.GetDataTime())
jobCtx.SetJobParameters(jobInstanceInfo.GetParameters())
jobCtx.SetInstanceParameters(jobInstanceInfo.GetInstanceParameters())
jobCtx.SetUser(jobInstanceInfo.GetUser())
taskResults := make(map[int64]string)
m.taskIdResultMap.Range(func(key, value any) bool {
taskId := key.(int64)
taskResult := value.(string)
taskResults[taskId] = taskResult
return true
})
jobCtx.SetTaskResults(taskResults)
taskStatuses := make(map[int64]taskstatus.TaskStatus)
m.taskIdStatusMap.Range(func(key, value any) bool {
taskId := key.(int64)
taskStatus := value.(taskstatus.TaskStatus)
taskStatuses[taskId] = taskStatus
return true
})
jobCtx.SetTaskStatuses(taskStatuses)
jobCtx.SetSerialNum(m.GetSerialNum())
return jobCtx
}
func (m *BroadcastTaskMaster) isMonitor() bool {
m.lock.RLock()
defer m.lock.RUnlock()
return m.monitor
}
func (m *BroadcastTaskMaster) setMonitor(monitor bool) {
	m.lock.Lock()
	m.monitor = monitor
	m.lock.Unlock()
}
func (m *BroadcastTaskMaster) isInstanceStatusFinished() bool {
return m.GetInstanceStatus().IsFinished()
}