ingestor/cluster/replicator.go:
package cluster
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/service"
"github.com/Azure/adx-mon/pkg/wal"
)
type ReplicatorOpts struct {
// Partitioner is used to determine which node owns a given metric.
Partitioner MetricPartitioner
// Health is used to report the health of the peer replication.
Health PeerHealthReporter
// SegmentRemover is used to remove segments after they have been replicated.
SegmentRemover SegmentRemover
// InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name.
InsecureSkipVerify bool
// Hostname is the name of the current node.
Hostname string
// MaxTransferConcurrency is the maximum number of concurrent transfer requests in flight at a time.
// Default is 5.
MaxTransferConcurrency int
// DisableGzip controls whether the client uses gzip compression for transfer requests.
DisableGzip bool
}
type SegmentRemover interface {
Remove(path string) error
}
// Replicator manages the transfer of local segments to other nodes.
type Replicator interface {
service.Component
// TransferQueue returns a channel used to queue batches of segments for transfer to other nodes.
TransferQueue() chan *Batch
}
type replicator struct {
// queue receives batches of segments to transfer to peer nodes.
queue chan *Batch
// cli is the HTTP client used to write segments to peers.
cli *Client
wg sync.WaitGroup
closeFn context.CancelFunc
// hostname is the name of the current node.
hostname string
// Partitioner is used to determine which node owns a given metric.
Partitioner MetricPartitioner
// Health is used to report and query the health of peer nodes.
Health PeerHealthReporter
// SegmentRemover is used to remove segments after they have been replicated.
SegmentRemover SegmentRemover
// transferConcurrency is the number of concurrent transfer goroutines.
transferConcurrency int
}
func NewReplicator(opts ReplicatorOpts) (Replicator, error) {
transferConcurrency := 5
if opts.MaxTransferConcurrency > 0 {
transferConcurrency = opts.MaxTransferConcurrency
}
cli, err := NewClient(ClientOpts{
Timeout: 30 * time.Second,
InsecureSkipVerify: opts.InsecureSkipVerify,
Close: false,
MaxIdleConnsPerHost: 1,
MaxIdleConns: transferConcurrency,
IdleConnTimeout: 90 * time.Second,
ResponseHeaderTimeout: 20 * time.Second,
DisableHTTP2: true,
DisableKeepAlives: false,
DisableGzip: opts.DisableGzip,
})
if err != nil {
return nil, err
}
return &replicator{
queue: make(chan *Batch, 10000),
cli: cli,
hostname: opts.Hostname,
Partitioner: opts.Partitioner,
Health: opts.Health,
SegmentRemover: opts.SegmentRemover,
transferConcurrency: transferConcurrency,
}, nil
}
func (r *replicator) Open(ctx context.Context) error {
ctx, r.closeFn = context.WithCancel(ctx)
r.wg.Add(r.transferConcurrency)
for i := 0; i < r.transferConcurrency; i++ {
go r.transfer(ctx)
}
return nil
}
func (r *replicator) Close() error {
r.closeFn()
r.wg.Wait()
return nil
}
func (r *replicator) TransferQueue() chan *Batch {
return r.queue
}
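// enqueueBatch is a hypothetical helper, not part of the original file, shown
// only to illustrate how a producer might hand a batch to the replicator via
// TransferQueue. It uses a non-blocking send so the caller is not stalled when
// the queue (capacity 10000) is full; whether dropping or blocking is the right
// policy depends on the caller.
func enqueueBatch(r Replicator, batch *Batch) bool {
select {
case r.TransferQueue() <- batch:
return true
default:
return false
}
}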
func (r *replicator) transfer(ctx context.Context) {
defer r.wg.Done()
for {
select {
case <-ctx.Done():
return
case batch := <-r.queue:
segments := batch.Segments
if err := func() error {
paths := make([]string, len(segments))
for i, seg := range segments {
paths[i] = seg.Path
}
mr, err := wal.NewSegmentMerger(paths...)
if err != nil && os.IsNotExist(err) {
return nil
} else if err != nil {
return fmt.Errorf("open segments: %w", err)
}
defer mr.Close()
// Merge batch of files into the first file at the destination. This ensures we transfer
// the full batch atomically.
filename := filepath.Base(paths[0])
db, table, schema, _, err := wal.ParseFilename(filename)
if err != nil {
return fmt.Errorf("parse segment filename: %w", err)
}
var key string
if schema != "" {
key = fmt.Sprintf("%s_%s_%s", db, table, schema)
} else {
key = fmt.Sprintf("%s_%s", db, table)
}
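// For example (illustrative values only), db "Metrics", table "Cpu", and schema "v1"
// produce the key "Metrics_Cpu_v1"; with no schema the key is "Metrics_Cpu".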
// Each metric (database/table, plus optional schema) is written to a distinct file, and
// those parts are encoded in the segment filename. The key derived from them determines
// which node owns the data.
owner, addr := r.Partitioner.Owner([]byte(key))
// We're the owner of the file... leave it for the ingestor to upload.
if owner == r.hostname {
return nil
}
// If the peer is not healthy, don't attempt transferring the segment. This could happen if we marked
// the peer unhealthy after we received the batch to process.
if !r.Health.IsPeerHealthy(owner) {
return nil
}
start := time.Now()
if err = r.cli.Write(ctx, addr, filename, mr); err != nil {
if errors.Is(err, ErrBadRequest{}) {
// If ingestor returns a bad request, we should drop the segments as it means we're sending something
// that won't be accepted. Retrying will continue indefinitely. In this case, just drop the file
// and log the error.
logger.Errorf("Failed to transfer segment %s to %s@%s: %s. Dropping segments.", filename, owner, addr, err)
if err := batch.Remove(); err != nil {
logger.Errorf("Failed to remove segment: %s", err)
}
return nil
} else if errors.Is(err, ErrSegmentExists) {
// Segment already exists, remove our side so we don't keep retrying.
if err := batch.Remove(); err != nil {
logger.Errorf("Failed to remove segment: %s", err)
}
return nil
} else if errors.Is(err, ErrSegmentLocked) {
// Segment is locked, retry later.
return nil
} else if errors.Is(err, ErrPeerOverloaded) {
// Ingestor is overloaded, mark the peer as unhealthy and retry later.
r.Health.SetPeerUnhealthy(owner)
return fmt.Errorf("transfer segment %s to %s: %w", filename, addr, err)
}
// Unknown error, assume it's transient and retry after some backoff.
r.Health.SetPeerUnhealthy(owner)
return err
}
if logger.IsDebug() {
for _, seg := range segments {
logger.Debugf("Transferred %s as %s to %s addr=%s duration=%s", seg.Path, filename, owner, addr, time.Since(start).String())
}
}
if err := batch.Remove(); err != nil {
logger.Errorf("Failed to batch segment: %s", err)
}
return nil
}(); err != nil {
logger.Errorf("Failed to transfer batch: %v", err)
}
batch.Release()
}
}
}
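// The function below is an illustrative sketch and not part of the original file.
// It shows how a caller might wire up and run a Replicator, assuming the caller
// already has a partitioner, health reporter, and segment remover configured in
// opts and a source channel of batches; error handling beyond propagation is omitted.
func runReplicatorExample(ctx context.Context, opts ReplicatorOpts, batches <-chan *Batch) error {
r, err := NewReplicator(opts)
if err != nil {
return err
}
// Open starts the transfer goroutines.
if err := r.Open(ctx); err != nil {
return err
}
// Ensure the transfer goroutines are stopped before returning.
defer r.Close()
// Forward batches to the replicator until the caller signals shutdown.
for {
select {
case <-ctx.Done():
return ctx.Err()
case batch := <-batches:
r.TransferQueue() <- batch
}
}
}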