in ingestor/cluster/batcher.go [264:454]
// processSegments walks every prefix reported by the Segmenter and groups the segments that are
// not already part of a batch into new batches.
//
// It returns two lists:
//   - owned: batches this node should upload itself. A batch lands here when it hit the segment
//     count cap, one of the size thresholds, or the age threshold, or when the trailing batch for a
//     prefix is owned by this host, the owning peer is unhealthy, or transfers are disabled.
//   - notOwned: trailing batches whose owner is a healthy peer and that should be transferred.
//
// The returned error is always nil.
//
// Side effects: b.tempSet is reused as scratch space, b.maxSegmentAge is reset and recomputed,
// every batched segment path has its counter in b.segments incremented, the per-prefix segment
// gauges are updated, and b.segmentsTotal / b.segementsSize are stored atomically.
func (b *batcher) processSegments() ([]*Batch, []*Batch, error) {
	// groups maps a prefix to the list of not-yet-batched segments for that prefix.
	groups := make(map[string][]wal.SegmentInfo)

	byAge := b.Segmenter.PrefixesByAge()

	// Order them by newest first so we process recent data first.
	slices.Reverse(byAge)

	// Add a small percentage of the oldest segments to the front of the list to ensure that
	// we're ticking away through the backlog.
	byAge = prioritizeOldest(byAge)

	// We need to find the segments that this node is responsible for uploading to kusto and ones
	// that need to be transferred to other nodes.
	var (
		owned, notOwned       []*Batch
		totalFiles, totalSize int64
	)

	b.maxSegmentAge = 0
	for _, prefix := range byAge {
		b.tempSet = b.Segmenter.Get(b.tempSet[:0], prefix)

		// Remove any segments that are already part of a batch so we don't try to double upload them.
		b.tempSet = slices.DeleteFunc(b.tempSet, func(si wal.SegmentInfo) bool {
			n, _ := b.segments.Get(si.Path)
			return n > 0
		})

		// If all the segments are already part of a batch, skip this prefix.
		if len(b.tempSet) == 0 {
			continue
		}

		// Accumulate in int64 so large sizes are not truncated where int is 32 bits wide.
		var (
			groupSize     int64
			oldestSegment time.Time
		)
		for _, v := range b.tempSet {
			if oldestSegment.IsZero() || v.CreatedAt.Before(oldestSegment) {
				oldestSegment = v.CreatedAt
			}
			groupSize += v.Size
			totalFiles++
		}
		totalSize += groupSize

		// Evaluate the age once so the gauge and b.maxSegmentAge are derived from the same value.
		age := time.Since(oldestSegment)
		metrics.IngestorSegmentsMaxAge.WithLabelValues(prefix).Set(age.Seconds())
		metrics.IngestorSegmentsSizeBytes.WithLabelValues(prefix).Set(float64(groupSize))
		metrics.IngestorSegmentsTotal.WithLabelValues(prefix).Set(float64(len(b.tempSet)))
		if age > b.maxSegmentAge {
			b.maxSegmentAge = age
		}

		// Copy out of the scratch slice; b.tempSet is overwritten on the next iteration.
		groups[prefix] = append(groups[prefix], b.tempSet...)
	}

	// For each prefix, sort the segments by path and carve them into batches.
	for _, prefix := range byAge {
		v := groups[prefix]
		if len(v) == 0 {
			continue
		}

		sort.Slice(v, func(i, j int) bool {
			return v[i].Path < v[j].Path
		})

		// The database and table for the whole prefix are taken from the first segment's filename.
		db, table, _, _, err := wal.ParseFilename(v[0].Path)
		if err != nil {
			logger.Errorf("Failed to parse segment filename: %s", err)
			continue
		}

		// newBatch returns an empty batch bound to this prefix.
		newBatch := func() *Batch {
			return &Batch{
				Prefix:   prefix,
				Database: db,
				Table:    table,
				batcher:  b,
			}
		}

		batch := newBatch()
		var batchSize int64

		for _, si := range v {
			batch.Segments = append(batch.Segments, si)
			batchSize += si.Size

			// Record that this segment is part of a batch. The mutation itself never returns an error.
			_ = b.segments.Mutate(si.Path, func(n int) (int, error) {
				return n + 1, nil
			})

			// Decide whether the batch must be cut here and uploaded by this node. The checks are
			// evaluated in order and the first match wins; if none match, keep accumulating.
			switch {
			case len(batch.Segments) >= b.maxBatchSegments:
				// Prevent trying to combine an unbounded number of segments at once even if they are
				// very small. This can incur a lot of CPU time and slower transfers when there are
				// hundreds of segments that can be combined.
				if logger.IsDebug() {
					logger.Debugf("Batch %s is merging more than %d segments, uploading directly", si.Path, b.maxBatchSegments)
				}
			case b.minUploadSize > 0 && batchSize >= b.minUploadSize:
				// The batch is at the optimal size for uploading to kusto, upload directly.
				if logger.IsDebug() {
					logger.Debugf("Batch %s is larger than %dMB (%d), uploading directly", si.Path, (b.minUploadSize)/1e6, batchSize)
				}
			case b.maxTransferSize > 0 && batchSize >= b.maxTransferSize:
				// The batch has reached the maximum transfer size, so upload it directly.
				if logger.IsDebug() {
					logger.Debugf("Batch %s is larger than %dMB (%d), uploading directly", si.Path, b.maxTransferSize/1e6, batchSize)
				}
			case b.maxTransferAge > 0 && time.Since(si.CreatedAt) > b.maxTransferAge:
				// If the file has been on disk for longer than maxTransferAge, we're behind on
				// uploading so upload it directly ourselves vs transferring it to another node. This
				// could result in suboptimal upload batches, but we'd rather take that hit than have
				// a node that's behind on uploading.
				if logger.IsDebug() {
					logger.Debugf("File %s is older than %s (%s) seconds, uploading directly", si.Path, b.maxTransferAge.String(), time.Since(si.CreatedAt).String())
				}
			default:
				continue
			}

			owned = append(owned, batch)
			batch = newBatch()
			batchSize = 0
		}

		// Nothing left over after the last cut.
		if len(batch.Segments) == 0 {
			continue
		}

		owner, _ := b.Partitioner.Owner([]byte(prefix))

		// If the peer has signaled that it's unhealthy, upload the segments directly.
		peerHealthy := b.health.IsPeerHealthy(owner)
		if owner == b.hostname || !peerHealthy || b.transferDisabled {
			owned = append(owned, batch)
		} else {
			notOwned = append(notOwned, batch)
		}
	}

	atomic.StoreInt64(&b.segmentsTotal, totalFiles)
	atomic.StoreInt64(&b.segementsSize, totalSize)

	return owned, notOwned, nil
}