ingestor/cluster/batcher.go (363 lines of code) (raw):

package cluster import ( "context" "fmt" "math" "os" "slices" "sort" "sync" "sync/atomic" "time" "github.com/Azure/adx-mon/metrics" "github.com/Azure/adx-mon/pkg/logger" "github.com/Azure/adx-mon/pkg/partmap" "github.com/Azure/adx-mon/pkg/service" "github.com/Azure/adx-mon/pkg/wal" "github.com/Azure/adx-mon/storage" ) // DefaultMaxSegmentCount is the default maximum number of segments to include in a batch before creating a new batch. const DefaultMaxSegmentCount = 25 type Segmenter interface { Get(infos []wal.SegmentInfo, prefix string) []wal.SegmentInfo PrefixesByAge() []string Remove(si wal.SegmentInfo) } type BatcherOpts struct { StorageDir string MinUploadSize int64 MaxSegmentAge time.Duration MaxTransferSize int64 MaxTransferAge time.Duration MaxBatchSegments int Partitioner MetricPartitioner Segmenter Segmenter UploadQueue chan *Batch TransferQueue chan *Batch PeerHealthReporter PeerHealthReporter TransfersDisabled bool } type Batch struct { Segments []wal.SegmentInfo Database string Table string Prefix string batcher Batcher released bool removed bool mu sync.Mutex } // Release releases the segments in the batch so they can be processed again. func (b *Batch) Release() { b.batcher.Release(b) b.mu.Lock() b.released = true b.mu.Unlock() } // Remove removes the segments in the batch from disk. func (b *Batch) Remove() error { b.mu.Lock() b.removed = true b.mu.Unlock() return b.batcher.Remove(b) } func (b *Batch) Paths() []string { var paths []string for _, v := range b.Segments { paths = append(paths, v.Path) } return paths } func (b *Batch) IsReleased() bool { b.mu.Lock() defer b.mu.Unlock() return b.released } func (b *Batch) IsRemoved() bool { b.mu.Lock() defer b.mu.Unlock() return b.removed } type Batcher interface { service.Component BatchSegments() error UploadQueueSize() int TransferQueueSize() int SegmentsTotal() int64 SegmentsSize() int64 Release(batch *Batch) Remove(batch *Batch) error MaxSegmentAge() time.Duration } // Batcher manages WAL segments that are ready for upload to kusto or that need // to be transferred to another node. type batcher struct { uploadQueue chan *Batch transferQueue chan *Batch store storage.Store // pendingUploads is the number of batches ready for upload but not in the upload queue. pendingUploads uint64 // pendingTransfers is the number of batches ready for transfer but not in the transfer queue. pendingTransfer uint64 // segmentsTotal is the total number of segments on disk. segmentsTotal int64 // segmentsSize is the total size of segments on disk. segementsSize int64 // transferDisabled is set to true when transfers are disabled. transferDisabled bool wg sync.WaitGroup closeFn context.CancelFunc storageDir string Partitioner MetricPartitioner Segmenter Segmenter health PeerHealthReporter hostname string maxTransferAge time.Duration maxTransferSize int64 minUploadSize int64 maxSegmentAge time.Duration maxBatchSegments int tempSet []wal.SegmentInfo segments *partmap.Map[int] } func NewBatcher(opts BatcherOpts) Batcher { minUploadSize := opts.MinUploadSize if minUploadSize == 0 { minUploadSize = 100 * 1024 * 1024 // This is the minimal "optimal" size for kusto uploads. } maxBatchSegments := DefaultMaxSegmentCount if opts.MaxBatchSegments > 0 { maxBatchSegments = opts.MaxBatchSegments } return &batcher{ storageDir: opts.StorageDir, maxTransferAge: opts.MaxTransferAge, maxTransferSize: opts.MaxTransferSize, minUploadSize: minUploadSize, // This is the minimal "optimal" size for kusto uploads. maxBatchSegments: maxBatchSegments, // The maximum number of segments to include in a merged batch Partitioner: opts.Partitioner, Segmenter: opts.Segmenter, uploadQueue: opts.UploadQueue, transferQueue: opts.TransferQueue, health: opts.PeerHealthReporter, transferDisabled: opts.TransfersDisabled, segments: partmap.NewMap[int](64), } } func (b *batcher) Open(ctx context.Context) error { ctx, b.closeFn = context.WithCancel(ctx) var err error b.hostname, err = os.Hostname() if err != nil { return err } go b.watch(ctx) return nil } func (b *batcher) Close() error { b.closeFn() b.wg.Wait() return nil } func (b *batcher) TransferQueueSize() int { return len(b.transferQueue) + int(atomic.LoadUint64(&b.pendingTransfer)) } func (b *batcher) UploadQueueSize() int { return len(b.uploadQueue) + int(atomic.LoadUint64(&b.pendingUploads)) } func (b *batcher) SegmentsTotal() int64 { return atomic.LoadInt64(&b.segmentsTotal) } func (b *batcher) SegmentsSize() int64 { return atomic.LoadInt64(&b.segementsSize) } func (b *batcher) MaxSegmentAge() time.Duration { return b.maxSegmentAge } func (b *batcher) watch(ctx context.Context) { b.wg.Add(1) defer b.wg.Done() t := time.NewTicker(5 * time.Second) defer t.Stop() for { select { case <-ctx.Done(): return case <-t.C: if err := b.BatchSegments(); err != nil { logger.Errorf("Failed to batch segments: %v", err) } } } } func (b *batcher) BatchSegments() error { owned, notOwned, err := b.processSegments() if err != nil { return fmt.Errorf("process segments: %w", err) } atomic.StoreUint64(&b.pendingUploads, uint64(len(owned))) atomic.StoreUint64(&b.pendingTransfer, uint64(len(notOwned))) metrics.IngestorQueueSize.WithLabelValues("upload").Set(float64(len(b.uploadQueue) + len(owned))) metrics.IngestorQueueSize.WithLabelValues("transfer").Set(float64(len(b.transferQueue) + len(notOwned))) for _, v := range owned { b.uploadQueue <- v } for _, v := range notOwned { b.transferQueue <- v } return nil } // processSegments returns the set of batches that are owned by the current instance and // the set that are owned by peers and need to be transferred. The owned slice may contain // segments that are owned by other peers if they are already past the max age or max size // thresholds. In addition, the batches are ordered as oldest first to allow for prioritizing // lagging segments over new ones. func (b *batcher) processSegments() ([]*Batch, []*Batch, error) { // Groups is b map of metrics name to b list of segments for that metric. groups := make(map[string][]wal.SegmentInfo) byAge := b.Segmenter.PrefixesByAge() // Order them by newest first so we process recent data first. slices.Reverse(byAge) // Add a small percentage of the oldest segments to the front of the list to ensure that // we're ticking away through the backlog. byAge = prioritizeOldest(byAge) // We need to find the segment that this node is responsible for uploading to kusto and ones that // need to be transferred to other nodes. var ( owned, notOwned []*Batch groupSize int totalFiles, totalSize int64 ) b.maxSegmentAge = 0 for _, prefix := range byAge { b.tempSet = b.Segmenter.Get(b.tempSet[:0], prefix) // Remove any segments that are already part of a batch so we don't try to double upload them b.tempSet = slices.DeleteFunc(b.tempSet, func(si wal.SegmentInfo) bool { n, _ := b.segments.Get(si.Path) return n > 0 }) // If all the segments are already part of a batch, skip this prefix. if len(b.tempSet) == 0 { continue } groupSize = 0 var oldestSegment time.Time for _, v := range b.tempSet { if oldestSegment.IsZero() || v.CreatedAt.Before(oldestSegment) { oldestSegment = v.CreatedAt } groupSize += int(v.Size) totalFiles++ } totalSize += int64(groupSize) metrics.IngestorSegmentsMaxAge.WithLabelValues(prefix).Set(time.Since(oldestSegment).Seconds()) metrics.IngestorSegmentsSizeBytes.WithLabelValues(prefix).Set(float64(groupSize)) metrics.IngestorSegmentsTotal.WithLabelValues(prefix).Set(float64(len(b.tempSet))) if v := time.Since(oldestSegment); v > b.maxSegmentAge { b.maxSegmentAge = v } groups[prefix] = append(groups[prefix], b.tempSet...) } // For each sample, sort the segments by name. The last segment is the current segment. for _, prefix := range byAge { v := groups[prefix] if len(v) == 0 { continue } sort.Slice(v, func(i, j int) bool { return v[i].Path < v[j].Path }) var ( batchSize int64 batch *Batch ) db, table, _, _, err := wal.ParseFilename(v[0].Path) if err != nil { logger.Errorf("Failed to parse segment filename: %s", err) continue } batch = &Batch{ Prefix: prefix, Database: db, Table: table, batcher: b, } for _, si := range v { batch.Segments = append(batch.Segments, si) batchSize += si.Size // Record that this segment is part of a batch. _ = b.segments.Mutate(si.Path, func(n int) (int, error) { return n + 1, nil }) // Prevent trying to combine an unbounded number of segments at once even if they are very small. This // can incur a lot of CPU time and slower transfers when there are hundreds of segments that can be combined. if len(batch.Segments) >= b.maxBatchSegments { if logger.IsDebug() { logger.Debugf("Batch %s is merging more than %d segments, uploading directly", si.Path, 25) } owned = append(owned, batch) batch = &Batch{ Prefix: prefix, Database: db, Table: table, batcher: b, } batchSize = 0 continue } // The batch is at the optimal size for uploading to kusto, upload directly and start a new batch. if b.minUploadSize > 0 && batchSize >= b.minUploadSize { if logger.IsDebug() { logger.Debugf("Batch %s is larger than %dMB (%d), uploading directly", si.Path, (b.minUploadSize)/1e6, batchSize) } owned = append(owned, batch) batch = &Batch{ Prefix: prefix, Database: db, Table: table, batcher: b, } batchSize = 0 continue } if b.maxTransferSize > 0 && batchSize >= b.maxTransferSize { if logger.IsDebug() { logger.Debugf("Batch %s is larger than %dMB (%d), uploading directly", si.Path, b.maxTransferSize/1e6, batchSize) } owned = append(owned, batch) batch = &Batch{ Prefix: prefix, Database: db, Table: table, batcher: b, } batchSize = 0 continue } createdAt := si.CreatedAt // If the file has been on disk for more than 30 seconds, we're behind on uploading so upload it directly // ourselves vs transferring it to another node. This could result in suboptimal upload batches, but we'd // rather take that hit than have b node that's behind on uploading. if b.maxTransferAge > 0 && time.Since(createdAt) > b.maxTransferAge { if logger.IsDebug() { logger.Debugf("File %s is older than %s (%s) seconds, uploading directly", si.Path, b.maxTransferAge.String(), time.Since(createdAt).String()) } owned = append(owned, batch) batch = &Batch{ Prefix: prefix, Database: db, Table: table, batcher: b, } batchSize = 0 continue } } if len(batch.Segments) == 0 { continue } owner, _ := b.Partitioner.Owner([]byte(prefix)) // If the peer has signaled that it's unhealthy, upload the segments directly. peerHealthy := b.health.IsPeerHealthy(owner) if owner == b.hostname || !peerHealthy || b.transferDisabled { owned = append(owned, batch) } else { notOwned = append(notOwned, batch) } } atomic.StoreInt64(&b.segmentsTotal, totalFiles) atomic.StoreInt64(&b.segementsSize, totalSize) return owned, notOwned, nil } func (b *batcher) Release(batch *Batch) { for _, si := range batch.Segments { // Remove the segment from the map if it's no longer part of b batch so we don't leak keys _, _ = b.segments.Delete(si.Path) } } func (b *batcher) Remove(batch *Batch) error { for _, si := range batch.Segments { b.Segmenter.Remove(si) err := os.Remove(si.Path) if err != nil && !os.IsNotExist(err) { logger.Errorf("Failed to remove segment %s: %s", si.Path, err) continue } } return nil } func prioritizeOldest(a []string) []string { var b []string // Find the index that is roughly 10% from the end of the list idx := len(a) - int(math.Round(float64(len(a))*0.2)) // Move last 20% of batches to the front of the list b = append(b, a[idx:]...) // Reverse the list so the oldest batches are first slices.Reverse(b) // Move first 80% of batches to the end of the list b = append(b, a[:idx]...) return b }