in banyand/tsdb/buffer.go [210:259]
func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
go func() {
defer func() {
for _, g := range []meter.Gauge{maxBytes, mutableBytes} {
g.Delete(bsb.labelValues...)
}
}()
defer bsb.flushWaitGroup.Done()
for event := range bsb.flushCh {
oldSkipList := event.data
memSize := oldSkipList.MemSize()
t1 := time.Now()
for {
if err := onFlushFn(bsb.index, oldSkipList); err != nil {
bsb.log.Err(err).Msg("flushing immutable buffer failed. Retrying...")
flushNum.Inc(1, append(bsb.labelValues[:2], "true")...)
time.Sleep(time.Second)
continue
}
break
}
flushLatency.Observe(time.Since(t1).Seconds(), bsb.shardLabelValues...)
flushBytes.Inc(float64(memSize), bsb.shardLabelValues...)
flushNum.Inc(1, append(bsb.shardLabelValues, "false")...)
bsb.mutex.Lock()
if len(bsb.immutables) > 0 {
bsb.immutables = bsb.immutables[1:]
}
bsb.mutex.Unlock()
oldSkipList.DecrRef()
}
}()
go func() {
defer bsb.writeWaitGroup.Done()
volume := 0
for op := range bsb.writeCh {
k := y.KeyWithTs(op.key, op.epoch)
v := y.ValueStruct{Value: op.value}
volume += len(k) + int(v.EncodedSize()) + skl.MaxNodeSize + nodeAlign
memSize := bsb.mutable.MemSize()
mutableBytes.Set(float64(memSize), bsb.labelValues...)
if volume >= bsb.capacity || memSize >= int64(bsb.capacity) {
bsb.triggerFlushing()
volume = 0
}
bsb.mutable.Put(k, v)
}
}()
}