internal/master/persistence/h2_persistence.go

/*
 * Copyright (c) 2023 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package persistence

import (
	"fmt"
	"sync"
	"time"

	"github.com/alibaba/schedulerx-worker-go/config"
	"github.com/alibaba/schedulerx-worker-go/internal/common"
	"github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
	"github.com/alibaba/schedulerx-worker-go/logger"
	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
)

var (
	// Compile-time check that H2Persistence implements TaskPersistence.
	_ TaskPersistence = &H2Persistence{}

	h2pOnce       sync.Once
	h2Persistence *H2Persistence
)

// GetH2Persistence returns the process-wide H2Persistence singleton.
func GetH2Persistence() *H2Persistence {
	h2pOnce.Do(func() {
		h2Persistence = NewH2Persistence()
	})
	return h2Persistence
}

// H2Persistence is a singleton, so that InitTable is executed only on the first initialization.
type H2Persistence struct {
	h2CP    *H2ConnectionPool
	taskDao *TaskDao
	inited  bool
	lock    sync.RWMutex
}

func NewH2Persistence() *H2Persistence {
	return &H2Persistence{
		inited: false,
	}
}
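// A minimal usage sketch of the singleton lifecycle (illustrative only; the
// jobId/jobInstanceId/taskId values, taskBody, and the surrounding wiring are
// assumptions, not part of this file):
//
//	p := GetH2Persistence()
//	if !p.IsInited() {
//		p.InitTable() // drops and recreates the task table on first use
//	}
//	_ = p.CreateTask(jobId, jobInstanceId, taskId, "exampleTask", taskBody)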
func (rcvr *H2Persistence) BatchUpdateTaskStatus(jobInstanceId int64, status taskstatus.TaskStatus, workerId string, workerAddr string) int64 {
	var (
		affectCnt int64
		err       error
	)
	// Retry up to 3 times with a 1s backoff, stopping on the first success.
	for i := 0; i < 3; i++ {
		if config.GetWorkerConfig().IsMapMasterFailover() {
			affectCnt, err = rcvr.taskDao.BatchUpdateStatus2(jobInstanceId, int(status), workerId, workerAddr)
		} else {
			affectCnt, err = rcvr.taskDao.BatchDeleteTasks2(jobInstanceId, workerId, workerAddr)
		}
		if err != nil {
			logger.Errorf("batchUpdateTaskStatus error, try after 1000ms, err=%s", err.Error())
			time.Sleep(1000 * time.Millisecond)
			continue
		}
		break
	}
	return affectCnt
}

func (rcvr *H2Persistence) CheckInstanceStatus(jobInstanceId int64) processor.InstanceStatus {
	instanceStatus := processor.InstanceStatusSucceed
	existed, err := rcvr.taskDao.Exist(jobInstanceId)
	if err != nil {
		logger.Errorf("CheckInstanceStatus from H2Persistence failed, err=%s", err.Error())
		return instanceStatus
	}
	if existed {
		instanceStatus = processor.InstanceStatusRunning
	}
	return instanceStatus
}

func (rcvr *H2Persistence) ClearTasks(jobInstanceId int64) error {
	_, err := rcvr.taskDao.DeleteByJobInstanceId(jobInstanceId)
	return err
}

func (rcvr *H2Persistence) convert2TaskInfo(taskSnapshot *TaskSnapshot) *common.TaskInfo {
	taskInfo := new(common.TaskInfo)
	taskInfo.SetTaskId(taskSnapshot.GetTaskId())
	taskInfo.SetTaskName(taskSnapshot.GetTaskName())
	taskInfo.SetTaskBody(taskSnapshot.GetTaskBody())
	taskInfo.SetJobId(taskSnapshot.GetJobId())
	taskInfo.SetJobInstanceId(taskSnapshot.GetJobInstanceId())
	return taskInfo
}

func (rcvr *H2Persistence) convert2TaskSnapshot(taskInfo *common.TaskInfo) *TaskSnapshot {
	taskSnapshot := NewTaskSnapshot()
	taskSnapshot.SetJobId(taskInfo.JobId())
	taskSnapshot.SetJobInstanceId(taskInfo.JobInstanceId())
	taskSnapshot.SetTaskId(taskInfo.TaskId())
	taskSnapshot.SetTaskName(taskInfo.TaskName())
	taskSnapshot.SetTaskBody(taskInfo.TaskBody())
	return taskSnapshot
}

func (rcvr *H2Persistence) CreateTask(jobId, jobInstanceId, taskId int64, taskName string, taskBody []byte) error {
	return rcvr.taskDao.Insert(jobId, jobInstanceId, taskId, taskName, taskBody)
}

func (rcvr *H2Persistence) CreateTasks(containers []*schedulerx.MasterStartContainerRequest, workerId, workerAddr string) error {
	succeed := false
	for i := 0; i < 3; i++ {
		affectCnt, err := rcvr.taskDao.BatchInsert(containers, workerId, workerAddr)
		if err != nil {
			logger.Warnf("batch insert tasks error, try after 1000ms, err=%s", err.Error())
			time.Sleep(1000 * time.Millisecond)
			continue
		}
		logger.Debugf("batch insert tasks succeed, affectCnt=%d", affectCnt)
		succeed = true
		break
	}
	if !succeed {
		return fmt.Errorf("batch insert tasks error, workerId=%s, workerAddr=%s", workerId, workerAddr)
	}
	return nil
}

// GetDistinctInstanceIds gets the remaining terminated but undeleted instances in H2.
func (rcvr *H2Persistence) GetDistinctInstanceIds() []int64 {
	ret, err := rcvr.taskDao.GetDistinctInstanceIds()
	if err != nil {
		logger.Errorf("GetDistinctInstanceIds failed, err=%s", err.Error())
		return nil
	}
	return ret
}

// GetTaskStatistics gets summary statistics of the tasks in H2.
func (rcvr *H2Persistence) GetTaskStatistics() *common.TaskStatistics {
	ret, err := rcvr.taskDao.GetTaskStatistics()
	if err != nil {
		logger.Errorf("GetTaskStatistics failed, err=%s", err.Error())
		return nil
	}
	return ret
}

func (rcvr *H2Persistence) InitTable() {
	rcvr.lock.Lock()
	defer rcvr.lock.Unlock()
	if !rcvr.inited {
		rcvr.taskDao.DropTable()
		rcvr.taskDao.CreateTable()
		rcvr.inited = true
	}
}

func (rcvr *H2Persistence) IsInited() bool {
	rcvr.lock.RLock()
	defer rcvr.lock.RUnlock()
	return rcvr.inited
}

func (rcvr *H2Persistence) Pull(jobInstanceId int64, pageSize int32) ([]*common.TaskInfo, error) {
	var taskInfoList []*common.TaskInfo
	taskSnapshots, err := rcvr.taskDao.QueryTaskList(jobInstanceId, int(taskstatus.TaskStatusInit), pageSize)
	if err != nil {
		return nil, err
	}
	if len(taskSnapshots) > 0 {
		var taskIdList []int64
		for _, taskSnapshot := range taskSnapshots {
			taskIdList = append(taskIdList, taskSnapshot.GetTaskId())
			taskInfoList = append(taskInfoList, rcvr.convert2TaskInfo(taskSnapshot))
		}
		// FIXME if failed 3 times?
		for i := 0; i < 3; i++ {
			_, err := rcvr.taskDao.BatchUpdateStatus(jobInstanceId, taskIdList, int(taskstatus.TaskStatusPulled))
			if err != nil {
				logger.Warnf("batchUpdateStatus error, try after 1000ms, err=%s", err.Error())
				time.Sleep(1000 * time.Millisecond)
				continue
			}
			break
		}
	}
	return taskInfoList, nil
}
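// An illustrative pull loop (hypothetical caller; dispatch() and the page
// size of 100 are assumptions for the example, not part of this file):
//
//	for {
//		tasks, err := p.Pull(jobInstanceId, 100) // INIT tasks are marked PULLED
//		if err != nil || len(tasks) == 0 {
//			break
//		}
//		for _, task := range tasks {
//			dispatch(task)
//		}
//	}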
// UpdateTaskStatues updates a batch of task statuses.
//
// Attention: for Grid/Batch tasks, this method is invoked only when finish
// statuses are reported. To reduce the H2 size, it deletes all finished tasks
// instead of updating them.
func (rcvr *H2Persistence) UpdateTaskStatues(taskStatusInfos []*schedulerx.ContainerReportTaskStatusRequest) error {
	if len(taskStatusInfos) == 0 {
		return nil
	}
	// Task status updates always arrive batched by the same job instance.
	jobInstanceId := taskStatusInfos[0].GetJobInstanceId()
	var taskIds []int64
	for _, taskStatusInfo := range taskStatusInfos {
		taskStatus := taskstatus.TaskStatus(taskStatusInfo.GetStatus())
		if taskStatus.IsFinished() {
			taskIds = append(taskIds, taskStatusInfo.GetTaskId())
		}
	}
	_, err := rcvr.taskDao.BatchDeleteTasks(jobInstanceId, taskIds)
	return err
}

func (rcvr *H2Persistence) UpdateTaskStatus(jobInstanceId int64, taskIds []int64, status taskstatus.TaskStatus, workerId, workerAddr string) (int64, error) {
	var (
		affectCnt int64
		err       error
	)
	if len(taskIds) == 0 {
		return 0, nil
	}
	affectCnt, err = rcvr.taskDao.UpdateStatus2(jobInstanceId, taskIds, int(status), workerId, workerAddr)
	if err != nil {
		logger.Errorf("JobInstanceId=%d, updateTaskStatus failed, err=%s", jobInstanceId, err.Error())
		return 0, err
	}
	return affectCnt, nil
}
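// A hedged sketch of the status-reporting path (taskIds, requests, and
// taskstatus.TaskStatusRunning are assumed for illustration; only
// TaskStatusInit and TaskStatusPulled appear in this file):
//
//	// Mark a batch of tasks as running on a given worker.
//	_, _ = p.UpdateTaskStatus(jobInstanceId, taskIds, taskstatus.TaskStatusRunning, workerId, workerAddr)
//	// When finish statuses come back, finished tasks are deleted to keep H2 small.
//	_ = p.UpdateTaskStatues(requests)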