// internal/masterpool/taskmaster_pool.go (67 lines of code) (raw):
/*
* Copyright (c) 2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package masterpool
import (
"sync"
"github.com/alibaba/schedulerx-worker-go/internal/master/taskmaster"
"github.com/alibaba/schedulerx-worker-go/internal/tasks"
"github.com/alibaba/schedulerx-worker-go/internal/utils"
"github.com/alibaba/schedulerx-worker-go/logger"
)
// Package-level singleton state for the task master pool.
var (
// taskMasterPool is the shared pool instance. It is nil until
// InitTaskMasterPool has assigned it.
taskMasterPool *TaskMasterPool
// once guards the assignment in InitTaskMasterPool so it runs at most one time.
once sync.Once
)
// InitTaskMasterPool installs masterPool as the package-wide pool instance.
//
// The assignment runs through a sync.Once, so only the first call has any
// effect; later calls leave the previously installed pool untouched.
func InitTaskMasterPool(masterPool *TaskMasterPool) {
	install := func() {
		taskMasterPool = masterPool
	}
	once.Do(install)
}
// GetTaskMasterPool returns the package-level pool installed by
// InitTaskMasterPool. It must be called after InitTaskMasterPool; before
// that it returns nil.
//
// NOTE(review): the read below is not synchronized with the once.Do in
// InitTaskMasterPool — confirm initialization completes before concurrent use.
func GetTaskMasterPool() *TaskMasterPool {
return taskMasterPool
}
// TaskMasterPool holds the TaskMaster registered for each job instance id,
// together with a task map supplied at construction time.
type TaskMasterPool struct {
// taskMasters maps a job instance id (int64) to its taskmaster.TaskMaster.
taskMasters sync.Map
// tasks is the task map passed to NewTaskMasterPool and returned by Tasks.
tasks *tasks.TaskMap
}
// NewTaskMasterPool creates a pool with no registered masters that holds the
// given task map.
func NewTaskMasterPool(tasks *tasks.TaskMap) *TaskMasterPool {
	// The zero value of sync.Map is an empty, ready-to-use map, so the
	// taskMasters field needs no explicit initializer.
	pool := &TaskMasterPool{tasks: tasks}
	return pool
}
// Tasks returns the task map this pool was constructed with.
func (p *TaskMasterPool) Tasks() *tasks.TaskMap {
return p.tasks
}
// Get looks up the TaskMaster registered for jobInstanceId.
//
// It returns nil when no entry exists for the id. It also returns nil, rather
// than panicking, when the stored value does not satisfy
// taskmaster.TaskMaster — for example a nil master handed to Put, which
// sync.Map stores as a nil interface value.
func (p *TaskMasterPool) Get(jobInstanceId int64) taskmaster.TaskMaster {
	val, ok := p.taskMasters.Load(jobInstanceId)
	if !ok {
		return nil
	}
	// Comma-ok assertion: the single-value form val.(taskmaster.TaskMaster)
	// panics on a nil interface value; this form yields a nil master instead.
	master, _ := val.(taskmaster.TaskMaster)
	return master
}
// Put registers master under jobInstanceId, replacing any value already
// stored for that id.
func (p *TaskMasterPool) Put(jobInstanceId int64, master taskmaster.TaskMaster) {
p.taskMasters.Store(jobInstanceId, master)
}
// Remove deletes the entry stored under jobInstanceId, if there is one.
func (p *TaskMasterPool) Remove(jobInstanceId int64) {
p.taskMasters.Delete(jobInstanceId)
}
// Contains reports whether an entry is stored under jobInstanceId.
func (p *TaskMasterPool) Contains(jobInstanceId int64) bool {
	// Only the presence flag matters here; the stored value is discarded.
	_, present := p.taskMasters.Load(jobInstanceId)
	return present
}
// GetInstanceIds collects the job instance ids of every registered TaskMaster
// whose job instance info reports an app group id equal to
// specifiedAppGroupId.
//
// Entries whose stored value is not a taskmaster.TaskMaster are logged and
// skipped; entries without job instance info are skipped silently. The ids
// are gathered in a utils set before being returned as a slice (presumably
// de-duplicated by that set — confirm against utils.NewSet).
func (p *TaskMasterPool) GetInstanceIds(specifiedAppGroupId int64) []int64 {
	ids := utils.NewSet()
	p.taskMasters.Range(func(_, val interface{}) bool {
		master, ok := val.(taskmaster.TaskMaster)
		if !ok {
			logger.Infof("TaskMaster=%+v is not a valid type, expect TaskMaster", val)
			return true
		}

		// Fetch the info once so the nil check and the reads below operate on
		// the same value instead of three separate calls.
		info := master.GetJobInstanceInfo()
		if info == nil {
			return true
		}
		if info.GetAppGroupId() == specifiedAppGroupId {
			ids.Add(info.GetJobInstanceId())
		}

		// Always keep iterating: every entry has to be inspected.
		return true
	})
	return ids.ToInt64Slice()
}