in cmd/ingestor/main.go [87:395]
func realMain(ctx *cli.Context) error {
logger.Infof("%s version:%s", os.Args[0], version.String())
svcCtx, cancel := context.WithCancel(context.Background())
defer cancel()
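// Register the built-in Kubernetes types and the adx-mon v1 CRD types with
// the shared scheme so the Kubernetes clients below can decode both.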
scheme := clientgoscheme.Scheme
if err := clientgoscheme.AddToScheme(scheme); err != nil {
return err
}
if err := v1.AddToScheme(scheme); err != nil {
return err
}
_, k8scli, ctrlCli, err := newKubeClient(ctx)
if err != nil {
return err
}
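// Crank up runtime profiling so the pprof endpoints exposed below stay useful:
// sample one allocation per 4KiB, roughly one blocking event per second spent
// blocked, and every mutex contention event.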
runtime.MemProfileRate = 4096
runtime.SetBlockProfileRate(int(1 * time.Second))
runtime.SetMutexProfileFraction(1)
var (
storageDir string
cacert, key string
insecureSkipVerify, disablePeerTransfer bool
concurrentUploads int
maxConns int
maxSegmentSize, maxTransferSize int64
maxSegmentAge, maxTransferAge time.Duration
)
storageDir = ctx.String("storage-dir")
concurrentUploads = ctx.Int("uploads")
maxSegmentSize = ctx.Int64("max-segment-size")
maxSegmentAge = ctx.Duration("max-segment-age")
maxTransferSize = ctx.Int64("max-transfer-size")
maxTransferAge = ctx.Duration("max-transfer-age")
maxSegmentCount := ctx.Int64("max-segment-count")
dropPrefixes := ctx.StringSlice("drop-prefix")
maxDiskUsage := ctx.Int64("max-disk-usage")
maxBatchSegments := ctx.Int("max-batch-segments")
partitionSize := ctx.Int("partition-size")
maxConns = int(ctx.Uint("max-connections"))
cacert = ctx.String("ca-cert")
key = ctx.String("key")
insecureSkipVerify = ctx.Bool("insecure-skip-verify")
namespace := ctx.String("namespace")
hostname := ctx.String("hostname")
region := ctx.String("region")
disablePeerTransfer = ctx.Bool("disable-peer-transfer")
maxTransferConcurrency := ctx.Int("max-transfer-concurrency")
enableWALFsync := ctx.Bool("enable-wal-fsync")
slowRequestThreshold := ctx.Duration("slow-request-threshold")
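// Default the namespace from the mounted service account when running in a
// pod, and the hostname from the OS when not set explicitly.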
if namespace == "" {
nsBytes, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
if err == nil {
namespace = strings.TrimSpace(string(nsBytes))
}
}
if hostname == "" {
hostname, err = os.Hostname()
if err != nil {
logger.Errorf("Failed to get hostname: %s", err)
}
}
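// TLS is mandatory for the transfer listener: either both --ca-cert and --key
// are supplied, or throwaway fake credentials are generated, written to temp
// files, and removed on exit. Peer verification is skipped in the fake case.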
if cacert != "" || key != "" {
if cacert == "" || key == "" {
logger.Fatalf("Both --ca-cert and --key are required")
}
} else {
logger.Warnf("Using fake TLS credentials (not for production use!)")
certBytes, keyBytes, err := tls.NewFakeTLSCredentials()
if err != nil {
logger.Fatalf("Failed to create fake TLS credentials: %s", err)
}
certFile, err := os.CreateTemp("", "cert")
if err != nil {
logger.Fatalf("Failed to create cert temp file: %s", err)
}
if _, err := certFile.Write(certBytes); err != nil {
logger.Fatalf("Failed to write cert temp file: %s", err)
}
keyFile, err := os.CreateTemp("", "key")
if err != nil {
logger.Fatalf("Failed to create key temp file: %s", err)
}
if _, err := keyFile.Write(keyBytes); err != nil {
logger.Fatalf("Failed to write key temp file: %s", err)
}
cacert = certFile.Name()
key = keyFile.Name()
insecureSkipVerify = true
if err := certFile.Close(); err != nil {
logger.Fatalf("Failed to close cert temp file: %s", err)
}
if err := keyFile.Close(); err != nil {
logger.Fatalf("Failed to close key temp file: %s", err)
}
defer func() {
os.Remove(certFile.Name())
os.Remove(keyFile.Name())
}()
}
logger.Infof("Using TLS ca-cert=%s key=%s", cacert, key)
if storageDir == "" {
logger.Fatalf("--storage-dir is required")
}
var (
allowedDatabases []string
metricsUploaders, logsUploaders []adx.Uploader
metricsDatabases, logsDatabases []string
)
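// Build one uploader per configured Kusto endpoint. Without endpoints, fake
// uploaders are substituted so the ingestor can still run locally without an
// ADX cluster.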
metricsKusto := ctx.StringSlice("metrics-kusto-endpoints")
if len(metricsKusto) > 0 {
metricsUploaders, metricsDatabases, err = newUploaders(
metricsKusto, storageDir, concurrentUploads,
schema.DefaultMetricsMapping, adx.PromMetrics)
if err != nil {
logger.Fatalf("Failed to create uploader: %s", err)
}
} else {
logger.Warnf("No kusto endpoint provided, using fake metrics uploader")
uploader := adx.NewFakeUploader("FakeMetrics")
metricsUploaders = append(metricsUploaders, uploader)
metricsDatabases = append(metricsDatabases, uploader.Database())
}
logsKusto := ctx.StringSlice("logs-kusto-endpoints")
if len(logsKusto) > 0 {
logsUploaders, logsDatabases, err = newUploaders(
logsKusto, storageDir, concurrentUploads,
schema.DefaultLogsMapping, adx.OTLPLogs)
if err != nil {
logger.Fatalf("Failed to create uploaders for OTLP logs: %s", err)
}
} else {
logger.Warnf("No kusto endpoint provided, using fake logs uploader")
uploader := adx.NewFakeUploader("FakeLogs")
logsUploaders = append(logsUploaders, uploader)
logsDatabases = append(logsDatabases, uploader.Database())
}
allowedDatabases = append(allowedDatabases, metricsDatabases...)
allowedDatabases = append(allowedDatabases, logsDatabases...)
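// A single dispatcher fronts all metrics and logs uploaders; it is handed to
// the service as its one Uploader below.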
uploadDispatcher := adx.NewDispatcher(append(logsUploaders, metricsUploaders...))
if err := uploadDispatcher.Open(svcCtx); err != nil {
logger.Fatalf("Failed to start upload dispatcher: %s", err)
}
defer uploadDispatcher.Close()
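// The adx.Uploader values also satisfy metrics.StatementExecutor; collect
// them into the interface slices the service options expect.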
var metricsKustoCli []metrics.StatementExecutor
for _, cli := range metricsUploaders {
metricsKustoCli = append(metricsKustoCli, cli)
}
var logsKustoCli []metrics.StatementExecutor
for _, cli := range logsUploaders {
logsKustoCli = append(logsKustoCli, cli)
}
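// Wire everything into the ingestor service; note that SlowRequestThreshold
// is passed as fractional seconds.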
svc, err := ingestor.NewService(ingestor.ServiceOpts{
K8sCli: k8scli,
K8sCtrlCli: ctrlCli,
LogsKustoCli: logsKustoCli,
MetricsKustoCli: metricsKustoCli,
MetricsDatabases: metricsDatabases,
AllowedDatabase: allowedDatabases,
LogsDatabases: logsDatabases,
Namespace: namespace,
Hostname: hostname,
Region: region,
StorageDir: storageDir,
Uploader: uploadDispatcher,
DisablePeerTransfer: disablePeerTransfer,
PartitionSize: partitionSize,
MaxSegmentSize: maxSegmentSize,
MaxSegmentAge: maxSegmentAge,
MaxTransferSize: maxTransferSize,
MaxTransferAge: maxTransferAge,
MaxSegmentCount: maxSegmentCount,
MaxDiskUsage: maxDiskUsage,
MaxBatchSegments: maxBatchSegments,
EnableWALFsync: enableWALFsync,
MaxTransferConcurrency: maxTransferConcurrency,
InsecureSkipVerify: insecureSkipVerify,
DropFilePrefixes: dropPrefixes,
SlowRequestThreshold: slowRequestThreshold.Seconds(),
})
if err != nil {
logger.Fatalf("Failed to create service: %s", err)
}
if err := svc.Open(svcCtx); err != nil {
logger.Fatalf("Failed to start service: %s", err)
}
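// Two listeners: :9090 serves the TLS transfer and readiness endpoints, and
// :9091 serves Prometheus metrics, pprof, and debug handlers over plain HTTP.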
l, err := net.Listen("tcp", ":9090")
if err != nil {
logger.Fatalf("Failed to listen: %s", err)
}
if maxConns > 0 {
logger.Infof("Limiting connections to %d", maxConns)
l = limiter.LimitListener(l, maxConns)
}
defer l.Close()
logger.Infof("Listening at %s", ":9090")
mux := http.NewServeMux()
mux.HandleFunc("/transfer", svc.HandleTransfer)
mux.HandleFunc("/readyz", svc.HandleReady)
logger.Infof("Metrics Listening at %s", ":9091")
metricsMux := http.NewServeMux()
metricsMux.Handle("/metrics", promhttp.Handler())
metricsMux.HandleFunc("/debug/pprof/", pprof.Index)
metricsMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
metricsMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
metricsMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
metricsMux.HandleFunc("/debug/pprof/trace", pprof.Trace)
metricsMux.HandleFunc("/debug/store", svc.HandleDebugStore)
srv := &http.Server{
Handler: mux,
// Close idle connections fairly often to establish new connections through the load balancer
// so that long-lived connections don't stay pinned to the same node indefinitely.
IdleTimeout: 5 * time.Minute,
}
srv.ErrorLog = newLogger()
go func() {
// Under high connection load, and when the server is doing heavy IO, the
// accept loop can become unresponsive due to scheduler churn. Pinning the
// accept goroutine to a single CPU reduces context switching and improves
// responsiveness.
runtime.LockOSThread()
if err := pinToCPU(0); err != nil {
logger.Warnf("Failed to pin to CPU: %s", err)
}
// ErrServerClosed is returned on graceful shutdown and is not an error.
if err := srv.ServeTLS(l, cacert, key); err != nil && err != http.ErrServerClosed {
logger.Errorf("TLS server error: %s", err)
}
}()
metricsSrv := &http.Server{Addr: ":9091", Handler: metricsMux}
metricsSrv.ErrorLog = newLogger()
go func() {
if err := metricsSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Errorf("Metrics server error: %s", err)
}
}()
// Capture SIGINT and SIGTERM to trigger a shutdown and upload/transfer of all pending segments.
// This is best-effort: if the process is killed with SIGKILL, or the shutdown takes too long,
// the segments are delayed until the process restarts.
sc := make(chan os.Signal, 1)
signal.Notify(sc, os.Interrupt, syscall.SIGTERM)
go func() {
sig := <-sc
// Stop receiving new samples
if err := srv.Shutdown(context.Background()); err != nil {
logger.Errorf("Failed to shutdown server: %s", err)
}
// Disable writes for internal processes (metrics)
if err := svc.DisableWrites(); err != nil {
logger.Errorf("Failed to disable writes: %s", err)
}
logger.Infof("Received signal %s, uploading pending segments...", sig.String())
// Trigger shutdown of any pending background processes
cancel()
if err := metricsSrv.Shutdown(context.Background()); err != nil {
logger.Errorf("Failed to shutdown metrics server: %s", err)
}
// Close the service, uploading or transferring any remaining segments
if err := svc.Close(); err != nil {
logger.Errorf("Failed to close service: %s", err)
}
}()
// Only start the shutdown runner if running in a cluster
if _, err := rest.InClusterConfig(); err == nil {
go func() {
sd := runner.NewShutDownRunner(k8scli, srv, svc)
scheduler.RunForever(svcCtx, time.Minute, sd)
}()
}
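// Block until the signal handler above cancels the service context.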
<-svcCtx.Done()
return nil
}