func (b *batcher) processSegments()

in ingestor/cluster/batcher.go [264:454]


func (b *batcher) processSegments() ([]*Batch, []*Batch, error) {
	// groups is a map of metric name to a list of segments for that metric.
	groups := make(map[string][]wal.SegmentInfo)

	byAge := b.Segmenter.PrefixesByAge()

	// Order them by newest first so we process recent data first.
	slices.Reverse(byAge)

	// Add a small percentage of the oldest segments to the front of the list to ensure that
	// we're ticking away through the backlog.
	byAge = prioritizeOldest(byAge)

	// We need to find the segments that this node is responsible for uploading to Kusto and the ones
	// that need to be transferred to other nodes.
	var (
		owned, notOwned       []*Batch
		groupSize             int
		totalFiles, totalSize int64
	)

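	// Reset the tracked maximum segment age; it is recomputed below from the oldest segment in each prefix.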
	b.maxSegmentAge = 0

	for _, prefix := range byAge {
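		// Gather this prefix's segments, reusing tempSet's backing array to avoid an allocation per prefix.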
		b.tempSet = b.Segmenter.Get(b.tempSet[:0], prefix)

		// Remove any segments that are already part of a batch so we don't try to double upload them
		b.tempSet = slices.DeleteFunc(b.tempSet, func(si wal.SegmentInfo) bool {
			n, _ := b.segments.Get(si.Path)
			return n > 0
		})

		// If all the segments are already part of a batch, skip this prefix.
		if len(b.tempSet) == 0 {
			continue
		}

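		// Compute the total size of this prefix's segments and the age of its oldest segment.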
		groupSize = 0
		var oldestSegment time.Time
		for _, v := range b.tempSet {
			if oldestSegment.IsZero() || v.CreatedAt.Before(oldestSegment) {
				oldestSegment = v.CreatedAt
			}
			groupSize += int(v.Size)
			totalFiles++
		}

		totalSize += int64(groupSize)

		metrics.IngestorSegmentsMaxAge.WithLabelValues(prefix).Set(time.Since(oldestSegment).Seconds())
		metrics.IngestorSegmentsSizeBytes.WithLabelValues(prefix).Set(float64(groupSize))
		metrics.IngestorSegmentsTotal.WithLabelValues(prefix).Set(float64(len(b.tempSet)))
		if v := time.Since(oldestSegment); v > b.maxSegmentAge {
			b.maxSegmentAge = v
		}
		groups[prefix] = append(groups[prefix], b.tempSet...)
	}

	// For each prefix, sort the segments by name.  The last segment is the current segment.
	for _, prefix := range byAge {
		v := groups[prefix]

		if len(v) == 0 {
			continue
		}

		sort.Slice(v, func(i, j int) bool {
			return v[i].Path < v[j].Path
		})

		var (
			batchSize int64
			batch     *Batch
		)

		db, table, _, _, err := wal.ParseFilename(v[0].Path)
		if err != nil {
			logger.Errorf("Failed to parse segment filename: %s", err)
			continue
		}

		batch = &Batch{
			Prefix:   prefix,
			Database: db,
			Table:    table,
			batcher:  b,
		}

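		// Assign segments to a batch, flushing the current batch whenever it hits one of the limits below.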
		for _, si := range v {
			batch.Segments = append(batch.Segments, si)
			batchSize += si.Size

			// Record that this segment is part of a batch.
			_ = b.segments.Mutate(si.Path, func(n int) (int, error) {
				return n + 1, nil
			})

			// Prevent trying to combine an unbounded number of segments at once even if they are very small.  This
			// can incur a lot of CPU time and slower transfers when there are hundreds of segments that can be combined.
			if len(batch.Segments) >= b.maxBatchSegments {
				if logger.IsDebug() {
					logger.Debugf("Batch %s is merging more than %d segments, uploading directly", si.Path, b.maxBatchSegments)
				}

				owned = append(owned, batch)
				batch = &Batch{
					Prefix:   prefix,
					Database: db,
					Table:    table,
					batcher:  b,
				}
				batchSize = 0
				continue
			}

			// The batch has reached the optimal size for uploading to Kusto, so upload it directly and start a new batch.
			if b.minUploadSize > 0 && batchSize >= b.minUploadSize {
				if logger.IsDebug() {
					logger.Debugf("Batch %s is larger than %dMB (%d), uploading directly", si.Path, (b.minUploadSize)/1e6, batchSize)
				}

				owned = append(owned, batch)
				batch = &Batch{
					Prefix:   prefix,
					Database: db,
					Table:    table,
					batcher:  b,
				}
				batchSize = 0
				continue
			}

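			// The batch is too large to transfer to another node, so flush it for direct upload.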
			if b.maxTransferSize > 0 && batchSize >= b.maxTransferSize {
				if logger.IsDebug() {
					logger.Debugf("Batch %s is larger than %dMB (%d), uploading directly", si.Path, b.maxTransferSize/1e6, batchSize)
				}

				owned = append(owned, batch)
				batch = &Batch{
					Prefix:   prefix,
					Database: db,
					Table:    table,
					batcher:  b,
				}
				batchSize = 0
				continue
			}

			createdAt := si.CreatedAt

			// If the file has been on disk for longer than maxTransferAge, we're behind on uploading, so upload it directly
			// ourselves instead of transferring it to another node.  This could result in suboptimal upload batches, but we'd
			// rather take that hit than have a node that's behind on uploading.
			if b.maxTransferAge > 0 && time.Since(createdAt) > b.maxTransferAge {
				if logger.IsDebug() {
					logger.Debugf("File %s is older than %s (%s), uploading directly", si.Path, b.maxTransferAge.String(), time.Since(createdAt).String())
				}
				owned = append(owned, batch)
				batch = &Batch{
					Prefix:   prefix,
					Database: db,
					Table:    table,
					batcher:  b,
				}
				batchSize = 0
				continue
			}
		}

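		// If every segment was already flushed into a batch above, there's nothing left to route for this prefix.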
		if len(batch.Segments) == 0 {
			continue
		}

		owner, _ := b.Partitioner.Owner([]byte(prefix))

		// If the peer has signaled that it's unhealthy, upload the segments directly.
		peerHealthy := b.health.IsPeerHealthy(owner)

		if owner == b.hostname || !peerHealthy || b.transferDisabled {
			owned = append(owned, batch)
		} else {
			notOwned = append(notOwned, batch)
		}
	}

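	// Record the total number and size of pending segments observed during this pass.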
	atomic.StoreInt64(&b.segmentsTotal, totalFiles)
	atomic.StoreInt64(&b.segementsSize, totalSize)

	return owned, notOwned, nil
}
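
The prioritizeOldest call near the top is what keeps the backlog shrinking even though prefixes are processed newest-first. Its implementation is outside the lines shown here; the following is only a minimal sketch of that kind of reordering, assuming byAge is a []string ordered newest-first after the reverse. The name prioritizeOldestSketch and the 10% ratio are illustrative assumptions, not the actual code.

// prioritizeOldestSketch is a hypothetical stand-in for prioritizeOldest.
// After slices.Reverse, the oldest prefixes sit at the tail of the slice, so a
// small fraction of them is moved to the front so the backlog keeps draining
// while recent data is still processed first.
func prioritizeOldestSketch(prefixes []string) []string {
	n := len(prefixes) / 10 // assumed ratio: promote roughly 10% of the oldest prefixes
	if n == 0 {
		return prefixes
	}
	out := make([]string, 0, len(prefixes))
	out = append(out, prefixes[len(prefixes)-n:]...) // the oldest prefixes first
	out = append(out, prefixes[:len(prefixes)-n]...) // then the rest, newest first
	return out
}

Whatever the exact reordering, the two slices returned by processSegments are handled differently by the caller: owned batches are uploaded to Kusto by this node, while notOwned batches are handed to the peer that owns their prefix.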