pkg/flow/dedup_priority_queue.go (71 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package flow
import (
"container/heap"
"github.com/emirpasic/gods/utils"
)
var _ heap.Interface = (*DedupPriorityQueue)(nil)
// Element represents an item in the DedupPriorityQueue.
type Element interface {
GetIndex() int
SetIndex(int)
}
// DedupPriorityQueue implements heap.Interface.
// DedupPriorityQueue is not thread-safe.
type DedupPriorityQueue struct {
comparator utils.Comparator
cache map[Element]struct{}
Items []Element
allowDuplicates bool
}
// NewPriorityQueue returns a new DedupPriorityQueue.
func NewPriorityQueue(comparator utils.Comparator, allowDuplicates bool) *DedupPriorityQueue {
return &DedupPriorityQueue{
comparator: comparator,
Items: make([]Element, 0),
cache: make(map[Element]struct{}),
allowDuplicates: allowDuplicates,
}
}
// Len returns the DedupPriorityQueue length.
func (pq *DedupPriorityQueue) Len() int { return len(pq.Items) }
// Less is the items less comparator.
func (pq *DedupPriorityQueue) Less(i, j int) bool {
return pq.comparator(pq.Items[i], pq.Items[j]) < 0
}
// Swap exchanges indexes of the items.
func (pq *DedupPriorityQueue) Swap(i, j int) {
pq.Items[i], pq.Items[j] = pq.Items[j], pq.Items[i]
pq.Items[i].SetIndex(i)
pq.Items[j].SetIndex(j)
}
// Push implements heap.Interface.Push.
// Appends an item to the DedupPriorityQueue.
func (pq *DedupPriorityQueue) Push(x interface{}) {
item := x.(Element)
// if duplicates is not allowed
if !pq.allowDuplicates {
// use mutex to protect cache and items
// check existence
if _, ok := pq.cache[item]; ok {
return
}
pq.cache[item] = struct{}{}
}
n := len(pq.Items)
item.SetIndex(n)
pq.Items = append(pq.Items, item)
}
// Pop implements heap.Interface.Pop.
// Removes and returns the Len() - 1 element.
func (pq *DedupPriorityQueue) Pop() interface{} {
n := len(pq.Items)
item := pq.Items[n-1]
item.SetIndex(-1) // for safety
delete(pq.cache, item)
pq.Items = pq.Items[0 : n-1]
return item
}
// Peek returns the first item of the DedupPriorityQueue without removing it.
func (pq *DedupPriorityQueue) Peek() Element {
if len(pq.Items) > 0 {
return (pq.Items)[0]
}
return nil
}
// ReplaceLowest replaces the lowest item with the newLowest.
func (pq *DedupPriorityQueue) ReplaceLowest(newLowest Element) {
pq.Items[0] = newLowest
heap.Fix(pq, 0)
}
// Values returns all items.
func (pq *DedupPriorityQueue) Values() []Element {
values := make([]Element, pq.Len())
for pq.Len() > 0 {
item := heap.Pop(pq).(Element)
values[pq.Len()] = item
}
return values
}