internal/master/time_queue.go (58 lines of code) (raw):
/*
* Copyright (c) 2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package master
import (
"github.com/alibaba/schedulerx-worker-go/internal/utils"
"github.com/alibaba/schedulerx-worker-go/logger"
)
// TimeQueue is a time queue sorted by scheduling time and task priority.
//
// It pairs a priority queue with a set so that Add can reject an entry that
// is already queued.
type TimeQueue struct {
	// timeSet tracks the queued entries (Set<TimePlanEntry>).
	timeSet *utils.ConcurrentSet
	// timeQueue is the priority queue (Queue<TimePlanEntry>), sorted from
	// small to large by scheduling time.
	timeQueue *utils.PriorityQueue
}
// NewTimeQueue returns an empty TimeQueue backed by a fresh concurrent set
// and a fresh priority queue.
func NewTimeQueue() *TimeQueue {
	q := new(TimeQueue)
	q.timeSet = utils.NewConcurrentSet()
	// NOTE(review): 100 is presumably the initial capacity of the priority
	// queue — confirm against utils.NewPriorityQueue.
	q.timeQueue = utils.NewPriorityQueue(100)
	return q
}
// Add enqueues timePlanEntry unless the membership set already contains it.
// A duplicate is not enqueued again; only a warning is logged.
func (q *TimeQueue) Add(timePlanEntry *TimePlanEntry) {
	if q.timeSet.Contains(timePlanEntry) {
		logger.Warnf("plan=%+v is existed in timeQueue", timePlanEntry)
		return
	}

	// Record membership first, then push onto the priority queue.
	q.timeSet.Add(timePlanEntry)
	q.timeQueue.PushItem(timePlanEntry)
	logger.Infof("timeQueue add plan=%+v", timePlanEntry)
}
// Remove deletes every entry whose jobInstanceId equals the given id from
// both the priority queue and the membership set. All other entries stay
// queued.
//
// The queue operations used in this file only expose the head, so the queue
// is drained with Pop and the surviving entries are pushed back afterwards.
// Looping on the head alone would never terminate when the head belongs to a
// different job instance, and would never reach matching entries behind it.
//
// NOTE(review): the drain/refill sequence is not atomic with respect to other
// users of the queue — confirm that callers serialize access to TimeQueue.
func (q *TimeQueue) Remove(jobInstanceId int64) {
	survivors := make([]*TimePlanEntry, 0, q.timeQueue.Len())

	for q.timeQueue.Len() > 0 {
		item := q.timeQueue.Pop()
		if item == nil {
			// Treat a nil pop as "queue exhausted" instead of spinning or
			// panicking on the type assertion below.
			break
		}

		planEntry := item.(*TimePlanEntry)
		if planEntry.jobInstanceId == jobInstanceId {
			q.timeSet.Remove(planEntry)
			logger.Infof("planEntry=%+v removed, event.getTriggerType() != null", planEntry)
			continue
		}
		survivors = append(survivors, planEntry)
	}

	// Re-insert the entries that were not removed; pushing them through the
	// priority queue restores its ordering.
	for _, planEntry := range survivors {
		q.timeQueue.PushItem(planEntry)
	}
}
// Peek returns the head of this queue without removing it, or nil if the
// underlying priority queue has no head to offer.
func (q *TimeQueue) Peek() *TimePlanEntry {
	head := q.timeQueue.Peek()
	if head == nil {
		return nil
	}
	return head.(*TimePlanEntry)
}
// RemoveHeader pops the head of this queue, drops it from the membership set
// and returns it. It returns nil when the pop yields nothing.
func (q *TimeQueue) RemoveHeader() *TimePlanEntry {
	popped := q.timeQueue.Pop()
	if popped == nil {
		return nil
	}

	head := popped.(*TimePlanEntry)
	// Keep the set in step with the queue so the entry can be added again.
	q.timeSet.Remove(head)
	return head
}
// IsEmpty reports whether the underlying priority queue holds no entries.
func (q *TimeQueue) IsEmpty() bool {
	pending := q.timeQueue.Len()
	return pending == 0
}
// Size returns the number of entries in the underlying priority queue.
func (q *TimeQueue) Size() int {
	pending := q.timeQueue.Len()
	return pending
}
// Clear empties the queue by clearing the membership set and then the
// priority queue.
func (q *TimeQueue) Clear() {
	// The set is cleared before the queue, in that order.
	q.timeSet.Clear()
	q.timeQueue.Clear()
}