internal/batch/container_status_req_handler_pool.go (54 lines of code) (raw):
/*
* Copyright (c) 2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package batch
import (
"sync"
"github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
)
var (
containerStatusReqHandlerPool *ContainerStatusReqHandlerPool
once sync.Once
)
func GetContainerStatusReqHandlerPool() *ContainerStatusReqHandlerPool {
once.Do(func() {
containerStatusReqHandlerPool = NewContainerStatusReqHandlerPool()
})
return containerStatusReqHandlerPool
}
// ContainerStatusReqHandlerPool a reqs handler per jobInstance
type ContainerStatusReqHandlerPool struct {
handlers *sync.Map // Map<Long, ContainerStatusReqHandler<ContainerReportTaskStatusRequest>>
}
func NewContainerStatusReqHandlerPool() *ContainerStatusReqHandlerPool {
return &ContainerStatusReqHandlerPool{
handlers: new(sync.Map),
}
}
func (p *ContainerStatusReqHandlerPool) Start(jobInstanceId int64, reqHandler *ContainerStatusReqHandler) {
// only process init phase;
// make sure no other already create mapping during sync blocking time range.
handler, ok := p.handlers.LoadOrStore(jobInstanceId, reqHandler)
if !ok {
if statusReqHandler, ok := handler.(*ContainerStatusReqHandler); ok {
statusReqHandler.Start(statusReqHandler)
}
}
}
func (p *ContainerStatusReqHandlerPool) Stop(jobInstanceId int64) {
handler, ok := p.handlers.LoadAndDelete(jobInstanceId)
if ok {
handler.(*ContainerStatusReqHandler).Stop()
handler = nil
}
}
func (p *ContainerStatusReqHandlerPool) Contains(jobInstanceId int64) bool {
_, ok := p.handlers.Load(jobInstanceId)
return ok
}
func (p *ContainerStatusReqHandlerPool) SubmitReq(jobInstanceId int64, req *schedulerx.ContainerReportTaskStatusRequest) bool {
success := false
handler, ok := p.handlers.Load(jobInstanceId)
if ok {
success = true
handler.(*ContainerStatusReqHandler).SubmitRequest(req)
}
return success
}
func (p *ContainerStatusReqHandlerPool) GetHandlers() *sync.Map {
return p.handlers
}