config/worker_config.go (214 lines of code) (raw):
/*
* Copyright (c) 2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"sync"
"time"
"github.com/alibaba/schedulerx-worker-go/internal/constants"
)
// Package-level singleton state used by InitWorkerConfig, GetWorkerConfig and
// NewWorkerConfig.
var (
	// once guards the one-time assignment of workerConfig.
	once sync.Once

	// workerConfig is the stored configuration. It is nil until an
	// initializer guarded by once has assigned it.
	workerConfig *WorkerConfig
)
// InitWorkerConfig installs cfg as the package-level worker configuration.
//
// The assignment runs inside the package's sync.Once, which NewWorkerConfig
// also uses, so only the first initializer to execute has any effect; every
// later call is a no-op.
//
// A nil cfg is ignored and does not consume the Once. Storing nil would use
// up the one-time slot while leaving nothing installed: later calls to
// InitWorkerConfig with a real configuration would be silently dropped and
// NewWorkerConfig would return nil.
func InitWorkerConfig(cfg *WorkerConfig) {
	if cfg == nil {
		return
	}
	once.Do(func() {
		workerConfig = cfg
	})
}
// GetWorkerConfig returns the installed package-level configuration.
//
// When nothing has been installed yet it returns the result of
// defaultWorkerConfig instead. That fallback value is not stored, so each such
// call yields a separate, freshly built *WorkerConfig.
func GetWorkerConfig() *WorkerConfig {
	if workerConfig == nil {
		return defaultWorkerConfig()
	}
	return workerConfig
}
// Option mutates a WorkerConfig in place. Options are produced by the With*
// constructors in this file and applied by NewWorkerConfig.
type Option func(cfg *WorkerConfig)
// WithEnableShareContainerPool returns an Option that sets the
// isShareContainerPool flag to true (defaultWorkerConfig leaves it false).
func WithEnableShareContainerPool() Option {
	return func(cfg *WorkerConfig) {
		cfg.isShareContainerPool = true
	}
}
// WithDisableMapMasterFailover returns an Option that sets the
// isMapMasterFailover flag to false (defaultWorkerConfig sets it to true).
func WithDisableMapMasterFailover() Option {
	return func(cfg *WorkerConfig) {
		cfg.isMapMasterFailover = false
	}
}
// WithEnableSecondDelayIntervalMS returns an Option that sets the
// isSecondDelayIntervalMS flag to true (defaultWorkerConfig leaves it false).
func WithEnableSecondDelayIntervalMS() Option {
	return func(cfg *WorkerConfig) {
		cfg.isSecondDelayIntervalMS = true
	}
}
// WithEnableDispatchSecondDelayStandalone returns an Option that sets the
// isDispatchSecondDelayStandalone flag to true (defaultWorkerConfig leaves it
// false).
func WithEnableDispatchSecondDelayStandalone() Option {
	return func(cfg *WorkerConfig) {
		cfg.isDispatchSecondDelayStandalone = true
	}
}
// WithDisableBroadcastMasterExec returns an Option that sets the
// broadcastMasterExecEnable flag to false (defaultWorkerConfig sets it to
// true).
func WithDisableBroadcastMasterExec() Option {
	return func(cfg *WorkerConfig) {
		cfg.broadcastMasterExecEnable = false
	}
}
// WithBroadcastDispatchRetryTimes returns an Option that stores
// broadcastDispatchRetryTimes in the field of the same name. The value is
// stored as given, without validation.
func WithBroadcastDispatchRetryTimes(broadcastDispatchRetryTimes int32) Option {
	return func(cfg *WorkerConfig) {
		cfg.broadcastDispatchRetryTimes = broadcastDispatchRetryTimes
	}
}
// WithMapMasterPageSize returns an Option that stores mapMasterPageSize in the
// field of the same name. The value is stored as given, without validation.
func WithMapMasterPageSize(mapMasterPageSize int32) Option {
	return func(cfg *WorkerConfig) {
		cfg.mapMasterPageSize = mapMasterPageSize
	}
}
// WithMapMasterQueueSize returns an Option that stores mapMasterQueueSize in
// the field of the same name. The value is stored as given, without
// validation.
func WithMapMasterQueueSize(mapMasterQueueSize int32) Option {
	return func(cfg *WorkerConfig) {
		cfg.mapMasterQueueSize = mapMasterQueueSize
	}
}
// WithMapMasterDispatcherSize returns an Option that stores
// mapMasterDispatcherSize in the field of the same name. The value is stored
// as given, without validation.
func WithMapMasterDispatcherSize(mapMasterDispatcherSize int32) Option {
	return func(cfg *WorkerConfig) {
		cfg.mapMasterDispatcherSize = mapMasterDispatcherSize
	}
}
// WithMapMasterStatusCheckInterval returns an Option that stores
// mapMasterStatusCheckInterval in the field of the same name. The duration is
// stored as given, without validation.
func WithMapMasterStatusCheckInterval(mapMasterStatusCheckInterval time.Duration) Option {
	return func(cfg *WorkerConfig) {
		cfg.mapMasterStatusCheckInterval = mapMasterStatusCheckInterval
	}
}
// WithSharePoolSize returns an Option that stores sharePoolSize in the field
// of the same name. The value is stored as given, without validation.
func WithSharePoolSize(sharePoolSize int32) Option {
	return func(cfg *WorkerConfig) {
		cfg.sharePoolSize = sharePoolSize
	}
}
// WithWorkerParallelTaskMaxSize returns an Option that stores
// workerParallelTaskMaxSize in the field of the same name. The value is stored
// as given, without validation.
func WithWorkerParallelTaskMaxSize(workerParallelTaskMaxSize int32) Option {
	return func(cfg *WorkerConfig) {
		cfg.workerParallelTaskMaxSize = workerParallelTaskMaxSize
	}
}
// WithWorkerMapPageSize returns an Option that stores workerMapPageSize in the
// field of the same name. The value is stored as given, without validation.
func WithWorkerMapPageSize(workerMapPageSize int32) Option {
	return func(cfg *WorkerConfig) {
		cfg.workerMapPageSize = workerMapPageSize
	}
}
// WithTaskBodySizeMax returns an Option that stores taskBodySizeMax in the
// field of the same name. The value is stored as given, without validation.
func WithTaskBodySizeMax(taskBodySizeMax int32) Option {
	return func(cfg *WorkerConfig) {
		cfg.taskBodySizeMax = taskBodySizeMax
	}
}
// WithGrpcPort returns an Option that stores port in the grpcPort field. The
// value is stored as given, without validation; defaultWorkerConfig leaves the
// field at zero.
func WithGrpcPort(port int32) Option {
	return func(cfg *WorkerConfig) {
		cfg.grpcPort = port
	}
}
// WithIface returns an Option that stores iface in the field of the same
// name. defaultWorkerConfig leaves the field as the empty string.
func WithIface(iface string) Option {
	return func(cfg *WorkerConfig) {
		cfg.iface = iface
	}
}
// WithQueueSize returns an Option that stores queueSize in the field of the
// same name. The value is stored as given, without validation.
func WithQueueSize(queueSize int32) Option {
	return func(cfg *WorkerConfig) {
		cfg.queueSize = queueSize
	}
}
// WithLabel returns an Option that stores label in the field of the same
// name. defaultWorkerConfig leaves the field as the empty string.
func WithLabel(label string) Option {
	return func(cfg *WorkerConfig) {
		cfg.label = label
	}
}
// NewWorkerConfig builds the package-level configuration from the defaults
// plus opts, applied in argument order, and returns it.
//
// Construction happens inside the package's sync.Once, the same one used by
// InitWorkerConfig. If an initializer guarded by that Once has already run,
// opts are ignored and whatever is currently stored is returned.
func NewWorkerConfig(opts ...Option) *WorkerConfig {
	build := func() {
		// The defaults are stored first; each option then mutates the
		// stored value directly.
		workerConfig = defaultWorkerConfig()
		for i := range opts {
			opts[i](workerConfig)
		}
	}
	once.Do(build)
	return workerConfig
}
// WorkerConfig holds the worker's settings. Every field is unexported: values
// are written by the Option constructors in this package and read through the
// accessor methods below.
type WorkerConfig struct {
	// Boolean switches.
	isShareContainerPool            bool
	isMapMasterFailover             bool
	isSecondDelayIntervalMS         bool
	isDispatchSecondDelayStandalone bool
	broadcastMasterExecEnable       bool

	// Broadcast and map-master settings.
	broadcastDispatchRetryTimes  int32
	mapMasterPageSize            int32
	mapMasterQueueSize           int32
	mapMasterDispatcherSize      int32
	mapMasterStatusCheckInterval time.Duration

	// Pool, paging and task-size limits.
	sharePoolSize             int32
	workerParallelTaskMaxSize int32
	workerMapPageSize         int32
	taskBodySizeMax           int32

	// Remaining settings.
	grpcPort  int32
	iface     string
	queueSize int32
	label     string
}

// IsShareContainerPool reports the value of the isShareContainerPool flag.
func (c *WorkerConfig) IsShareContainerPool() bool {
	return c.isShareContainerPool
}

// IsMapMasterFailover reports the value of the isMapMasterFailover flag.
func (c *WorkerConfig) IsMapMasterFailover() bool {
	return c.isMapMasterFailover
}

// IsSecondDelayIntervalMS reports the value of the isSecondDelayIntervalMS
// flag.
func (c *WorkerConfig) IsSecondDelayIntervalMS() bool {
	return c.isSecondDelayIntervalMS
}

// IsDispatchSecondDelayStandalone reports the value of the
// isDispatchSecondDelayStandalone flag.
func (c *WorkerConfig) IsDispatchSecondDelayStandalone() bool {
	return c.isDispatchSecondDelayStandalone
}

// BroadcastMasterExecEnable reports the value of the
// broadcastMasterExecEnable flag.
func (c *WorkerConfig) BroadcastMasterExecEnable() bool {
	return c.broadcastMasterExecEnable
}

// BroadcastDispatchRetryTimes returns the stored broadcastDispatchRetryTimes
// value.
func (c *WorkerConfig) BroadcastDispatchRetryTimes() int32 {
	return c.broadcastDispatchRetryTimes
}

// MapMasterPageSize returns the stored mapMasterPageSize value.
func (c *WorkerConfig) MapMasterPageSize() int32 {
	return c.mapMasterPageSize
}

// MapMasterQueueSize returns the stored mapMasterQueueSize value.
func (c *WorkerConfig) MapMasterQueueSize() int32 {
	return c.mapMasterQueueSize
}

// MapMasterDispatcherSize returns the stored mapMasterDispatcherSize value.
func (c *WorkerConfig) MapMasterDispatcherSize() int32 {
	return c.mapMasterDispatcherSize
}

// MapMasterStatusCheckInterval returns the stored
// mapMasterStatusCheckInterval duration.
func (c *WorkerConfig) MapMasterStatusCheckInterval() time.Duration {
	return c.mapMasterStatusCheckInterval
}

// SharePoolSize returns the stored sharePoolSize value.
func (c *WorkerConfig) SharePoolSize() int32 {
	return c.sharePoolSize
}

// WorkerParallelTaskMaxSize returns the stored workerParallelTaskMaxSize
// value.
func (c *WorkerConfig) WorkerParallelTaskMaxSize() int32 {
	return c.workerParallelTaskMaxSize
}

// WorkerMapPageSize returns the stored workerMapPageSize value.
func (c *WorkerConfig) WorkerMapPageSize() int32 {
	return c.workerMapPageSize
}

// TaskBodySizeMax returns the stored taskBodySizeMax value.
func (c *WorkerConfig) TaskBodySizeMax() int32 {
	return c.taskBodySizeMax
}

// GrpcPort returns the stored grpcPort value.
func (c *WorkerConfig) GrpcPort() int32 {
	return c.grpcPort
}

// Iface returns the stored iface string.
func (c *WorkerConfig) Iface() string {
	return c.iface
}

// QueueSize returns the stored queueSize value.
func (c *WorkerConfig) QueueSize() int32 {
	return c.queueSize
}

// Label returns the stored label string.
func (c *WorkerConfig) Label() string {
	return c.label
}
// defaultWorkerConfig returns a newly allocated WorkerConfig populated with
// the package defaults. Each call produces an independent value.
func defaultWorkerConfig() *WorkerConfig {
	cfg := new(WorkerConfig)

	// Only the switches that default to on are assigned. The others
	// (isSecondDelayIntervalMS, isShareContainerPool,
	// isDispatchSecondDelayStandalone) default to false, which is already the
	// zero value. grpcPort, iface and label likewise stay at their zero
	// values.
	cfg.isMapMasterFailover = true
	cfg.broadcastMasterExecEnable = true

	cfg.broadcastDispatchRetryTimes = constants.BroadcastDispatchRetryTimesDefault
	cfg.mapMasterPageSize = constants.MapMasterPageSizeDefault
	cfg.mapMasterQueueSize = constants.MapMasterQueueSizeDefault
	cfg.mapMasterDispatcherSize = constants.MapMasterDispatcherSizeDefault
	cfg.mapMasterStatusCheckInterval = constants.MapMasterStatusCheckIntervalDefault

	cfg.sharePoolSize = constants.SharedPoolSizeDefault
	cfg.workerParallelTaskMaxSize = constants.ParallelTaskListSizeMaxDefault
	cfg.workerMapPageSize = constants.WorkerMapPageSizeDefault
	cfg.taskBodySizeMax = constants.TaskBodySizeMaxDefault

	// NOTE(review): queueSize reuses MapMasterQueueSizeDefault rather than a
	// dedicated constant — confirm this is intended.
	cfg.queueSize = constants.MapMasterQueueSizeDefault

	return cfg
}