internal/master/grid_task_master.go (66 lines of code) (raw):

/* * Copyright (c) 2023 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package master import ( "encoding/json" "fmt" "github.com/alibaba/schedulerx-worker-go/config" "github.com/asynkron/protoactor-go/actor" "github.com/alibaba/schedulerx-worker-go/internal/batch" "github.com/alibaba/schedulerx-worker-go/internal/common" "github.com/alibaba/schedulerx-worker-go/internal/constants" "github.com/alibaba/schedulerx-worker-go/internal/master/persistence" "github.com/alibaba/schedulerx-worker-go/internal/master/taskmaster" "github.com/alibaba/schedulerx-worker-go/logger" "github.com/alibaba/schedulerx-worker-go/processor" "github.com/alibaba/schedulerx-worker-go/processor/jobcontext" ) var _ taskmaster.MapTaskMaster = &GridTaskMaster{} type GridTaskMaster struct { *MapTaskMaster } func NewGridTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx actor.Context) *GridTaskMaster { jobInstanceId := jobInstanceInfo.GetJobInstanceId() gridTaskMaster := &GridTaskMaster{ NewMapTaskMaster(jobInstanceInfo, actorCtx).(*MapTaskMaster), } gridTaskMaster.taskPersistence = persistence.GetH2MemoryPersistence() gridTaskMaster.taskPersistence.InitTable() gridTaskMaster.taskStatusReqQueue = batch.NewReqQueue(config.GetWorkerConfig().QueueSize()) gridTaskMaster.taskStatusReqBatchHandler = batch.NewTMStatusReqHandler(jobInstanceId, 1, 1, 3000, gridTaskMaster.taskStatusReqQueue) gridTaskMaster.taskBlockingQueue = batch.NewReqQueue(config.GetWorkerConfig().QueueSize()) if jobInstanceInfo.GetXattrs() != "" { gridTaskMaster.xAttrs = new(common.MapTaskXAttrs) if err := json.Unmarshal([]byte(jobInstanceInfo.GetXattrs()), gridTaskMaster.xAttrs); err != nil { logger.Errorf("Unmarshal xAttrs failed, err=%s", err.Error()) } } if gridTaskMaster.xAttrs != nil && gridTaskMaster.xAttrs.GetTaskDispatchMode() == string(common.TaskDispatchModePull) { gridTaskMaster.taskDispatchReqHandler = batch.NewTaskPullReqHandler(jobInstanceId, 1, 1, gridTaskMaster.pageSize*int32(len(jobInstanceInfo.GetAllWorkers())), gridTaskMaster.taskBlockingQueue).(*batch.TaskPullReqHandler) } else { gridTaskMaster.taskDispatchReqHandler = batch.NewTaskPushReqHandler(jobInstanceId, 1, 1, gridTaskMaster.pageSize*int32(len(jobInstanceInfo.GetAllWorkers())), gridTaskMaster.taskBlockingQueue, 3000) } return gridTaskMaster } // doMetricsCheck checks indicator information of the master func (m *GridTaskMaster) doMetricsCheck() { // FIXME how to get metric //vmDetail := MetricsCollector.getMetrics() //if vmDetail != nil { // usedMemoryPercent := vmDetail.getHeap5Usage() // if usedMemoryPercent > WorkerConstants.USER_MEMORY_PERCENT_MAX { // throw(NewIOException(fmt.Sprintf("%v%v%v%v%v", "used memory:", usedMemoryPercent*100, ",beyond ", WorkerConstants.USER_MEMORY_PERCENT_MAX*100, "%!"))) // } //} } func (m *GridTaskMaster) Map(jobCtx *jobcontext.JobContext, taskList [][]byte, taskName string) (bool, error) { if len(taskList) == 0 { errMsg := fmt.Sprintf("map taskList is empty, taskName:%v", taskName) logger.Warnf(errMsg) return false, fmt.Errorf(fmt.Sprintf("map taskList is empty, taskName:%v", taskName)) } logger.Infof("map taskList, jobInstanceId=%v, taskName:%v, taskList size:%v", m.GetJobInstanceInfo().GetJobInstanceId(), taskName, len(taskList)) counter := m.taskCounter.Add(int64(len(taskList))) if m.xAttrs != nil && m.xAttrs.GetTaskDispatchMode() == string(common.TaskDispatchModePull) && counter > constants.PullModeTaskSizeMax { logger.Errorf("jobInstanceId=%v, pullModel, task counter=%v, beyond %v", m.GetJobInstanceInfo().GetJobInstanceId(), counter, constants.PullModeTaskSizeMax) return false, fmt.Errorf(fmt.Sprintf("task size of pullModel can't beyond %d", constants.PullModeTaskSizeMax)) } m.doMetricsCheck() return m.MapTaskMaster.Map(jobCtx, taskList, taskName) } func (m *GridTaskMaster) PostFinish(jobInstanceId int64) *processor.ProcessResult { postResult := m.MapTaskMaster.PostFinish(jobInstanceId) if err := m.taskPersistence.ClearTasks(jobInstanceId); err != nil { logger.Errorf("Clear tasks failed in PostFinish, err=%s", err.Error()) } return postResult }