internal/master/persistence/server_task_persistence.go

/*
 * Copyright (c) 2023 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package persistence

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"

	"google.golang.org/protobuf/proto"

	actorcomm "github.com/alibaba/schedulerx-worker-go/internal/actor/common"
	"github.com/alibaba/schedulerx-worker-go/internal/common"
	"github.com/alibaba/schedulerx-worker-go/internal/discovery"
	"github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
	"github.com/alibaba/schedulerx-worker-go/logger"
	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/mapjob/bizsubtask"
	"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
)

var _ TaskPersistence = &ServerTaskPersistence{}

// ServerTaskPersistence persists task state on the SchedulerX server: each
// operation is translated into a request that the task master forwards to the
// server through actor channels.
type ServerTaskPersistence struct {
	serverDiscovery *discovery.ServiceDiscover
	groupManager    *discovery.GroupManager
	groupId         string
}

func NewServerTaskPersistence(groupId string) (rcvr *ServerTaskPersistence) {
	return &ServerTaskPersistence{
		groupId:         groupId,
		serverDiscovery: discovery.GetDiscovery(groupId),
		// NOTE: groupManager is not initialized here; CreateTasks calls
		// rcvr.groupManager.IsAdvancedVersion, so it must be assigned
		// before CreateTasks is used.
	}
}

func (rcvr *ServerTaskPersistence) BatchUpdateTaskStatus(jobInstanceId int64, status taskstatus.TaskStatus, workerId string, workerAddr string) int64 {
	var affectCnt int64
	req := &schedulerx.WorkerBatchUpdateTaskStatusRequest{
		JobInstanceId: proto.Int64(jobInstanceId),
		Status:        proto.Int32(int32(status)),
	}
	if workerAddr != "" {
		req.WorkerAddr = proto.String(workerAddr)
		req.WorkerId = proto.String(workerId)
	}

	// Send to server by master
	actorcomm.TaskMasterMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
		Msg: req,
	}

	// Wait 5 seconds
	timer := time.NewTimer(5 * time.Second)
	defer timer.Stop()
	select {
	case resp := <-actorcomm.WorkerBatchUpdateTaskStatusRespMsgSender():
		if resp.GetSuccess() {
			affectCnt = int64(resp.GetAffectCnt())
			logger.Debugf("batch update status=%d to server succeeded, jobInstanceId=%d, workerAddr=%s", status, jobInstanceId, workerAddr)
		} else {
			logger.Errorf("batch update status failed, response message=%s", resp.GetMessage())
		}
	case <-timer.C:
		logger.Errorf("BatchUpdateTaskStatus of jobInstanceId=%d in ServerTaskPersistence timeout", jobInstanceId)
	}
	return affectCnt
}

func (rcvr *ServerTaskPersistence) CheckInstanceStatus(jobInstanceId int64) processor.InstanceStatus {
	status := processor.InstanceStatusUnknown
	req := &schedulerx.WorkerQueryJobInstanceStatusRequest{
		JobInstanceId: proto.Int64(jobInstanceId),
	}

	// Send to server by master
	actorcomm.TaskMasterMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
		Msg: req,
	}

	// Wait 30 seconds
	timer := time.NewTimer(30 * time.Second)
	defer timer.Stop()
	select {
	case resp := <-actorcomm.WorkerQueryJobInstanceStatusRespMsgSender():
		if resp.GetSuccess() {
			status = processor.InstanceStatus(resp.GetStatus())
		} else {
			logger.Errorf("query job instance status failed, resp message=%s", resp.GetMessage())
		}
	case <-timer.C:
		logger.Errorf("CheckInstanceStatus of jobInstanceId=%d in ServerTaskPersistence timeout", jobInstanceId)
	}
	return status
}

func (rcvr *ServerTaskPersistence) ClearTasks(jobInstanceId int64) error {
	req := &schedulerx.WorkerClearTasksRequest{
		JobInstanceId: proto.Int64(jobInstanceId),
	}

	// Send to server by master
	actorcomm.TaskMasterMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
		Msg: req,
	}

	// Wait 5 seconds
	timer := time.NewTimer(5 * time.Second)
	defer timer.Stop()
	select {
	case resp := <-actorcomm.WorkerClearTasksRespMsgSender():
		if resp.GetSuccess() {
			logger.Infof("clear tasks of jobInstance[%d] succeeded", jobInstanceId)
		} else {
			errMsg := fmt.Sprintf("clear tasks of jobInstance[%d] failed, error=%s", jobInstanceId, resp.GetMessage())
			logger.Errorf(errMsg)
			return errors.New(errMsg)
		}
	case <-timer.C:
		logger.Errorf("ClearTasks of jobInstanceId=%d in ServerTaskPersistence timeout", jobInstanceId)
	}
	return nil
}
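// The three methods above share one shape: send a request through the task
// master's actor channel, then block on the matching response channel with a
// timeout. A minimal sketch of how that send-and-wait step could be factored
// out with generics; awaitResp is a hypothetical helper, not part of this
// package's API, and nothing above calls it.
func awaitResp[T any](respCh <-chan T, timeout time.Duration) (T, bool) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case resp := <-respCh:
		// Server replied in time; hand the response back to the caller.
		return resp, true
	case <-timer.C:
		// Timed out waiting for the server's reply.
		var zero T
		return zero, false
	}
}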
func (rcvr *ServerTaskPersistence) convert2TaskInfo(taskMessage *schedulerx.TaskMessage) *common.TaskInfo {
	taskInfo := new(common.TaskInfo)
	taskInfo.SetTaskId(taskMessage.GetTaskId())
	taskInfo.SetTaskName(taskMessage.GetTaskName())
	taskInfo.SetTaskBody(taskMessage.GetTaskBody())
	return taskInfo
}

// CreateTask does nothing: the task has already been created by the server.
func (rcvr *ServerTaskPersistence) CreateTask(jobId int64, jobInstanceId int64, taskId int64, taskName string, taskBody []byte) error {
	// do nothing
	return nil
}

func (rcvr *ServerTaskPersistence) CreateTasks(containers []*schedulerx.MasterStartContainerRequest, workerId string, workerAddr string) error {
	if len(containers) == 0 {
		return fmt.Errorf("createTasks container list is empty")
	}
	jobInstanceId := containers[0].GetJobInstanceId()
	batchReqs := new(schedulerx.WorkerBatchCreateTasksRequest)
	isAdvancedVersion := rcvr.groupManager.IsAdvancedVersion(rcvr.groupId)
	for _, taskInfo := range containers {
		req := &schedulerx.WorkerCreateTaskRequest{
			JobId:         proto.Int64(taskInfo.GetJobId()),
			JobInstanceId: proto.Int64(taskInfo.GetJobInstanceId()),
			TaskId:        proto.Int64(taskInfo.GetTaskId()),
			TaskName:      proto.String(taskInfo.GetTaskName()),
			TaskBody:      taskInfo.GetTask(),
		}
		task := new(interface{})
		if err := json.Unmarshal(taskInfo.GetTask(), task); err != nil {
			return fmt.Errorf("json unmarshal taskBody failed, err=%s", err.Error())
		}
		// The label map is attached only for advanced-version groups and
		// only when the decoded task body implements bizsubtask.BizSubTask.
		bizSubTask, ok := (*task).(bizsubtask.BizSubTask)
		if isAdvancedVersion && ok {
			tmp, err := json.Marshal(bizSubTask.LabelMap())
			if err != nil {
				return fmt.Errorf("json marshal taskBody's labelMap failed, err=%s", err.Error())
			}
			req.LabelMap = proto.String(string(tmp))
		}
		batchReqs.Task = append(batchReqs.Task, req)
	}
	batchReqs.JobInstanceId = proto.Int64(jobInstanceId)
	batchReqs.WorkerId = proto.String(workerId)
	batchReqs.WorkerAddr = proto.String(workerAddr)

	// Send to server by master
	actorcomm.TaskMasterMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
		Msg: batchReqs,
	}

	// Wait 90 seconds
	timer := time.NewTimer(90 * time.Second)
	defer timer.Stop()
	select {
	case resp := <-actorcomm.WorkerBatchCreateTasksRespMsgSender():
		if resp.GetSuccess() {
			logger.Infof("batch create tasks to server succeeded, jobInstanceId=%d, size=%d", jobInstanceId, len(containers))
		} else {
			errMsg := fmt.Sprintf("batch create tasks failed, jobInstanceId=%d, reason=%s", jobInstanceId, resp.GetMessage())
			logger.Errorf(errMsg)
			return errors.New(errMsg)
		}
	case <-timer.C:
		logger.Errorf("CreateTasks of jobInstanceId=%d in ServerTaskPersistence timeout", jobInstanceId)
	}
	return nil
}

// InitTable does nothing: task state lives on the server, so there is no
// local table to initialize.
func (rcvr *ServerTaskPersistence) InitTable() {
	// do nothing
}

func (rcvr *ServerTaskPersistence) Pull(jobInstanceId int64, pageSize int32) ([]*common.TaskInfo, error) {
	taskInfos := make([]*common.TaskInfo, 0, 10)
	req := &schedulerx.WorkerPullTasksRequest{
		JobInstanceId: proto.Int64(jobInstanceId),
		PageSize:      proto.Int32(pageSize),
	}

	// Send to server by master
	actorcomm.TaskMasterMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
		Msg: req,
	}

	// Wait 30 seconds
	timer := time.NewTimer(30 * time.Second)
	defer timer.Stop()
	select {
	case resp := <-actorcomm.WorkerPullTasksRespMsgSender():
		if resp.GetSuccess() {
			for _, taskMessage := range resp.GetTaskMessage() {
				taskInfos = append(taskInfos, rcvr.convert2TaskInfo(taskMessage))
			}
		} else {
			logger.Errorf("pull tasks of jobInstance=%d failed, respMsg=%s", jobInstanceId, resp.GetMessage())
		}
	case <-timer.C:
		logger.Errorf("Pull of jobInstanceId=%d in ServerTaskPersistence timeout", jobInstanceId)
	}
	return taskInfos, nil
}
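// For the label-map branch in CreateTasks above to fire, the value decoded
// from the task body must implement bizsubtask.BizSubTask. A hypothetical
// sub-task shape, for illustration only; the type, its fields, and the
// LabelMap signature (assumed here to be map[string]string, matching its use
// with json.Marshal above) are not part of this package.
type exampleSubTask struct {
	OrderId string `json:"orderId"`
	Region  string `json:"region"`
}

// LabelMap exposes labels that the server can index for advanced-version
// groups, as consumed by the isAdvancedVersion branch in CreateTasks.
func (t *exampleSubTask) LabelMap() map[string]string {
	return map[string]string{"region": t.Region}
}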
func (rcvr *ServerTaskPersistence) UpdateTaskStatues(taskStatusInfos []*schedulerx.ContainerReportTaskStatusRequest) error {
	if len(taskStatusInfos) == 0 {
		return fmt.Errorf("update task statuses with an empty list")
	}
	info := taskStatusInfos[0]
	status2WorkerIdAddr2TaskIds := getTaskStatusMap(taskStatusInfos)

	var batchTaskStatuesReq schedulerx.WorkerBatchReportTaskStatuesRequest
	for status, workerIdAddr2TaskIds := range status2WorkerIdAddr2TaskIds {
		for workerIdAddr, taskIds := range workerIdAddr2TaskIds {
			// workerIdAddr has the form "workerId@workerAddr"
			workerIdAddrParts := strings.Split(workerIdAddr, "@")
			batchTaskStatues := &schedulerx.BatchTaskStatues{
				Status:     proto.Int32(status),
				WorkerId:   proto.String(workerIdAddrParts[0]),
				WorkerAddr: proto.String(actorcomm.GetRealWorkerAddr(workerIdAddr)),
				TaskIds:    taskIds,
			}
			batchTaskStatuesReq.TaskStatues = append(batchTaskStatuesReq.TaskStatues, batchTaskStatues)
		}
	}
	batchTaskStatuesReq.GroupId = proto.String(rcvr.groupId)
	batchTaskStatuesReq.JobInstanceId = proto.Int64(info.GetJobInstanceId())

	actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
		Msg: &batchTaskStatuesReq,
	}
	return nil
}

func (rcvr *ServerTaskPersistence) UpdateTaskStatus(jobInstanceId int64, taskIds []int64, status taskstatus.TaskStatus, workerId, workerAddr string) (int64, error) {
	if len(taskIds) == 0 {
		return 0, fmt.Errorf("update task status with an empty taskIds list")
	}
	req := &schedulerx.WorkerBatchReportTaskStatuesRequest{
		JobInstanceId: proto.Int64(jobInstanceId),
		GroupId:       proto.String(rcvr.groupId),
	}
	batchTaskStatues := &schedulerx.BatchTaskStatues{
		Status:     proto.Int32(int32(status)),
		WorkerId:   proto.String(workerId),
		WorkerAddr: proto.String(workerAddr),
		TaskIds:    taskIds,
	}
	req.TaskStatues = append(req.TaskStatues, batchTaskStatues)

	// The request is handed to the at-least-once delivery channel, which
	// provides no synchronous response, so the server-side affected count
	// is not observable here; report the number of submitted task statuses
	// instead.
	actorcomm.AtLeastOnceDeliveryMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
		Msg: req,
	}
	return int64(len(taskIds)), nil
}
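// A minimal usage sketch (identifiers and values hypothetical; the exact
// taskstatus constant name is assumed): report a batch of finished task
// statuses for one job instance, then clear its tasks on the server.
//
//	p := NewServerTaskPersistence("myGroupId")
//	if _, err := p.UpdateTaskStatus(12345, []int64{1, 2, 3},
//		taskstatus.TaskStatusSucceed, "workerId", "127.0.0.1:1921"); err != nil {
//		logger.Errorf("report task statuses failed: %v", err)
//	}
//	if err := p.ClearTasks(12345); err != nil {
//		logger.Errorf("clear tasks failed: %v", err)
//	}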