func()

in banyand/tsdb/buffer.go [210:259]


// start launches the two background goroutines that drive this bucket.
//
// The first goroutine consumes bsb.flushCh. For every event it calls
// onFlushFn with the bucket index and the event's skip list, retrying once per
// second until the call succeeds. It then records flush metrics, drops the
// head of bsb.immutables under bsb.mutex, and releases its reference on the
// skip list. When bsb.flushCh is closed it signals bsb.flushWaitGroup and then
// deletes the maxBytes and mutableBytes gauges for bsb.labelValues.
//
// The second goroutine consumes bsb.writeCh. For every operation it updates
// the mutableBytes gauge, calls bsb.triggerFlushing when either the locally
// accumulated volume estimate or the mutable skip list's MemSize reaches
// bsb.capacity, and then puts the entry into bsb.mutable. When bsb.writeCh is
// closed it signals bsb.writeWaitGroup.
//
// Both wait groups are only marked Done here; the matching Add calls are not
// visible in this function.
func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
	go func() {
		// Deferred calls run last-in-first-out: the wait group is released
		// first, then the gauges are removed.
		defer func() {
			for _, g := range []meter.Gauge{maxBytes, mutableBytes} {
				g.Delete(bsb.labelValues...)
			}
		}()
		defer bsb.flushWaitGroup.Done()
		for event := range bsb.flushCh {
			oldSkipList := event.data
			// Capture the size before flushing so the metric reflects what was handed over.
			memSize := oldSkipList.MemSize()
			t1 := time.Now()
			for {
				err := onFlushFn(bsb.index, oldSkipList)
				if err == nil {
					break
				}
				bsb.log.Err(err).Msg("flushing immutable buffer failed. Retrying...")
				// The full slice expression caps the capacity at 2 so append
				// must allocate a new array. Without the cap, append could
				// overwrite bsb.labelValues[2] in place, corrupting the labels
				// that the writer goroutine and the deferred gauge cleanup
				// read from bsb.labelValues.
				flushNum.Inc(1, append(bsb.labelValues[:2:2], "true")...)
				time.Sleep(time.Second)
			}
			flushLatency.Observe(time.Since(t1).Seconds(), bsb.shardLabelValues...)
			flushBytes.Inc(float64(memSize), bsb.shardLabelValues...)
			// Same capacity cap as above: never append into a backing array
			// that other holders of shardLabelValues might share.
			shardLabels := bsb.shardLabelValues[:len(bsb.shardLabelValues):len(bsb.shardLabelValues)]
			flushNum.Inc(1, append(shardLabels, "false")...)

			bsb.mutex.Lock()
			if len(bsb.immutables) > 0 {
				// Drop the head of the immutable list now that a flush has completed.
				bsb.immutables = bsb.immutables[1:]
			}
			bsb.mutex.Unlock()
			oldSkipList.DecrRef()
		}
	}()
	go func() {
		defer bsb.writeWaitGroup.Done()
		// volume is a running estimate of the bytes written since the last
		// triggered flush; it is local to this goroutine.
		volume := 0
		for op := range bsb.writeCh {
			k := y.KeyWithTs(op.key, op.epoch)
			v := y.ValueStruct{Value: op.value}
			volume += len(k) + int(v.EncodedSize()) + skl.MaxNodeSize + nodeAlign
			memSize := bsb.mutable.MemSize()
			mutableBytes.Set(float64(memSize), bsb.labelValues...)
			if volume >= bsb.capacity || memSize >= int64(bsb.capacity) {
				// Flush is triggered before the Put below, so the incoming
				// entry is written after triggerFlushing returns.
				bsb.triggerFlushing()
				volume = 0
			}
			bsb.mutable.Put(k, v)
		}
	}()
}