ingestor/service.go
package ingestor
import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"regexp"
"strconv"
"strings"
"time"
"github.com/Azure/adx-mon/ingestor/adx"
"github.com/Azure/adx-mon/ingestor/cluster"
ingestorstorage "github.com/Azure/adx-mon/ingestor/storage"
"github.com/Azure/adx-mon/metrics"
"github.com/Azure/adx-mon/pkg/debug"
adxhttp "github.com/Azure/adx-mon/pkg/http"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/reader"
"github.com/Azure/adx-mon/pkg/scheduler"
"github.com/Azure/adx-mon/pkg/wal"
"github.com/Azure/adx-mon/storage"
"github.com/klauspost/compress/gzip"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// invalidEntityCharacters is a regex that matches characters that are invalid in Kusto entity names and
// segment file naming patterns. It covers a subset of the invalid characters and should be kept in sync
// with transform.Normalize.
var invalidEntityCharacters = regexp.MustCompile(`[^a-zA-Z0-9]`)
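// Interface is the API surface of the ingestor service: lifecycle control (Open, Close, Shutdown),
// HTTP handlers for readiness and segment transfer, and administrative operations for draining
// segments and disabling writes.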
type Interface interface {
Open(ctx context.Context) error
Close() error
HandleReady(w http.ResponseWriter, r *http.Request)
HandleTransfer(w http.ResponseWriter, r *http.Request)
Shutdown(ctx context.Context) error
UploadSegments(ctx context.Context) error
DisableWrites() error
}
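// Service implements Interface. It accepts WAL segments over HTTP, persists them to a local store,
// and relies on the batcher to either transfer them to the peer that owns them or queue them for
// direct upload to Kusto.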
type Service struct {
walOpts wal.WALOpts
opts ServiceOpts
// databases is the set of known DB names used for validating requests.
databases map[string]struct{}
uploader adx.Uploader
replicator cluster.Replicator
coordinator cluster.Coordinator
batcher cluster.Batcher
closeFn context.CancelFunc
store storage.Store
metrics metrics.Service
scheduler *scheduler.Periodic
dropFilePrefixes []string
health interface{ IsHealthy() bool }
}
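// ServiceOpts holds the configuration for NewService.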
type ServiceOpts struct {
StorageDir string
Uploader adx.Uploader
MaxSegmentSize int64
MaxSegmentAge time.Duration
K8sCli kubernetes.Interface
K8sCtrlCli client.Client
// MetricsKustoCli are the Kusto clients connected to the metrics Kusto clusters.
MetricsKustoCli []metrics.StatementExecutor
// LogsKustoCli are the Kusto clients connected to the logs Kusto clusters.
LogsKustoCli []metrics.StatementExecutor
// InsecureSkipVerify disables TLS certificate verification.
InsecureSkipVerify bool
// Namespace is the namespace used for peer discovery.
Namespace string
// Hostname is the hostname of the current node.
Hostname string
// Region is a location identifier.
Region string
// DisablePeerTransfer disables peer discovery and prevents transfers of small segments to an owner.
// Each instance of ingestor will upload received segments directly.
DisablePeerTransfer bool
// MaxTransferSize is the maximum size of a segment that will be transferred to another node. If a segment
// exceeds this size, it will be uploaded directly by the current node.
MaxTransferSize int64
// MaxTransferAge is the maximum age of a segment that will be transferred to another node. If a segment
// exceeds this age, it will be uploaded directly by the current node.
MaxTransferAge time.Duration
// MaxSegmentCount is the maximum number of segment files allowed on disk before signaling back-pressure.
MaxSegmentCount int64
// MaxDiskUsage is the maximum disk usage allowed before signaling back-pressure.
MaxDiskUsage int64
// AllowedDatabase is the distinct set of database names that are allowed to be written to.
AllowedDatabase []string
// MetricsDatabases is a slice of metrics database names.
MetricsDatabases []string
// LogsDatabases is a slice of log database names.
LogsDatabases []string
// PartitionSize is the max size of the group of nodes forming a partition. A partition is a set of nodes where
// keys are distributed.
PartitionSize int
// MaxTransferConcurrency is the maximum number of concurrent transfers allowed in flight at the same time.
MaxTransferConcurrency int
// EnableWALFsync enables fsyncing each segment before it is closed.
EnableWALFsync bool
// DropFilePrefixes is a slice of filename prefixes; transferred segments whose names match a prefix are
// dropped instead of imported.
DropFilePrefixes []string
// MaxBatchSegments is the maximum number of segments to include when transferring segments in a batch. The segments
// are merged into a new segment. A higher number takes longer to combine on the sending side and increases the
// size of segments on the receiving side. A lower number creates more batches and more remote transfer calls. If
// not specified, the default is 25.
MaxBatchSegments int
// SlowRequestThreshold is the duration, in seconds, above which a request is logged as slow.
SlowRequestThreshold float64
}
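// NewService constructs a Service from opts, wiring together the local segment store, cluster
// coordinator, health checker, replicator, batcher, metrics service, and periodic scheduler. It does
// not start any of them; call Open to do that. opts.Uploader must be non-nil.
//
// A minimal construction sketch (values are hypothetical):
//
//	svc, err := ingestor.NewService(ingestor.ServiceOpts{
//		StorageDir:     "/var/lib/adx-mon",
//		Uploader:       uploader, // an adx.Uploader
//		MaxSegmentSize: 1 << 30,
//		MaxSegmentAge:  5 * time.Minute,
//	})
//	if err != nil {
//		return err
//	}
//	if err := svc.Open(ctx); err != nil {
//		return err
//	}
//	defer svc.Close()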
func NewService(opts ServiceOpts) (*Service, error) {
store := storage.NewLocalStore(storage.StoreOpts{
StorageDir: opts.StorageDir,
SegmentMaxSize: opts.MaxSegmentSize,
SegmentMaxAge: opts.MaxSegmentAge,
EnableWALFsync: opts.EnableWALFsync,
})
coord, err := cluster.NewCoordinator(&cluster.CoordinatorOpts{
WriteTimeSeriesFn: store.WriteTimeSeries,
K8sCli: opts.K8sCli,
Hostname: opts.Hostname,
Namespace: opts.Namespace,
PartitionSize: opts.PartitionSize,
})
if err != nil {
return nil, err
}
health := cluster.NewHealth(cluster.HealthOpts{
UnhealthyTimeout: time.Minute,
MaxSegmentCount: opts.MaxSegmentCount,
MaxDiskUsage: opts.MaxDiskUsage,
})
repl, err := cluster.NewReplicator(cluster.ReplicatorOpts{
Hostname: opts.Hostname,
Partitioner: coord,
InsecureSkipVerify: opts.InsecureSkipVerify,
Health: health,
SegmentRemover: store,
MaxTransferConcurrency: opts.MaxTransferConcurrency,
DisableGzip: true,
})
if err != nil {
return nil, err
}
batcher := cluster.NewBatcher(cluster.BatcherOpts{
StorageDir: opts.StorageDir,
MaxSegmentAge: opts.MaxSegmentAge,
MaxTransferSize: opts.MaxTransferSize,
MaxTransferAge: opts.MaxTransferAge,
MaxBatchSegments: opts.MaxBatchSegments,
Partitioner: coord,
Segmenter: store.Index(),
UploadQueue: opts.Uploader.UploadQueue(),
TransferQueue: repl.TransferQueue(),
PeerHealthReporter: health,
TransfersDisabled: opts.DisablePeerTransfer,
})
health.QueueSizer = batcher
allKustoCli := make([]metrics.StatementExecutor, 0, len(opts.MetricsKustoCli)+len(opts.LogsKustoCli))
allKustoCli = append(allKustoCli, opts.MetricsKustoCli...)
allKustoCli = append(allKustoCli, opts.LogsKustoCli...)
metricsSvc := metrics.NewService(metrics.ServiceOpts{
Hostname: opts.Hostname,
Elector: coord,
MetricsKustoCli: opts.MetricsKustoCli,
KustoCli: allKustoCli,
PeerHealthReport: health,
})
// databases is used by validateFileName to reject writes to unknown databases. It includes the
// explicitly allowed databases along with the configured logs and metrics databases.
databases := make(map[string]struct{}, len(opts.AllowedDatabase)+len(opts.LogsDatabases)+len(opts.MetricsDatabases))
for _, db := range opts.AllowedDatabase {
databases[db] = struct{}{}
}
for _, db := range opts.LogsDatabases {
databases[db] = struct{}{}
}
for _, db := range opts.MetricsDatabases {
databases[db] = struct{}{}
}
sched := scheduler.NewScheduler(coord)
return &Service{
opts: opts,
databases: databases,
uploader: opts.Uploader,
replicator: repl,
store: store,
coordinator: coord,
batcher: batcher,
metrics: metricsSvc,
health: health,
scheduler: sched,
dropFilePrefixes: opts.DropFilePrefixes,
}, nil
}
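// Open starts the store, coordinator, batcher, replicator, metrics service, and scheduler, and
// registers the periodic Kusto maintenance tasks (dropping unused tables, syncing functions, and
// running management commands). The service runs until Close is called or ctx is cancelled.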
func (s *Service) Open(ctx context.Context) error {
var svcCtx context.Context
svcCtx, s.closeFn = context.WithCancel(ctx)
if err := s.store.Open(svcCtx); err != nil {
return err
}
if err := s.coordinator.Open(svcCtx); err != nil {
return err
}
if err := s.batcher.Open(svcCtx); err != nil {
return err
}
if err := s.replicator.Open(svcCtx); err != nil {
return err
}
if err := s.metrics.Open(svcCtx); err != nil {
return err
}
if err := s.scheduler.Open(svcCtx); err != nil {
return err
}
s.scheduler.ScheduleEvery(time.Minute, "ingestor-health-check", func(ctx context.Context) error {
metrics.IngestorHealthCheck.WithLabelValues(s.opts.Region).Set(1)
return nil
})
fnStore := ingestorstorage.NewFunctions(s.opts.K8sCtrlCli, s.coordinator)
crdStore := ingestorstorage.NewCRDHandler(s.opts.K8sCtrlCli, s.coordinator)
for _, v := range s.opts.MetricsKustoCli {
t := adx.NewDropUnusedTablesTask(v)
s.scheduler.ScheduleEvery(12*time.Hour, "delete-unused-tables", func(ctx context.Context) error {
return t.Run(ctx)
})
f := adx.NewSyncFunctionsTask(fnStore, v)
s.scheduler.ScheduleEvery(time.Minute, "sync-metrics-functions", func(ctx context.Context) error {
return f.Run(ctx)
})
m := adx.NewManagementCommandsTask(crdStore, v)
s.scheduler.ScheduleEvery(10*time.Minute, "management-commands", func(ctx context.Context) error {
return m.Run(ctx)
})
}
for _, v := range s.opts.LogsKustoCli {
f := adx.NewSyncFunctionsTask(fnStore, v)
s.scheduler.ScheduleEvery(time.Minute, "sync-logs-functions", func(ctx context.Context) error {
return f.Run(ctx)
})
m := adx.NewManagementCommandsTask(crdStore, v)
s.scheduler.ScheduleEvery(10*time.Minute, "management-commands", func(ctx context.Context) error {
return m.Run(ctx)
})
}
return nil
}
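// Close cancels the service context and shuts down the subcomponents in the reverse order in which
// Open started them.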
func (s *Service) Close() error {
s.closeFn()
if err := s.scheduler.Close(); err != nil {
return err
}
if err := s.metrics.Close(); err != nil {
return err
}
if err := s.replicator.Close(); err != nil {
return err
}
if err := s.batcher.Close(); err != nil {
return err
}
if err := s.coordinator.Close(); err != nil {
return err
}
return s.store.Close()
}
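// HandleDebugStore writes debug information about the underlying store to the response, if the store
// implements debug.DebugWriter.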
func (s *Service) HandleDebugStore(w http.ResponseWriter, r *http.Request) {
debugWriter, ok := s.store.(debug.DebugWriter)
if !ok {
w.WriteHeader(http.StatusOK)
return
}
if err := debugWriter.WriteDebug(w); err != nil {
// If WriteDebug already wrote part of the response, the error status is best-effort.
logger.Errorf("Failed to write debug info: %s", err.Error())
http.Error(w, "failed to write debug info", http.StatusInternalServerError)
return
}
}
// HandleReady handles the readiness probe.
func (s *Service) HandleReady(w http.ResponseWriter, r *http.Request) {
if s.health.IsHealthy() {
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
return
}
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("not ready"))
}
// HandleTransfer handles the transfer of WAL segments from other nodes in the cluster. The segment name
// is passed as the "filename" query parameter and the segment contents as the request body, which may be
// gzip-compressed.
func (s *Service) HandleTransfer(w http.ResponseWriter, r *http.Request) {
adxhttp.MaybeCloseConnection(w, r)
start := time.Now()
m := metrics.RequestsReceived.MustCurryWith(prometheus.Labels{"path": "/transfer"})
filename := r.URL.Query().Get("filename")
if filename == "" {
m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
http.Error(w, "missing filename", http.StatusBadRequest)
return
}
xff := r.Header.Get("X-Forwarded-For")
// If the header is present, split it by comma and take the first IP address
var originalIP string
if xff != "" {
ips := strings.Split(xff, ",")
originalIP = strings.TrimSpace(ips[0])
} else {
// If the header is not present, use the remote address
originalIP = r.RemoteAddr
}
cr := reader.NewCounterReader(r.Body)
var body io.ReadCloser = cr
defer func() {
io.Copy(io.Discard, body)
metrics.RequestsBytesReceived.Add(float64(cr.Count()))
dur := time.Since(start)
if s.opts.SlowRequestThreshold > 0 && dur.Seconds() > s.opts.SlowRequestThreshold {
logger.Warnf("Slow request: path=/transfer duration=%s from=%s size=%d file=%s", dur.String(), originalIP, cr.Count(), filename)
}
if err := body.Close(); err != nil {
logger.Errorf("Close http body: %s, path=/transfer duration=%s from=%s", err.Error(), dur.String(), originalIP)
}
}()
for _, prefix := range s.dropFilePrefixes {
if strings.HasPrefix(filename, prefix) {
io.Copy(io.Discard, body)
metrics.IngestorDroppedPrefixes.WithLabelValues(prefix).Inc()
m.WithLabelValues(strconv.Itoa(http.StatusAccepted)).Inc()
w.WriteHeader(http.StatusAccepted)
return
}
}
if !s.health.IsHealthy() {
m.WithLabelValues(strconv.Itoa(http.StatusTooManyRequests)).Inc()
http.Error(w, "Overloaded. Retry later", http.StatusTooManyRequests)
return
}
// Check for possible traversal attacks; see https://pkg.go.dev/io/fs#ValidPath.
f := s.validateFileName(filename)
if f == "" {
logger.Errorf("Transfer requested with an invalid filename %q", filename)
m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
http.Error(w, "Filename is invalid", http.StatusBadRequest)
return
}
// If the request is gzipped, create a gzip reader to decompress the body.
if r.Header.Get("Content-Encoding") == "gzip" {
gzipReader, err := gzip.NewReader(body)
if err != nil {
logger.Errorf("Failed to create gzip reader: %s", err.Error())
m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
http.Error(w, "Invalid gzip encoding", http.StatusBadRequest)
return
}
defer gzipReader.Close()
body = gzipReader
}
n, err := s.store.Import(f, body)
if errors.Is(err, wal.ErrMaxSegmentsExceeded) || errors.Is(err, wal.ErrMaxDiskUsageExceeded) {
m.WithLabelValues(strconv.Itoa(http.StatusTooManyRequests)).Inc()
http.Error(w, "Overloaded. Retry later", http.StatusTooManyRequests)
return
} else if errors.Is(err, wal.ErrSegmentLocked) {
m.WithLabelValues(strconv.Itoa(http.StatusLocked)).Inc()
http.Error(w, err.Error(), http.StatusLocked)
return
} else if err != nil && strings.Contains(err.Error(), "block checksum verification failed") {
logger.Errorf("Transfer requested with checksum error %q from=%s", filename, originalIP)
m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
http.Error(w, "Block checksum verification failed", http.StatusBadRequest)
return
} else if err != nil {
logger.Errorf("Failed to import %s: %s from=%s", filename, err.Error(), originalIP)
m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if logger.IsDebug() {
logger.Debugf("Imported %d bytes to %s", n, filename)
}
m.WithLabelValues(strconv.Itoa(http.StatusAccepted)).Inc()
w.WriteHeader(http.StatusAccepted)
}
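// Shutdown stops the metrics service and then drains any remaining segments via UploadSegments. It is
// intended for graceful termination and blocks until the queues are empty or ctx is cancelled.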
func (s *Service) Shutdown(ctx context.Context) error {
if err := s.metrics.Close(); err != nil {
return err
}
if err := s.UploadSegments(ctx); err != nil {
return fmt.Errorf("Failed to upload segments: %s", err.Error())
}
return nil
}
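// UploadSegments batches all remaining local segments and blocks until both the upload queue and the
// transfer queue are drained, polling once per second. It returns an error if ctx is cancelled first.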
func (s *Service) UploadSegments(ctx context.Context) error {
if err := s.batcher.BatchSegments(); err != nil {
return err
}
logger.Infof("Waiting for upload queue to drain, %d batches remaining", len(s.uploader.UploadQueue()))
logger.Infof("Waiting for transfer queue to drain, %d batches remaining", len(s.replicator.TransferQueue()))
t := time.NewTicker(time.Second)
defer t.Stop()
for {
select {
case <-t.C:
if len(s.uploader.UploadQueue()) == 0 && len(s.replicator.TransferQueue()) == 0 {
return nil
}
if len(s.uploader.UploadQueue()) != 0 {
logger.Infof("Waiting for upload queue to drain, %d batches remaining", len(s.uploader.UploadQueue()))
}
if len(s.replicator.TransferQueue()) != 0 {
logger.Infof("Waiting for transfer queue to drain, %d batches remaining", len(s.replicator.TransferQueue()))
}
case <-ctx.Done():
return fmt.Errorf("timed out to upload segments")
}
}
}
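// DisableWrites stops the metrics service and closes the local store so that no new segments are
// written.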
func (s *Service) DisableWrites() error {
if err := s.metrics.Close(); err != nil {
return err
}
if err := s.store.Close(); err != nil {
return err
}
return nil
}
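// validateFileName parses filename and returns it in canonical form, or an empty string if the name is
// a potential path traversal, contains invalid entity characters, or references an unknown database.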
func (s *Service) validateFileName(filename string) string {
if !fs.ValidPath(filename) {
return ""
}
db, table, schema, epoch, err := wal.ParseFilename(filename)
if err != nil {
return ""
}
if invalidEntityCharacters.MatchString(db) || invalidEntityCharacters.MatchString(table) || invalidEntityCharacters.MatchString(epoch) || invalidEntityCharacters.MatchString(schema) {
return ""
}
if _, ok := s.databases[db]; !ok {
return ""
}
return wal.Filename(db, table, schema, epoch)
}