pkg/export/shard.go
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package export

import (
	"fmt"
	"sync"

	monitoring_pb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
)

// shard holds a queue of data for a subset of samples.
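//
// Shards move data in a cycle: samples are added via enqueue, moved into a
// batch via fill, and notifyDone resets the shard once the batch it
// contributed to has been processed.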
type shard struct {
mtx sync.Mutex
queue *queue
pending bool
	// A cache of series IDs that have already been added to the batch in fill.
	// It's kept on the struct only to avoid re-allocating it on each call to fill.
seen map[uint64]struct{}
}
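
// newShard returns a shard whose queue holds at most queueSize samples.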
func newShard(queueSize uint) *shard {
return &shard{
queue: newQueue(queueSize),
seen: map[uint64]struct{}{},
}
}
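
// enqueue adds the sample for the series with the given hash to the shard's
// queue. If the queue is full, the sample is dropped and counted in the
// samplesDropped metric.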
func (s *shard) enqueue(hash uint64, sample *monitoring_pb.TimeSeries) {
s.mtx.Lock()
defer s.mtx.Unlock()
e := queueEntry{
hash: hash,
sample: sample,
}
if !s.queue.add(e) {
// TODO(freinartz): tail drop is not a great solution. Once we have the WAL buffer,
// we can just block here when enqueueing from it.
samplesDropped.WithLabelValues("queue-full").Inc()
}
}

// fill adds samples to the batch until its capacity is reached or the shard
// has no more samples for series that are not in the batch yet.
func (s *shard) fill(batch *batch) (took, remaining int) {
s.mtx.Lock()
defer s.mtx.Unlock()
shardProcess.Inc()
if s.pending {
shardProcessPending.Inc()
return 0, s.queue.length()
}
n := 0
for !batch.full() {
e, ok := s.queue.peek()
if !ok {
break
}
		// If we already added a sample for the same series to the batch, stop
		// filling entirely.
if _, ok := s.seen[e.hash]; ok {
break
}
s.queue.remove()
batch.add(e.sample)
s.seen[e.hash] = struct{}{}
n++
}
if n > 0 {
s.setPending(true)
batch.addShard(s)
shardProcessSamplesTaken.Observe(float64(n))
}
	// Clear the seen cache. Because the shard is now pending, we won't add any
	// more data to the batch, even if fill is called again before notifyDone.
for k := range s.seen {
delete(s.seen, k)
}
return n, s.queue.length()
}
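
// setPending flags whether the shard currently has samples in an in-flight
// batch. It panics if the state is set to its current value, which implies
// a bug in the caller.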
func (s *shard) setPending(b bool) {
// This case should never happen in our usage of shards unless there is a bug.
if s.pending == b {
panic(fmt.Sprintf("pending set to %v while it already was", b))
}
s.pending = b
}
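
// notifyDone marks the shard as no longer pending so that subsequent calls
// to fill may take samples from it again.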
func (s *shard) notifyDone() {
s.mtx.Lock()
defer s.mtx.Unlock()
s.setPending(false)
}
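
// queue is a FIFO ring buffer of samples with fixed capacity.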
type queue struct {
buf []queueEntry
head, tail int
len int
}
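
// queueEntry pairs a sample with the hash identifying its series.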
type queueEntry struct {
hash uint64
sample *monitoring_pb.TimeSeries
}
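
// newQueue returns an empty queue that holds at most size entries.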
func newQueue(size uint) *queue {
return &queue{buf: make([]queueEntry, size)}
}
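
// length returns the number of entries currently in the queue.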
func (q *queue) length() int {
return q.len
}
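
// add inserts e at the tail of the queue. It reports whether the entry was
// added; false means the queue is at capacity.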
func (q *queue) add(e queueEntry) bool {
if q.len == len(q.buf) {
return false
}
q.buf[q.tail] = e
q.tail = (q.tail + 1) % len(q.buf)
q.len++
return true
}
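
// peek returns the entry at the head of the queue without removing it.
// The second return value is false if the queue is empty.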
func (q *queue) peek() (queueEntry, bool) {
if q.len < 1 {
return queueEntry{}, false
}
return q.buf[q.head], true
}
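
// remove discards the entry at the head of the queue. It reports whether
// an entry was removed.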
func (q *queue) remove() bool {
if q.len < 1 {
return false
}
	q.buf[q.head] = queueEntry{} // Zero the slot: drops the sample reference for GC and makes debugging easier.
q.head = (q.head + 1) % len(q.buf)
q.len--
return true
}