// internal/master/map_task_master.go
/*
* Copyright (c) 2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package master

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/asynkron/protoactor-go/actor"
	"github.com/tidwall/gjson"
	"go.uber.org/atomic"
	"google.golang.org/protobuf/proto"

	"github.com/alibaba/schedulerx-worker-go/config"
	actorcomm "github.com/alibaba/schedulerx-worker-go/internal/actor/common"
	"github.com/alibaba/schedulerx-worker-go/internal/batch"
	"github.com/alibaba/schedulerx-worker-go/internal/common"
	"github.com/alibaba/schedulerx-worker-go/internal/constants"
	"github.com/alibaba/schedulerx-worker-go/internal/master/persistence"
	"github.com/alibaba/schedulerx-worker-go/internal/master/taskmaster"
	"github.com/alibaba/schedulerx-worker-go/internal/masterpool"
	"github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
	"github.com/alibaba/schedulerx-worker-go/internal/remoting/pool"
	"github.com/alibaba/schedulerx-worker-go/internal/tasks"
	"github.com/alibaba/schedulerx-worker-go/internal/utils"
	"github.com/alibaba/schedulerx-worker-go/logger"
	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
	"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
)
var (
_ taskmaster.MapTaskMaster = &MapTaskMaster{}
)
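
// MapTaskMaster is the task master for map/mapreduce jobs: it persists subtasks,
// dispatches them to workers in batches (push model) or hands them out on demand
// (pull model), aggregates task/worker progress counters, and reports the job
// instance status back to the server.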
type MapTaskMaster struct {
*TaskMaster
actorCtx actor.Context
taskMasterPoolCleaner func(int64)
tasks *tasks.TaskMap
connpool pool.ConnPool
pageSize int32
queueSize int64
dispatcherSize int64
index int
	// taskStatusReqQueue buffers task status reports (items: ContainerReportTaskStatusRequest)
	taskStatusReqQueue *batch.ReqQueue
	// taskStatusReqBatchHandler drains taskStatusReqQueue and reports the statuses in batches
	taskStatusReqBatchHandler *batch.TMStatusReqHandler
	// taskBlockingQueue is the in-memory subtask cache queue: in the push model it is
	// drained and dispatched actively by the TaskDispatchReqHandler, in the pull model
	// workers fetch from it through their pull threads
	taskBlockingQueue      *batch.ReqQueue
	taskDispatchReqHandler batch.TaskDispatchReqHandler
	// rootTaskResult keeps the root task's result so a root-task failure can be reported separately
	rootTaskResult string
taskPersistence persistence.TaskPersistence
// map[string]*common.TaskProgressCounter
taskProgressMap *sync.Map
// map[string]*common.WorkerProgressCounter
workerProgressMap *sync.Map
taskResultMap map[int64]string
taskStatusMap map[int64]taskstatus.TaskStatus
xAttrs *common.MapTaskXAttrs
taskCounter *atomic.Int64
localTaskRouterPath string
once sync.Once
}
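
// NewMapTaskMaster builds a MapTaskMaster for the given job instance, wiring in
// the shared connection pool, a cleaner that stops and removes the master from
// the task master pool, and a status handler chosen by the job's time type.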
func NewMapTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx actor.Context) taskmaster.TaskMaster {
var (
connpool = pool.GetConnPool()
taskMasterPool = masterpool.GetTaskMasterPool()
taskMasterPoolCleaner = func(jobInstanceId int64) {
taskMasterPool.Get(jobInstanceId).Stop()
taskMasterPool.Remove(jobInstanceId)
}
)
mapTaskMaster := &MapTaskMaster{
actorCtx: actorCtx,
taskMasterPoolCleaner: taskMasterPoolCleaner,
tasks: taskMasterPool.Tasks(),
connpool: connpool,
pageSize: config.GetWorkerConfig().MapMasterPageSize(),
queueSize: int64(config.GetWorkerConfig().MapMasterQueueSize()),
dispatcherSize: int64(config.GetWorkerConfig().MapMasterDispatcherSize()),
taskProgressMap: new(sync.Map),
workerProgressMap: new(sync.Map),
taskResultMap: make(map[int64]string),
taskStatusMap: make(map[int64]taskstatus.TaskStatus),
taskCounter: atomic.NewInt64(0),
localTaskRouterPath: actorCtx.ActorSystem().Address(),
// taskStatusReqQueue: batch.NewReqQueue(100000),
// taskBlockingQueue: batch.NewReqQueue(100000),
}
statusHandler := NewCommonUpdateInstanceStatusHandler(actorCtx, mapTaskMaster, jobInstanceInfo)
if utils.IsSecondTypeJob(common.TimeType(jobInstanceInfo.GetTimeType())) {
statusHandler = NewSecondJobUpdateInstanceStatusHandler(actorCtx, mapTaskMaster, jobInstanceInfo)
}
mapTaskMaster.TaskMaster = NewTaskMaster(actorCtx, jobInstanceInfo, statusHandler)
// mapTaskMaster.taskStatusReqBatchHandler = batch.NewTMStatusReqHandler(jobInstanceInfo.GetJobInstanceId(), 1, 1, 3000, mapTaskMaster.taskStatusReqQueue)
// if jobInstanceInfo.GetXattrs() != "" {
// if err := json.Unmarshal([]byte(jobInstanceInfo.GetXattrs()), mapTaskMaster.xAttrs); err != nil {
// logger.Errorf("Unmarshal xAttrs failed, err=%s", err.Error())
// }
// }
// if mapTaskMaster.xAttrs != nil && mapTaskMaster.xAttrs.GetTaskDispatchMode() == string(common.TaskDispatchModePull) {
// mapTaskMaster.taskDispatchReqHandler = batch.NewTaskPullReqHandler(
// jobInstanceInfo.GetJobInstanceId(), 1, 1, int32(mapTaskMaster.pageSize*int64(len(jobInstanceInfo.GetAllWorkers()))),
// mapTaskMaster.taskBlockingQueue)
// } else {
// mapTaskMaster.taskDispatchReqHandler = batch.NewTaskPushReqHandler(
// jobInstanceInfo.GetJobInstanceId(), 1, 1, int32(mapTaskMaster.pageSize*int64(len(jobInstanceInfo.GetAllWorkers()))),
// mapTaskMaster.taskBlockingQueue, 3000)
// }
return mapTaskMaster
}
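
// init starts the master's background loops exactly once: task pulling, instance
// status checking, periodic progress reporting (skipped for second-delay jobs),
// and worker liveness checking.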
func (m *MapTaskMaster) init() {
m.once.Do(func() {
m.TaskMaster.Init()
jobIdAndInstanceId := strconv.FormatInt(m.GetJobInstanceInfo().GetJobId(), 10) + "_" + strconv.FormatInt(m.GetJobInstanceInfo().GetJobInstanceId(), 10)
logger.Infof("jobInstanceId=%d, map master config, pageSize:%d, queueSize:%d, dispatcherSize:%d, workerSize:%d",
jobIdAndInstanceId, m.pageSize, m.queueSize, m.dispatcherSize, len(m.GetJobInstanceInfo().GetAllWorkers()))
// pull
go m.pullTask(jobIdAndInstanceId)
// status check
go m.checkInstanceStatus()
// job instance progress report
if !utils.IsSecondTypeJob(common.TimeType(m.GetJobInstanceInfo().GetTimeType())) {
go m.reportJobInstanceProgress()
}
// worker alive check thread
go m.checkWorkerAlive()
// PULL_MODEL specially
// if m.xAttrs != nil && m.xAttrs.GetTaskDispatchMode() == string(common.TaskDispatchModePull) {
// go m.notifyWorkerPull()
// }
})
}
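
// pullTask loops until the instance finishes, pulling pending tasks from the
// persistence layer page by page and re-submitting them to the in-memory
// dispatch queue.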
func (m *MapTaskMaster) pullTask(jobIdAndInstanceId string) {
for !m.GetInstanceStatus().IsFinished() {
jobInstanceId := m.GetJobInstanceInfo().GetJobInstanceId()
startTime := time.Now()
taskInfos, err := m.taskPersistence.Pull(jobInstanceId, m.pageSize)
if err != nil && errors.Is(err, persistence.ErrTimeout) {
logger.Errorf("pull task timeout, uniqueId: %s", jobIdAndInstanceId)
time.Sleep(10 * time.Second)
continue
}
logger.Debugf("jobInstanceId=%d, pull cost=%dms", jobInstanceId, time.Since(startTime).Milliseconds())
if len(taskInfos) == 0 {
logger.Debugf("pull task empty of jobInstanceId=%d, sleep 10s ...", jobInstanceId)
time.Sleep(10 * time.Second)
} else {
for _, taskInfo := range taskInfos {
taskName := taskInfo.TaskName()
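				// LoadOrStore reports ok=true when the counter already existed, i.e. this
				// task was presumably counted as running by an earlier dispatch; undo that
				// before it is re-dispatched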
if counter, ok := m.taskProgressMap.LoadOrStore(taskName, common.NewTaskProgressCounter(taskName)); ok {
counter.(*common.TaskProgressCounter).DecrementRunning()
}
req, err := m.convert2StartContainerRequest(m.GetJobInstanceInfo(), taskInfo.TaskId(), taskInfo.TaskName(), taskInfo.TaskBody(), true)
if err != nil {
errMsg := fmt.Sprintf("mapTaskMaster pull task failed, jobInstanceInfo=%+v, taskId=%d, taskName=%s, err=%s.", m.GetJobInstanceInfo(), taskInfo.TaskId(), taskInfo.TaskName(), err.Error())
logger.Errorf(errMsg)
m.updateNewInstanceStatus(m.GetSerialNum(), jobInstanceId, processor.InstanceStatusFailed, errMsg)
break // FIXME break or continue?
}
if m.taskBlockingQueue != nil {
m.taskBlockingQueue.SubmitRequest(req)
}
}
}
}
}
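
// checkInstanceStatus periodically asks the persistence layer for the overall
// instance status and, once it reports finished, validates it against the
// in-memory progress counters before committing the final status.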
func (m *MapTaskMaster) checkInstanceStatus() {
checkInterval := config.GetWorkerConfig().MapMasterStatusCheckInterval()
for !m.GetInstanceStatus().IsFinished() {
time.Sleep(checkInterval)
newStatus := m.taskPersistence.CheckInstanceStatus(m.GetJobInstanceInfo().GetJobInstanceId())
		if newStatus.IsFinished() && m.taskDispatchReqHandler.IsActive() {
			// The dispatcher is still active, so subtasks may still be being created
			// even though the root task already succeeded; don't finish the instance
			// too early, check again after the next interval.
			time.Sleep(checkInterval)
			continue
		}
result := m.GetRootTaskResult()
		if newStatus == processor.InstanceStatusSucceed {
			// the persisted status says finished; verify it against the in-memory counters
			var (
				failCnt    int64
				successCnt int64
				totalCnt   int64
			)
			m.taskProgressMap.Range(func(key, value any) bool {
				taskProgressCounter := value.(*common.TaskProgressCounter)
				failCnt += taskProgressCounter.GetFailed()
				successCnt += taskProgressCounter.GetSuccess()
				totalCnt += taskProgressCounter.GetTotal()
				return true
			})
			if successCnt+failCnt < totalCnt {
				newStatus = processor.InstanceStatusFailed
				logger.Warnf("jobInstanceId=%d turned into finished status, but the counters don't add up, successCnt:%d, failCnt:%d, totalCnt:%d", m.GetJobInstanceInfo().GetJobInstanceId(), successCnt, failCnt, totalCnt)
				result = fmt.Sprintf("Turned into finished status, but the counters don't add up, successCnt:%d, failCnt:%d, totalCnt:%d", successCnt, failCnt, totalCnt)
			} else if failCnt > 0 {
				newStatus = processor.InstanceStatusFailed
			} else {
				newStatus = processor.InstanceStatusSucceed
			}
}
if err := m.updateNewInstanceStatus(m.GetSerialNum(), m.GetJobInstanceInfo().GetJobInstanceId(), newStatus, result); err != nil {
logger.Errorf("updateNewInstanceStatus failed, serialNum=%v, jobInstanceId=%v, newStatus=%v, result=%v, err=%s",
m.GetSerialNum(), m.GetJobInstanceInfo().GetJobInstanceId(), newStatus, result, err.Error())
}
}
}
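
// reportJobInstanceProgress sends the aggregated progress JSON to the server
// through the task master message channel every five seconds until the instance
// finishes.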
func (m *MapTaskMaster) reportJobInstanceProgress() {
for !m.GetInstanceStatus().IsFinished() {
progress, err := m.GetJobInstanceProgress()
		if err != nil {
			logger.Errorf("report status error, uniqueId=%d, err=%s", m.GetJobInstanceInfo().GetJobInstanceId(), err.Error())
			time.Sleep(5 * time.Second) // avoid a hot loop on repeated errors
			continue                    // FIXME continue or break?
		}
req := &schedulerx.WorkerReportJobInstanceProgressRequest{
JobId: proto.Int64(m.GetJobInstanceInfo().GetJobId()),
JobInstanceId: proto.Int64(m.GetJobInstanceInfo().GetJobInstanceId()),
Progress: proto.String(progress),
}
// Send to server by master
logger.Infof("MapTaskMaster reportJobInstanceProgress, req=%+v", req)
actorcomm.TaskMasterMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
Msg: req,
}
time.Sleep(5 * time.Second)
}
}
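
// checkWorkerAlive probes every known worker once per round: first a plain TCP
// dial retried up to three times, then a MasterCheckWorkerAliveRequest to the
// worker's heartbeat actor. A worker that fails either check is removed and its
// tasks are failed over via handleWorkerShutdown.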
func (m *MapTaskMaster) checkWorkerAlive() {
for !m.GetInstanceStatus().IsFinished() {
for _, worker := range m.GetJobInstanceInfo().GetAllWorkers() {
m.aliveCheckWorkerSet.Add(worker)
}
if m.aliveCheckWorkerSet.Len() == 0 {
logger.Warnf("worker list is empty, jobInstanceId=%d", m.GetJobInstanceInfo().GetJobInstanceId())
m.taskPersistence.BatchUpdateTaskStatus(m.GetJobInstanceInfo().GetJobInstanceId(), taskstatus.TaskStatusFailed, "", "")
break
} else {
for _, workerIdAddr := range m.aliveCheckWorkerSet.ToStringSlice() {
workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr)
times := 0
for times < 3 {
conn, err := net.Dial("tcp", workerAddr)
if err == nil {
logger.Debugf("socket to %s is reachable, times=%d", workerAddr, times)
conn.Close()
break
} else {
logger.Warnf("socket to %s is not reachable, times=%d", workerAddr, times)
time.Sleep(5 * time.Second)
times++
}
}
if times >= 3 {
logger.Warnf("worker[%s] is down, start to remove this worker and failover tasks, jobInstanceId=%d", workerIdAddr, m.GetJobInstanceInfo().GetJobInstanceId())
m.handleWorkerShutdown(workerIdAddr)
continue
}
request := &schedulerx.MasterCheckWorkerAliveRequest{
JobInstanceId: proto.Int64(m.GetJobInstanceInfo().GetJobInstanceId()),
DispatchMode: proto.String(m.xAttrs.GetTaskDispatchMode()),
}
response, err := m.actorContext.RequestFuture(actorcomm.GetHeartbeatActorPid(workerAddr), request, 10*time.Second).Result()
if err != nil {
logger.Errorf("check worker error, jobInstanceId=%d, err=%s", m.GetJobInstanceInfo().GetJobInstanceId(), err.Error())
break
}
if resp := response.(*schedulerx.MasterCheckWorkerAliveResponse); !resp.GetSuccess() {
logger.Warnf("jobInstanceId=%d of worker=%s is not alive, remote worker resp=%+v", m.GetJobInstanceInfo().GetJobInstanceId(), workerIdAddr, resp.GetMessage())
m.handleWorkerShutdown(workerIdAddr)
// destroy containers of worker of PullModel
destroyContainerPoolRequest := &schedulerx.MasterDestroyContainerPoolRequest{
JobInstanceId: proto.Int64(m.GetJobInstanceInfo().GetJobInstanceId()),
JobId: proto.Int64(m.GetJobInstanceInfo().GetJobId()),
WorkerIdAddr: proto.String(workerAddr),
SerialNum: proto.Int64(m.GetSerialNum()),
}
actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
Msg: destroyContainerPoolRequest,
}
}
}
// Worker detection is performed every 10 seconds
time.Sleep(10 * time.Second)
}
}
}
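
// notifyWorkerPull belongs to the pull model: it periodically tells each
// worker's job instance actor to start pulling tasks with the configured page,
// queue and consumer sizes. Its call site in init is currently commented out.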
func (m *MapTaskMaster) notifyWorkerPull() {
for !m.GetInstanceStatus().IsFinished() {
for _, workerIdAddr := range m.GetJobInstanceInfo().GetAllWorkers() {
// FIXME
jobInstanceActorPid := actor.NewPID(workerIdAddr, actorcomm.JobInstancePidId)
request := &schedulerx.MasterNotifyWorkerPullRequest{
JobInstanceId: proto.Int64(m.GetJobInstanceInfo().GetJobInstanceId()),
PageSize: proto.Int32(m.xAttrs.GetPageSize()),
QueueSize: proto.Int32(m.xAttrs.GetQueueSize()),
TaskMasterAkkaPath: proto.String(m.GetLocalTaskRouterPath()),
ConsumerSize: proto.Int32(m.xAttrs.GetConsumerSize()),
}
response, err := m.actorCtx.RequestFuture(jobInstanceActorPid, request, 5*time.Second).Result()
if err != nil {
logger.Errorf("notify worker pull error, jobInstanceId=%d, worker=%d, err=%s", m.GetJobInstanceInfo().GetJobInstanceId(), workerIdAddr, err.Error())
break
}
if resp := response.(*schedulerx.MasterNotifyWorkerPullResponse); !resp.GetSuccess() {
errorMsg := resp.GetMessage()
logger.Errorf("notify worker pull failed, jobInstanceId=%d", m.GetJobInstanceInfo().GetJobInstanceId(), errorMsg)
m.updateNewInstanceStatus(m.GetSerialNum(), m.GetJobInstanceInfo().GetJobInstanceId(), processor.InstanceStatusFailed, errorMsg)
}
}
time.Sleep(5 * time.Second)
}
}
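
// SubmitInstance starts the job instance on this master: it caps the dispatcher
// size, starts the batch handlers, creates the root task, and kicks off the
// background loops. Any error marks the whole instance as failed.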
func (m *MapTaskMaster) SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error {
var err error
defer func() {
if err != nil {
errMsg := fmt.Sprintf("Submit instance failed, err=%s", err.Error())
logger.Errorf(errMsg)
m.updateNewInstanceStatus(m.GetSerialNum(), m.GetJobInstanceInfo().GetJobInstanceId(), processor.InstanceStatusFailed, errMsg)
}
}()
startTime := time.Now()
if m.dispatcherSize > constants.MapMasterDispatcherSizeMax {
m.dispatcherSize = constants.MapMasterDispatcherSizeMax
}
if err = m.startBatchHandler(); err != nil {
return err
}
if err = m.createRootTask(); err != nil {
return err
}
logger.Infof("jobInstanceId=%d create root task, cost=%dms", jobInstanceInfo.GetJobInstanceId(), time.Since(startTime).Milliseconds())
m.init()
return err
}
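
// UpdateTaskStatus enqueues a single task status report; the batch handler
// drains the queue and processes reports in bulk.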
func (m *MapTaskMaster) UpdateTaskStatus(request *schedulerx.ContainerReportTaskStatusRequest) error {
m.taskStatusReqQueue.SubmitRequest(request)
return nil
}
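
// BatchUpdateTaskStatues applies a batch of task status reports: it keeps only
// the final status per task, updates the task/worker progress counters and the
// result/status maps, and persists the final statuses with up to three retries.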
func (m *MapTaskMaster) BatchUpdateTaskStatues(requests []*schedulerx.ContainerReportTaskStatusRequest) {
finalTaskStatus := make(map[int64]*schedulerx.ContainerReportTaskStatusRequest)
for _, request := range requests {
taskStatus := taskstatus.TaskStatus(request.GetStatus())
// Filter intermediate states
if _, ok := finalTaskStatus[request.GetTaskId()]; !ok || taskStatus.IsFinished() {
finalTaskStatus[request.GetTaskId()] = request
}
var (
workerAddr = request.GetWorkerAddr()
taskName = request.GetTaskName()
)
logger.Debugf("report task status:%s from worker:%s, uniqueId:%s", taskStatus.Descriptor(), workerAddr,
utils.GetUniqueId(request.GetJobId(), request.GetJobInstanceId(), request.GetTaskId()))
m.taskProgressMap.LoadOrStore(taskName, common.NewTaskProgressCounter(taskName))
if _, ok := m.workerProgressMap.Load(workerAddr); workerAddr != "" && !ok {
m.workerProgressMap.LoadOrStore(workerAddr, common.NewWorkerProgressCounter(workerAddr))
}
switch taskStatus {
case taskstatus.TaskStatusRunning:
if val, ok := m.taskProgressMap.Load(taskName); ok {
val.(*common.TaskProgressCounter).IncrementRunning()
}
if workerAddr != "" {
if val, ok := m.workerProgressMap.Load(workerAddr); ok {
val.(*common.WorkerProgressCounter).IncrementRunning()
}
}
case taskstatus.TaskStatusSucceed:
if val, ok := m.taskProgressMap.Load(taskName); ok {
val.(*common.TaskProgressCounter).IncrementOneSuccess()
}
if workerAddr != "" {
if val, ok := m.workerProgressMap.Load(workerAddr); ok {
val.(*common.WorkerProgressCounter).IncrementSuccess()
}
}
case taskstatus.TaskStatusFailed:
if val, ok := m.taskProgressMap.Load(taskName); ok {
val.(*common.TaskProgressCounter).IncrementOneFailed()
}
if workerAddr != "" {
if val, ok := m.workerProgressMap.Load(workerAddr); ok {
val.(*common.WorkerProgressCounter).IncrementOneFailed()
}
}
}
// update taskResultMap and taskStatusMap
m.taskResultMap[request.GetTaskId()] = request.GetResult()
m.taskStatusMap[request.GetTaskId()] = taskStatus
}
startTime := time.Now()
	// Keep the failure reason of the root task. A batch may contain two reports for
	// the root task at once (running and failed), so take the last one.
idx := len(requests) - 1
if idx >= 0 && requests[idx].GetStatus() == int32(taskstatus.TaskStatusFailed) && requests[idx].GetTaskName() == constants.MapTaskRootName {
m.SetRootTaskResult(requests[idx].GetResult())
}
updateSuccess := false
allTaskStatus := make([]*schedulerx.ContainerReportTaskStatusRequest, 0, len(finalTaskStatus))
for _, value := range finalTaskStatus {
allTaskStatus = append(allTaskStatus, value)
}
for i := 0; i < 3; i++ {
// try 3 times
// FIXME if need 3 times?
if err := m.taskPersistence.UpdateTaskStatues(allTaskStatus); err != nil {
logger.Errorf("jobInstanceId=%d, persistent batch updateTaskStatus error, err=%s", m.jobInstanceInfo.GetJobInstanceId(), err.Error())
continue
}
updateSuccess = true
break
}
if !updateSuccess {
err := m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, "persistent batch update TaskStatus error up to 3 times")
if err != nil {
logger.Errorf("jobInstanceId=%d, UpdateNewInstanceStatus failed, err=%s", m.GetJobInstanceInfo().GetJobInstanceId(), err.Error())
}
}
logger.Debugf("jobInstanceId=%d batch update status db cost %dms", m.GetJobInstanceInfo().GetJobInstanceId(), time.Since(startTime).Milliseconds())
}
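
// Map fans a batch of user-generated subtask bodies out into the dispatch queue
// under the given task name and reports whether the machine is currently
// overloaded so the caller can throttle further mapping.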
func (m *MapTaskMaster) Map(jobCtx *jobcontext.JobContext, taskList [][]byte, taskName string) (bool, error) {
logger.Debugf("map taskName:%s, size:%d", taskName, len(taskList))
m.initTaskProgress(taskName, len(taskList))
for _, taskBody := range taskList {
startContainerRequest, err := m.convert2StartContainerRequest(m.GetJobInstanceInfo(), m.AcquireTaskId(), taskName, taskBody, false)
if err != nil {
return false, fmt.Errorf("convert2StartContainerRequest failed, err=%s", err.Error())
}
m.taskBlockingQueue.SubmitRequest(startContainerRequest)
}
return m.machineOverload(), nil
}
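
// machineOverload reports whether this machine should stop accepting new map
// batches; the heap/CPU metrics are not collected yet, so it currently always
// returns false.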
func (m *MapTaskMaster) machineOverload() bool {
var (
memOverload = false
loadOverload = false
taskQueueOverload = false
)
// FIXME golang get heap and cpu metric
// vmDetail := MetricsCollector.getMetrics()
// if vmDetail != nil {
// memOverload = vmDetail.getHeap1Usage() >= WorkerConstants.USER_MEMORY_PERCENT_MAX
// loadOverload = vmDetail.getCpuLoad1() >= vmDetail.getCpuProcessors()
// }
return memOverload || loadOverload || taskQueueOverload
}
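
// clearTasks removes all persisted tasks of the given job instance from the
// task store.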
func (m *MapTaskMaster) clearTasks(jobInstanceId int64) {
m.taskPersistence.ClearTasks(jobInstanceId)
logger.Infof("jobInstanceId=%d clearTasks success.", jobInstanceId)
}
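
// createRootTask creates the root task and dispatches it to the local worker;
// the root task is the one that produces the first level of subtasks.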
func (m *MapTaskMaster) createRootTask() error {
taskName := constants.MapTaskRootName
taskBody, err := json.Marshal(constants.MapTaskRootName)
if err != nil {
return err
}
m.initTaskProgress(taskName, 1)
startContainerRequest, err := m.convert2StartContainerRequest(m.GetJobInstanceInfo(), m.AcquireTaskId(), taskName, taskBody, false)
if err != nil {
return fmt.Errorf("convert2StartContainerRequest failed, err=%s", err.Error())
}
m.BatchDispatchTasks([]*schedulerx.MasterStartContainerRequest{startContainerRequest}, m.GetLocalWorkerIdAddr())
return nil
}
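
// batchHandleContainers persists a batch of start-container requests for one
// worker and, in the push model, sends them to that worker's container router.
// A delivery timeout removes the worker and rolls the tasks back to init for
// re-dispatch; any other error fails the tasks directly.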
func (m *MapTaskMaster) batchHandleContainers(workerIdAddr string, reqs []*schedulerx.MasterStartContainerRequest, isFailover bool, dispatchMode common.TaskDispatchMode) {
parts := strings.Split(workerIdAddr, "@")
workerId := parts[0]
workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr)
logger.Debugf("jobInstanceId=%d, batch dispatch, worker:%s, size:%d", m.GetJobInstanceInfo().GetJobInstanceId(), workerIdAddr, len(reqs))
m.batchHandlePersistence(workerId, workerAddr, reqs, isFailover)
if dispatchMode == common.TaskDispatchModePush {
startTime := time.Now()
// FIXME
// workerAddr = actorcomm.GetRemoteWorkerAddr(workerAddr)
containerRouterActorPid := actorcomm.GetContainerRouterPid(workerAddr)
req := &schedulerx.MasterBatchStartContainersRequest{
JobInstanceId: proto.Int64(m.GetJobInstanceInfo().GetJobInstanceId()),
JobId: proto.Int64(m.GetJobInstanceInfo().GetJobId()),
StartReqs: reqs,
}
future := m.actorCtx.RequestFuture(containerRouterActorPid, req, 15*time.Second)
result, err := future.Result()
if err == nil {
// Trigger success callback
resp := result.(*schedulerx.MasterBatchStartContainersResponse)
if resp.GetSuccess() {
logger.Infof("jobInstanceId=%d, batch start containers successfully, size:%d, worker=%s, cost=%dms",
m.GetJobInstanceInfo().GetJobInstanceId(), len(reqs), workerIdAddr, time.Since(startTime).Milliseconds())
} else {
logger.Errorf("jobInstanceId=%d, batch start containers failed, worker=%s, response=%s, size:%d",
m.GetJobInstanceInfo().GetJobInstanceId(), workerIdAddr, resp.GetMessage(), len(reqs))
				// TODO: on a failed send, should the batch be retried on another worker or
				// marked failed directly? This may need to depend on the response.
				// Currently the tasks are marked failed directly.
m.batchUpdateTaskStatus(workerId, workerAddr, reqs)
}
} else {
// Trigger timeout or failure callback
if errors.Is(err, actor.ErrTimeout) {
if len(m.GetJobInstanceInfo().GetAllWorkers()) == 1 {
logger.Errorf("jobInstanceId:%d, batch dispatch tasks failed due to only existed worker[%s] was down, size:%d, error=%s",
m.GetJobInstanceInfo().GetJobInstanceId(), workerIdAddr, len(reqs), err.Error())
m.batchUpdateTaskStatus(workerId, workerAddr, reqs)
return
}
logger.Warnf("jobInstanceId=%d, worker[%s] is down, try another worker, size:%d",
m.GetJobInstanceInfo().GetJobInstanceId(), workerIdAddr, len(reqs))
				// TODO: the worker is down, so remove it and let the tasks be retried on
				// another worker; liveness probing itself is left to the dedicated
				// checkWorkerAlive goroutine, no aliveness judgment is made here.
m.GetJobInstanceInfo().SetAllWorkers(utils.RemoveSliceElem(m.GetJobInstanceInfo().GetAllWorkers(), workerIdAddr))
// Send timeout, fallback to init status
var taskIds []int64
for _, req := range reqs {
taskIds = append(taskIds, req.GetTaskId())
}
affectCnt, err := m.taskPersistence.UpdateTaskStatus(m.GetJobInstanceInfo().GetJobInstanceId(), taskIds, taskstatus.TaskStatusInit, workerId, workerAddr)
if err != nil {
logger.Errorf("jobInstanceId=%d, timeout return init error", m.GetJobInstanceInfo().GetJobInstanceId())
m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, "timeout dispatch return init error")
}
if val, ok := m.workerProgressMap.Load(workerAddr); ok {
val.(*common.WorkerProgressCounter).DecrementRunning(affectCnt)
}
} else {
// If there are other exceptions (such as serialization failure, worker cannot be found), directly set the task to failure.
logger.Errorf("jobInstanceId:%d, batch dispatch Tasks error, worker=%s, size:%d, error=%s", m.GetJobInstanceInfo().GetJobInstanceId(), workerIdAddr, len(reqs), err.Error())
m.batchUpdateTaskStatus(workerId, workerAddr, reqs)
}
}
}
}
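
// batchUpdateTaskStatus marks every request in the batch as failed: it bumps the
// failure counters and enqueues a failed status report for each task.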
func (m *MapTaskMaster) batchUpdateTaskStatus(workerId, workerAddr string, reqs []*schedulerx.MasterStartContainerRequest) {
for _, req := range reqs {
if val, ok := m.taskProgressMap.Load(req.GetTaskName()); ok {
val.(*common.TaskProgressCounter).IncrementOneFailed()
}
if val, ok := m.workerProgressMap.Load(workerAddr); ok {
val.(*common.WorkerProgressCounter).IncrementOneFailed()
}
failedStatusRequest := &schedulerx.ContainerReportTaskStatusRequest{
JobId: proto.Int64(m.GetJobInstanceInfo().GetJobId()),
JobInstanceId: proto.Int64(m.GetJobInstanceInfo().GetJobInstanceId()),
TaskId: proto.Int64(req.GetTaskId()),
Status: proto.Int32(int32(taskstatus.TaskStatusFailed)),
WorkerId: proto.String(workerId),
TaskName: proto.String(req.GetTaskName()),
WorkerAddr: proto.String(workerAddr),
}
m.UpdateTaskStatus(failedStatusRequest)
}
}
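
// batchHandlePersistence writes a dispatch batch through to the task store: a
// first dispatch creates the task rows, while a failover re-dispatch flips the
// existing rows back to running under the new worker.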
func (m *MapTaskMaster) batchHandlePersistence(workerId, workerAddr string, reqs []*schedulerx.MasterStartContainerRequest, isFailover bool) {
startTime := time.Now()
if !isFailover {
// first dispatch
if err := m.taskPersistence.CreateTasks(reqs, workerId, workerAddr); err != nil {
logger.Errorf("Batch persistence tasks to DB by CreateTasks failed, err=%s, reqs len=%d, workerId=%v, workerAddr=%v", err.Error(), len(reqs), workerId, workerAddr)
}
} else {
// failover, not first dispatch
taskIds := make([]int64, 0, len(reqs))
for _, req := range reqs {
taskIds = append(taskIds, req.GetTaskId())
}
_, err := m.taskPersistence.UpdateTaskStatus(m.GetJobInstanceInfo().GetJobInstanceId(), taskIds, taskstatus.TaskStatusRunning, workerId, workerAddr)
if err != nil {
logger.Errorf("Batch persistence tasks to DB by UpdateTaskStatus failed, err=%s, jobInstanceId=%d, tasks len=%d, workerId=%v, workerAddr=%v", err.Error(), m.GetJobInstanceInfo().GetJobInstanceId(), len(taskIds), workerId, workerAddr)
}
}
logger.Debugf("jobInstance=%d, batch dispatch db cost:%dms, size:%d", m.GetJobInstanceInfo().GetJobInstanceId(), time.Since(startTime).Milliseconds(), len(reqs))
}
// Deprecated: batchHandleRunningProgress is no longer used.
func (m *MapTaskMaster) batchHandleRunningProgress(masterStartContainerRequests []*schedulerx.MasterStartContainerRequest,
worker2ReqsWithNormal map[string][]*schedulerx.MasterStartContainerRequest, worker2ReqsWithFailover map[string][]*schedulerx.MasterStartContainerRequest) {
for _, request := range masterStartContainerRequests {
workerIdAddr := m.selectWorker()
if workerIdAddr == "" {
m.updateNewInstanceStatus(m.GetSerialNum(), m.GetJobInstanceInfo().GetJobInstanceId(), processor.InstanceStatusFailed, "all worker is down!")
break
}
workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr)
if request.GetFailover() {
if _, ok := worker2ReqsWithFailover[workerIdAddr]; !ok {
worker2ReqsWithFailover[workerIdAddr] = []*schedulerx.MasterStartContainerRequest{request}
} else {
worker2ReqsWithFailover[workerIdAddr] = append(worker2ReqsWithFailover[workerIdAddr], request)
}
} else {
if _, ok := worker2ReqsWithNormal[workerIdAddr]; !ok {
worker2ReqsWithNormal[workerIdAddr] = []*schedulerx.MasterStartContainerRequest{request}
} else {
				worker2ReqsWithNormal[workerIdAddr] = append(worker2ReqsWithNormal[workerIdAddr], request)
}
}
if val, ok := m.taskProgressMap.Load(request.GetTaskName()); ok {
val.(*common.TaskProgressCounter).IncrementRunning()
}
if _, ok := m.workerProgressMap.Load(workerAddr); workerAddr != "" && !ok {
m.workerProgressMap.LoadOrStore(workerAddr, common.NewWorkerProgressCounter(workerAddr))
}
if val, ok := m.workerProgressMap.Load(workerAddr); ok {
val.(*common.WorkerProgressCounter).IncrementTotal()
val.(*common.WorkerProgressCounter).IncrementRunning()
}
}
}
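
// BatchHandlePulledProgress groups a batch of start-container requests by target
// worker, separating normal requests from failover ones, and updates the
// pulled/total progress counters along the way.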
func (m *MapTaskMaster) BatchHandlePulledProgress(masterStartContainerRequests []*schedulerx.MasterStartContainerRequest,
remoteWorker string) (map[string][]*schedulerx.MasterStartContainerRequest, map[string][]*schedulerx.MasterStartContainerRequest) {
var (
worker2ReqsWithNormal = make(map[string][]*schedulerx.MasterStartContainerRequest)
worker2ReqsWithFailover = make(map[string][]*schedulerx.MasterStartContainerRequest)
)
for _, request := range masterStartContainerRequests {
workerIdAddr := remoteWorker
if workerIdAddr == "" {
workerIdAddr = m.selectWorker()
}
if workerIdAddr == "" {
if err := m.updateNewInstanceStatus(m.GetSerialNum(), m.GetJobInstanceInfo().GetJobInstanceId(),
processor.InstanceStatusFailed, "all worker is down!"); err != nil {
logger.Errorf("updateNewInstanceStatus failed in BatchHandlePulledProgress, err=%s", err.Error())
}
break
}
workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr)
if request.GetFailover() {
if _, ok := worker2ReqsWithFailover[workerIdAddr]; !ok {
worker2ReqsWithFailover[workerIdAddr] = []*schedulerx.MasterStartContainerRequest{request}
} else {
worker2ReqsWithFailover[workerIdAddr] = append(worker2ReqsWithFailover[workerIdAddr], request)
}
} else {
if _, ok := worker2ReqsWithNormal[workerIdAddr]; !ok {
worker2ReqsWithNormal[workerIdAddr] = []*schedulerx.MasterStartContainerRequest{request}
} else {
worker2ReqsWithNormal[workerIdAddr] = append(worker2ReqsWithNormal[workerIdAddr], request)
}
			// failover subtasks were already counted on their first dispatch, so only
			// newly dispatched subtasks bump the pulled counter
if val, ok := m.taskProgressMap.Load(request.GetTaskName()); ok {
val.(*common.TaskProgressCounter).IncrementOnePulled()
}
}
if _, ok := m.workerProgressMap.Load(workerAddr); workerAddr != "" && !ok {
m.workerProgressMap.LoadOrStore(workerAddr, common.NewWorkerProgressCounter(workerAddr))
}
if val, ok := m.workerProgressMap.Load(workerAddr); ok {
val.(*common.WorkerProgressCounter).IncrementTotal()
val.(*common.WorkerProgressCounter).IncrementPulled()
}
}
return worker2ReqsWithNormal, worker2ReqsWithFailover
}
// BatchDispatchTasks dispatches a batch of tasks using the push model: normal
// requests start subtasks on their target workers, failover requests re-dispatch
// subtasks from a dead worker to the surviving ones.
func (m *MapTaskMaster) BatchDispatchTasks(masterStartContainerRequests []*schedulerx.MasterStartContainerRequest, remoteWorker string) {
worker2ReqsWithNormal, worker2ReqsWithFailover := m.BatchHandlePulledProgress(masterStartContainerRequests, remoteWorker)
// Push model starts subtask normally
for key, val := range worker2ReqsWithNormal {
m.batchHandleContainers(key, val, false, common.TaskDispatchModePush)
}
// Push model worker hangs up, failover subtask to other workers
for key, val := range worker2ReqsWithFailover {
m.batchHandleContainers(key, val, true, common.TaskDispatchModePush)
}
}
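
// BatchPullTasks is the pull-model counterpart of BatchDispatchTasks: the batch
// is only persisted, and workers fetch the tasks themselves.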
func (m *MapTaskMaster) BatchPullTasks(masterStartContainerRequests []*schedulerx.MasterStartContainerRequest, workerIdAddr string) {
worker2ReqsWithNormal, worker2ReqsWithFailover := m.BatchHandlePulledProgress(masterStartContainerRequests, workerIdAddr)
// Pull model persistence tasks
for key, val := range worker2ReqsWithNormal {
m.batchHandleContainers(key, val, false, common.TaskDispatchModePull)
}
// Pull model update tasks
for key, val := range worker2ReqsWithFailover {
m.batchHandleContainers(key, val, true, common.TaskDispatchModePull)
}
}
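
// selectWorker picks the next worker in round-robin order, or returns an empty
// string when no workers are left.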
func (m *MapTaskMaster) selectWorker() string {
allWorkers := m.GetJobInstanceInfo().GetAllWorkers()
size := len(allWorkers)
if size == 0 {
return ""
} else if m.index >= size {
m.index = m.index % size
}
worker := allWorkers[m.index]
m.index++
return worker
}
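
// KillInstance broadcasts a kill request to every worker's container router and
// marks the instance as failed with the given reason.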
func (m *MapTaskMaster) KillInstance(reason string) error {
m.TaskMaster.KillInstance(reason)
allWorkers := m.GetJobInstanceInfo().GetAllWorkers()
for _, workerIdAddr := range allWorkers {
request := &schedulerx.MasterKillContainerRequest{
JobId: proto.Int64(m.GetJobInstanceInfo().GetJobId()),
JobInstanceId: proto.Int64(m.GetJobInstanceInfo().GetJobInstanceId()),
MayInterruptIfRunning: proto.Bool(false),
}
workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr)
m.actorCtx.Send(actorcomm.GetContainerRouterPid(workerAddr), request)
}
return m.updateNewInstanceStatus(m.GetSerialNum(), m.GetJobInstanceInfo().GetJobInstanceId(), processor.InstanceStatusFailed, reason)
}
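
// DestroyContainerPool asks every worker, with at-least-once delivery, to tear
// down the container pool of this job instance.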
func (m *MapTaskMaster) DestroyContainerPool() {
allWorkers := m.GetJobInstanceInfo().GetAllWorkers()
for _, workerIdAddr := range allWorkers {
request := &schedulerx.MasterDestroyContainerPoolRequest{
JobInstanceId: proto.Int64(m.GetJobInstanceInfo().GetJobInstanceId()),
JobId: proto.Int64(m.GetJobInstanceInfo().GetJobId()),
WorkerIdAddr: proto.String(workerIdAddr),
SerialNum: proto.Int64(m.GetSerialNum()),
}
actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
Msg: request,
}
}
}
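
// KillTask sends a kill request for a single task, identified by its uniqueId
// (the jobId_jobInstanceId_taskId form produced by utils.GetUniqueId), to the
// worker currently running it.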
func (m *MapTaskMaster) KillTask(uniqueId string, workerId string, workerAddr string) {
var (
		workerIdAddr        = workerId + "@" + workerAddr
jobInstanceId, err1 = utils.ParseId(uniqueId, utils.IdTypeJobInstanceId)
jobId, err2 = utils.ParseId(uniqueId, utils.IdTypeJobId)
taskId, err3 = utils.ParseId(uniqueId, utils.IdTypeTaskId)
)
if err1 != nil || err2 != nil || err3 != nil {
logger.Errorf("send kill request exception due to invalid uniqueId=%d, workerIdAddr=%s", uniqueId, workerIdAddr)
return
}
request := &schedulerx.MasterKillContainerRequest{
JobInstanceId: proto.Int64(jobInstanceId),
JobId: proto.Int64(jobId),
TaskId: proto.Int64(taskId),
MayInterruptIfRunning: proto.Bool(false),
}
m.actorCtx.Send(actorcomm.GetContainerRouterPid(workerAddr), request)
}
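
// GetJobInstanceProgress serializes the current task and worker progress
// counters into the JSON payload reported to the server.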
func (m *MapTaskMaster) GetJobInstanceProgress() (string, error) {
detail := common.NewMapTaskProgress()
var taskProgressCounters []*common.TaskProgressCounter
m.taskProgressMap.Range(func(_, val interface{}) bool {
taskProgressCounters = append(taskProgressCounters, val.(*common.TaskProgressCounter))
return true
})
var workerProgressCounters []*common.WorkerProgressCounter
m.workerProgressMap.Range(func(_, val interface{}) bool {
workerProgressCounters = append(workerProgressCounters, val.(*common.WorkerProgressCounter))
return true
})
detail.SetTaskProgress(taskProgressCounters)
detail.SetWorkerProgress(workerProgressCounters)
progress, err := json.Marshal(detail)
if err != nil {
return "", fmt.Errorf("Get jobInstance progress failed, marshal to json error, err=%s ", err.Error())
}
return string(progress), nil
}
func (m *MapTaskMaster) CheckProcessor() {
// FIXME
// if "java".equalsIgnoreCase(jobInstanceInfo.getJobType()) {
// processor := JavaProcessorProfileUtil.getJavaProcessor(jobInstanceInfo.getContent())
// if !ok {
// throw(NewIOException(processor.getClass().getName() + " must extends MapJobProcessor or MapReduceJobProcessor"))
// }
// }
}
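
// PostFinish runs after all subtasks complete: if the registered processor
// implements MapReduceJobProcessor, it builds a job context carrying every
// task's result and status and invokes Reduce, tracking the reduce step in the
// progress counters like any other task.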
func (m *MapTaskMaster) PostFinish(jobInstanceId int64) *processor.ProcessResult {
var reduceResult *processor.ProcessResult
jobCtx := new(jobcontext.JobContext)
jobCtx.SetJobId(m.GetJobInstanceInfo().GetJobId())
jobCtx.SetJobInstanceId(jobInstanceId)
jobCtx.SetJobType(m.GetJobInstanceInfo().GetJobType())
jobCtx.SetContent(m.GetJobInstanceInfo().GetContent())
jobCtx.SetScheduleTime(m.GetJobInstanceInfo().GetScheduleTime())
jobCtx.SetDataTime(m.GetJobInstanceInfo().GetDataTime())
jobCtx.SetJobParameters(m.GetJobInstanceInfo().GetParameters())
jobCtx.SetInstanceParameters(m.GetJobInstanceInfo().GetInstanceParameters())
jobCtx.SetUser(m.GetJobInstanceInfo().GetUser())
jobCtx.SetTaskResults(m.taskResultMap)
jobCtx.SetTaskStatuses(m.taskStatusMap)
jobName := gjson.Get(jobCtx.Content(), "jobName").String()
// Compatible with the existing Java language configuration mechanism
if jobCtx.JobType() == "java" {
jobName = gjson.Get(jobCtx.Content(), "className").String()
}
jobProcessor, ok := masterpool.GetTaskMasterPool().Tasks().Find(jobName)
if !ok {
reduceResult = processor.NewProcessResult()
reduceResult.SetFailed()
		reduceResult.SetResult(fmt.Sprintf("job=%s is not registered, the processor must implement the MapReduceJobProcessor interface to support reduce operations", jobName))
return reduceResult
}
if mpProcessor, ok := jobProcessor.(processor.MapReduceJobProcessor); ok {
runReduceIfFail := mpProcessor.RunReduceIfFail(jobCtx)
if m.GetInstanceStatus() == processor.InstanceStatusFailed && !runReduceIfFail {
logger.Warnf("jobInstanceId=%d is failed, skip reduce", jobInstanceId)
return nil
}
reduceTaskName := constants.ReduceTaskName
taskProgressCounter, _ := m.taskProgressMap.LoadOrStore(reduceTaskName, common.NewTaskProgressCounter(reduceTaskName))
taskProgressCounter.(*common.TaskProgressCounter).IncrementOneTotal()
taskProgressCounter.(*common.TaskProgressCounter).IncrementRunning()
workerAddr := m.actorCtx.ActorSystem().Address()
workerProgressCounter, _ := m.workerProgressMap.LoadOrStore(workerAddr, common.NewWorkerProgressCounter(workerAddr))
workerProgressCounter.(*common.WorkerProgressCounter).IncrementTotal()
workerProgressCounter.(*common.WorkerProgressCounter).IncrementRunning()
result, err := mpProcessor.Reduce(jobCtx)
if err != nil {
result = processor.NewProcessResult()
result.SetFailed()
result.SetResult("reduce exception: " + err.Error())
}
if result.Status() == processor.InstanceStatusSucceed {
if val, ok := m.taskProgressMap.Load(reduceTaskName); ok {
val.(*common.TaskProgressCounter).IncrementOneSuccess()
}
if val, ok := m.workerProgressMap.Load(workerAddr); ok {
val.(*common.WorkerProgressCounter).IncrementSuccess()
}
} else {
if val, ok := m.taskProgressMap.Load(reduceTaskName); ok {
val.(*common.TaskProgressCounter).IncrementOneFailed()
}
if val, ok := m.workerProgressMap.Load(workerAddr); ok {
val.(*common.WorkerProgressCounter).IncrementOneFailed()
}
}
return result
}
return reduceResult
}
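
// Stop shuts down the dispatch and status batch handlers of this master.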
func (m *MapTaskMaster) Stop() {
if m.taskDispatchReqHandler != nil {
m.taskDispatchReqHandler.Stop()
}
if m.taskStatusReqBatchHandler != nil {
m.taskStatusReqBatchHandler.Stop()
}
logger.Infof("jobInstanceId:%v, instance master stop succeed.", m.GetJobInstanceInfo().GetJobInstanceId())
}
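
// startBatchHandler starts the status batch handler and, in the push model, the
// task dispatch handler sized to the current worker count.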
func (m *MapTaskMaster) startBatchHandler() error {
// FIXME
// if m.IsInited() {
// return nil
// }
// start batch handlers
if err := m.taskStatusReqBatchHandler.Start(m.taskStatusReqBatchHandler); err != nil {
return err
}
// m.taskBlockingQueue = batch.NewReqQueue(m.queueSize)
if m.xAttrs != nil && m.xAttrs.GetTaskDispatchMode() == string(common.TaskDispatchModePush) {
m.taskDispatchReqHandler.SetWorkThreadNum(int(m.dispatcherSize))
m.taskDispatchReqHandler.SetDispatchSize(int(m.pageSize) * len(m.GetJobInstanceInfo().GetAllWorkers()))
m.taskDispatchReqHandler.Start(m.taskDispatchReqHandler)
}
return nil
}
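
// getTotalPulledAndRunning sums the pulled and running counts across all task
// progress counters; the pull model uses it to cap outstanding work.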
func (m *MapTaskMaster) getTotalPulledAndRunning() int64 {
var total int64
taskCounters := make([]*common.TaskProgressCounter, 0, 10)
m.taskProgressMap.Range(func(key, value any) bool {
taskCounters = append(taskCounters, value.(*common.TaskProgressCounter))
return true
})
for _, taskProgressCounter := range taskCounters {
total += taskProgressCounter.GetPulled()
total += taskProgressCounter.GetRunning()
}
return total
}
func (m *MapTaskMaster) GetRootTaskResult() string {
return m.rootTaskResult
}
func (m *MapTaskMaster) SetRootTaskResult(rootTaskResult string) {
m.rootTaskResult = rootTaskResult
}
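
// initTaskProgress adds delta to the total count of the named task, creating the
// counter on first use.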
func (m *MapTaskMaster) initTaskProgress(taskName string, delta int) {
taskProgressCounter, _ := m.taskProgressMap.LoadOrStore(taskName, common.NewTaskProgressCounter(taskName))
taskProgressCounter.(*common.TaskProgressCounter).IncrementTotal(int64(delta))
}
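
// SyncPullTasks hands up to pageSize queued tasks to a pulling worker, unless
// the number of pulled-plus-running tasks has already reached the configured
// global consumer size.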
func (m *MapTaskMaster) SyncPullTasks(pageSize int32, workerIdAddr string) []*schedulerx.MasterStartContainerRequest {
	if m.getTotalPulledAndRunning() >= int64(m.xAttrs.GetGlobalConsumerSize()) {
		return nil
	}
	if reqs := m.taskDispatchReqHandler.SyncHandleReqs(m.taskDispatchReqHandler, pageSize, workerIdAddr); reqs != nil {
		ret := make([]*schedulerx.MasterStartContainerRequest, 0, len(reqs))
		for _, req := range reqs {
			ret = append(ret, req.(*schedulerx.MasterStartContainerRequest))
		}
		return ret
	}
	return nil
}
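
// Clear releases everything held for the instance: queues, batch handlers,
// progress and result maps, and the persisted tasks.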
func (m *MapTaskMaster) Clear(taskMaster taskmaster.TaskMaster) {
m.TaskMaster.Clear(taskMaster)
if m.taskStatusReqQueue != nil {
m.taskStatusReqQueue.Clear()
}
if m.taskBlockingQueue != nil {
m.taskBlockingQueue.Clear()
}
if m.taskDispatchReqHandler != nil {
m.taskDispatchReqHandler.Clear()
}
if m.taskStatusReqBatchHandler != nil {
m.taskStatusReqBatchHandler.Clear()
}
if m.taskProgressMap != nil {
m.taskProgressMap = nil
}
if m.workerProgressMap != nil {
m.workerProgressMap = nil
}
if m.taskResultMap != nil {
m.taskResultMap = nil
}
if m.taskStatusMap != nil {
m.taskStatusMap = nil
}
m.clearTasks(m.GetJobInstanceInfo().GetJobInstanceId())
m.taskCounter = atomic.NewInt64(0)
}
func (m *MapTaskMaster) GetTaskProgressMap() *sync.Map {
return m.taskProgressMap
}
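
// handleWorkerShutdown removes a dead worker from the alive set and worker list,
// shrinks the dispatch batch size accordingly, and either rolls the worker's
// tasks back to init (failover enabled) or marks them failed (failover disabled).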
func (m *MapTaskMaster) handleWorkerShutdown(workerIdAddr string) {
m.GetAliveCheckWorkerSet().Remove(workerIdAddr)
m.GetJobInstanceInfo().SetAllWorkers(utils.RemoveSliceElem(m.GetJobInstanceInfo().GetAllWorkers(), workerIdAddr))
// adjust dispatch batch size
m.taskDispatchReqHandler.SetDispatchSize(m.GetAliveCheckWorkerSet().Len() * int(m.pageSize))
parts := strings.Split(workerIdAddr, "@")
workerId := parts[0]
workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr)
if config.GetWorkerConfig().IsMapMasterFailover() {
// If failover is enabled, set it to init state and wait for re-pull.
affectCnt := m.taskPersistence.BatchUpdateTaskStatus(m.GetJobInstanceInfo().GetJobInstanceId(), taskstatus.TaskStatusInit, workerId, workerAddr)
logger.Warnf("jobInstanceId=%d, failover task number:%d, workerId:%s, workerAddr:%s", m.GetJobInstanceInfo().GetJobInstanceId(), affectCnt, workerId, workerAddr)
if affectCnt > 0 {
if val, ok := m.workerProgressMap.Load(workerAddr); ok {
val.(*common.WorkerProgressCounter).DecrementRunning(affectCnt)
}
}
	} else {
		// If failover is disabled, the subtasks on this worker are set to failed directly.
		affectCnt := m.taskPersistence.BatchUpdateTaskStatus(m.GetJobInstanceInfo().GetJobInstanceId(), taskstatus.TaskStatusFailed, workerId, workerAddr)
		logger.Warnf("jobInstanceId=%d, failed task number:%d, workerId:%s, workerAddr:%s", m.GetJobInstanceInfo().GetJobInstanceId(), affectCnt, workerId, workerAddr)
if affectCnt > 0 {
if val, ok := m.workerProgressMap.Load(workerAddr); ok {
val.(*common.WorkerProgressCounter).IncrementFailed(affectCnt)
}
}
}
}