func()

in ingestor/cluster/replicator.go [119:221]


// transfer is the replicator's worker loop. It drains r.queue until ctx is
// cancelled or the queue is closed. For each batch it:
//
//   - merges the batch's segment files into one stream named after the first
//     segment's filename, so the whole batch is sent as a single unit;
//   - derives a partition key from that filename ("db_table" or
//     "db_table_schema") and asks r.Partitioner which peer owns it;
//   - ships the merged stream to the owning peer via r.cli.Write.
//
// Batches owned by this host, or whose owner r.Health reports as unhealthy, are
// left in place. Batches are removed after a successful transfer, when the peer
// rejects them as a bad request, or when the peer reports the segment already
// exists. batch.Release() is called for every batch received, whatever the
// outcome.
//
// transfer calls r.wg.Done() on return, so it must be started with a matching
// r.wg.Add(1).
func (r *replicator) transfer(ctx context.Context) {
	defer r.wg.Done()

	for {
		select {
		case <-ctx.Done():
			return
		case batch, ok := <-r.queue:
			if !ok {
				// The queue was closed: no more batches will arrive. Returning avoids
				// spinning on zero-value receives.
				return
			}
			segments := batch.Segments

			if err := func() error {
				// An empty batch has nothing to merge, and paths[0] below would panic.
				if len(segments) == 0 {
					return nil
				}

				paths := make([]string, len(segments))
				for i, seg := range segments {
					paths[i] = seg.Path
				}

				mr, err := wal.NewSegmentMerger(paths...)
				if errors.Is(err, os.ErrNotExist) {
					// At least one segment file is gone; skip this batch.
					// errors.Is (unlike os.IsNotExist) also matches wrapped errors.
					return nil
				} else if err != nil {
					return fmt.Errorf("open segments: %w", err)
				}
				defer mr.Close()

				// Merge the batch of files into the first file at the destination. This
				// ensures we transfer the full batch atomically.
				filename := filepath.Base(paths[0])

				db, table, schema, _, err := wal.ParseFilename(filename)
				if err != nil {
					return fmt.Errorf("parse segment filename: %w", err)
				}

				var key string
				if schema != "" {
					key = fmt.Sprintf("%s_%s_%s", db, table, schema)
				} else {
					key = fmt.Sprintf("%s_%s", db, table)
				}

				// Ownership is decided by the db/table(/schema) key parsed from the
				// filename, so all segments for the same key go to the same node.
				owner, addr := r.Partitioner.Owner([]byte(key))

				// We're the owner of the file... leave it for the ingestor to upload.
				if owner == r.hostname {
					return nil
				}

				// If the peer is not healthy, don't attempt transferring the segment. This
				// could happen if we marked the peer unhealthy after we received the batch.
				if !r.Health.IsPeerHealthy(owner) {
					return nil
				}

				start := time.Now()
				if err := r.cli.Write(ctx, addr, filename, mr); err != nil {
					switch {
					// NOTE(review): errors.Is against the value ErrBadRequest{} only matches
					// via == or an Is method on ErrBadRequest. Confirm that one of those
					// holds; otherwise errors.As is needed.
					case errors.Is(err, ErrBadRequest{}):
						// The peer rejected the payload. Retrying would fail forever, so drop
						// the segments and log the error.
						logger.Errorf("Failed to transfer segment %s to %s@%s: %s.  Dropping segments.", filename, owner, addr, err)
						if err := batch.Remove(); err != nil {
							logger.Errorf("Failed to remove segment: %s", err)
						}
						return nil
					case errors.Is(err, ErrSegmentExists):
						// Segment already exists. Remove our side so we don't keep retrying.
						if err := batch.Remove(); err != nil {
							logger.Errorf("Failed to remove segment: %s", err)
						}
						return nil
					case errors.Is(err, ErrSegmentLocked):
						// Segment is locked; retry later.
						return nil
					case errors.Is(err, ErrPeerOverloaded):
						// Ingestor is overloaded. Mark the peer unhealthy and retry later.
						r.Health.SetPeerUnhealthy(owner)
						return fmt.Errorf("transfer segment %s to %s: %w", filename, addr, err)
					case ctx.Err() != nil:
						// Our own context was cancelled mid-write (e.g. shutdown). The failure
						// is not the peer's fault, so don't mark it unhealthy.
						return fmt.Errorf("transfer segment %s to %s: %w", filename, addr, err)
					default:
						// Unknown error. Assume it's transient and retry after some backoff.
						r.Health.SetPeerUnhealthy(owner)
						return fmt.Errorf("transfer segment %s to %s: %w", filename, addr, err)
					}
				}

				if logger.IsDebug() {
					elapsed := time.Since(start).String()
					for _, seg := range segments {
						logger.Debugf("Transferred %s as %s to %s addr=%s duration=%s ", seg.Path, filename, owner, addr, elapsed)
					}
				}

				if err := batch.Remove(); err != nil {
					logger.Errorf("Failed to remove segment: %s", err)
				}

				return nil
			}(); err != nil {
				logger.Errorf("Failed to transfer batch: %v", err)
			}

			batch.Release()
		}
	}
}