func realMain()

in cmd/ingestor/main.go [87:395]

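realMain wires up the ingestor process: it reads configuration from CLI flags, provisions TLS credentials (generating throwaway ones when none are supplied), creates Kusto uploaders for metrics and logs (or fakes when no endpoints are configured), starts the ingestor service, serves ingest traffic on :9090 and metrics/pprof on :9091, and installs a signal handler that flushes pending segments on shutdown.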

func realMain(ctx *cli.Context) error {
	logger.Infof("%s version:%s", os.Args[0], version.String())

	svcCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

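	// Register core Kubernetes and this project's v1 API types with the client scheme.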
	scheme := clientgoscheme.Scheme
	if err := clientgoscheme.AddToScheme(scheme); err != nil {
		return err
	}
	if err := v1.AddToScheme(scheme); err != nil {
		return err
	}

	_, k8scli, ctrlCli, err := newKubeClient(ctx)
	if err != nil {
		return err
	}

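	// Enable profiling: roughly one allocation sample per 4 KiB allocated, one
	// blocking event sampled per second of blocked time, and every mutex
	// contention event recorded. These feed the pprof endpoints registered below.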
	runtime.MemProfileRate = 4096
	runtime.SetBlockProfileRate(int(1 * time.Second))
	runtime.SetMutexProfileFraction(1)

	var (
		storageDir                              string
		cacert, key                             string
		insecureSkipVerify, disablePeerTransfer bool
		concurrentUploads                       int
		maxConns                                int
		maxSegmentSize, maxTransferSize         int64
		maxSegmentAge, maxTransferAge           time.Duration
	)
	storageDir = ctx.String("storage-dir")
	concurrentUploads = ctx.Int("uploads")
	maxSegmentSize = ctx.Int64("max-segment-size")
	maxSegmentAge = ctx.Duration("max-segment-age")
	maxTransferSize = ctx.Int64("max-transfer-size")
	maxTransferAge = ctx.Duration("max-transfer-age")
	maxSegmentCount := ctx.Int64("max-segment-count")
	dropPrefixes := ctx.StringSlice("drop-prefix")
	maxDiskUsage := ctx.Int64("max-disk-usage")
	maxBatchSegments := ctx.Int("max-batch-segments")
	partitionSize := ctx.Int("partition-size")
	maxConns = int(ctx.Uint("max-connections"))
	cacert = ctx.String("ca-cert")
	key = ctx.String("key")
	insecureSkipVerify = ctx.Bool("insecure-skip-verify")
	namespace := ctx.String("namespace")
	hostname := ctx.String("hostname")
	region := ctx.String("region")
	disablePeerTransfer = ctx.Bool("disable-peer-transfer")
	maxTransferConcurrency := ctx.Int("max-transfer-concurrency")
	enableWALFsync := ctx.Bool("enable-wal-fsync")
	slowRequestThreshold := ctx.Duration("slow-request-threshold")

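	// Fall back to the pod's service account namespace when --namespace is unset.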
	if namespace == "" {
		nsBytes, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
		if err == nil {
			namespace = strings.TrimSpace(string(nsBytes))
		}
	}

	if hostname == "" {
		hostname, err = os.Hostname()
		if err != nil {
			logger.Errorf("Failed to get hostname: %s", err)
		}
	}

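	// TLS credentials must be supplied as a pair. When neither is given, generate
	// throwaway fake credentials so the ingestor can still serve TLS in development.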
	if cacert != "" || key != "" {
		if cacert == "" || key == "" {
			logger.Fatalf("Both --ca-cert and --key are required")
		}
	} else {
		logger.Warnf("Using fake TLS credentials (not for production use!)")
		certBytes, keyBytes, err := tls.NewFakeTLSCredentials()
		if err != nil {
			logger.Fatalf("Failed to create fake TLS credentials: %s", err)
		}

		certFile, err := os.CreateTemp("", "cert")
		if err != nil {
			logger.Fatalf("Failed to create cert temp file: %s", err)
		}

		if _, err := certFile.Write(certBytes); err != nil {
			logger.Fatalf("Failed to write cert temp file: %s", err)
		}

		keyFile, err := os.CreateTemp("", "key")
		if err != nil {
			logger.Fatalf("Failed to create key temp file: %s", err)
		}

		if _, err := keyFile.Write(keyBytes); err != nil {
			logger.Fatalf("Failed to write key temp file: %s", err)
		}

		cacert = certFile.Name()
		key = keyFile.Name()
		insecureSkipVerify = true

		if err := certFile.Close(); err != nil {
			logger.Fatalf("Failed to close cert temp file: %s", err)
		}

		if err := keyFile.Close(); err != nil {
			logger.Fatalf("Failed to close key temp file: %s", err)
		}

		defer func() {
			os.Remove(certFile.Name())
			os.Remove(keyFile.Name())
		}()
	}

	logger.Infof("Using TLS ca-cert=%s key=%s", cacert, key)
	if storageDir == "" {
		logger.Fatalf("--storage-dir is required")
	}

	var (
		allowedDatabases                []string
		metricsUploaders, logsUploaders []adx.Uploader
		metricsDatabases, logsDatabases []string
	)

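	// Create one uploader per configured Kusto endpoint; without endpoints,
	// fake uploaders keep the pipeline functional for local development.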
	metricsKusto := ctx.StringSlice("metrics-kusto-endpoints")
	if len(metricsKusto) > 0 {
		metricsUploaders, metricsDatabases, err = newUploaders(
			metricsKusto, storageDir, concurrentUploads,
			schema.DefaultMetricsMapping, adx.PromMetrics)
		if err != nil {
			logger.Fatalf("Failed to create uploader: %s", err)
		}
	} else {
		logger.Warnf("No kusto endpoint provided, using fake metrics uploader")
		uploader := adx.NewFakeUploader("FakeMetrics")
		metricsUploaders = append(metricsUploaders, uploader)
		metricsDatabases = append(metricsDatabases, uploader.Database())
	}

	logsKusto := ctx.StringSlice("logs-kusto-endpoints")
	if len(logsKusto) > 0 {
		logsUploaders, logsDatabases, err = newUploaders(
			logsKusto, storageDir, concurrentUploads,
			schema.DefaultLogsMapping, adx.OTLPLogs)
		if err != nil {
			logger.Fatalf("Failed to create uploaders for OTLP logs: %s", err)
		}
	} else {
		logger.Warnf("No kusto endpoint provided, using fake logs uploader")
		uploader := adx.NewFakeUploader("FakeLogs")
		logsUploaders = append(logsUploaders, uploader)
		logsDatabases = append(logsDatabases, uploader.Database())
	}

	allowedDatabases = append(allowedDatabases, metricsDatabases...)
	allowedDatabases = append(allowedDatabases, logsDatabases...)

	uploadDispatcher := adx.NewDispatcher(append(logsUploaders, metricsUploaders...))
	if err := uploadDispatcher.Open(svcCtx); err != nil {
		logger.Fatalf("Failed to start upload dispatcher: %s", err)
	}
	defer uploadDispatcher.Close()

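	// The uploaders double as metrics.StatementExecutor implementations for the service.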
	var metricsKustoCli []metrics.StatementExecutor
	for _, cli := range metricsUploaders {
		metricsKustoCli = append(metricsKustoCli, cli)
	}

	var logsKustoCli []metrics.StatementExecutor
	for _, cli := range logsUploaders {
		logsKustoCli = append(logsKustoCli, cli)
	}

	svc, err := ingestor.NewService(ingestor.ServiceOpts{
		K8sCli:                 k8scli,
		K8sCtrlCli:             ctrlCli,
		LogsKustoCli:           logsKustoCli,
		MetricsKustoCli:        metricsKustoCli,
		MetricsDatabases:       metricsDatabases,
		AllowedDatabase:        allowedDatabases,
		LogsDatabases:          logsDatabases,
		Namespace:              namespace,
		Hostname:               hostname,
		Region:                 region,
		StorageDir:             storageDir,
		Uploader:               uploadDispatcher,
		DisablePeerTransfer:    disablePeerTransfer,
		PartitionSize:          partitionSize,
		MaxSegmentSize:         maxSegmentSize,
		MaxSegmentAge:          maxSegmentAge,
		MaxTransferSize:        maxTransferSize,
		MaxTransferAge:         maxTransferAge,
		MaxSegmentCount:        maxSegmentCount,
		MaxDiskUsage:           maxDiskUsage,
		MaxBatchSegments:       maxBatchSegments,
		EnableWALFsync:         enableWALFsync,
		MaxTransferConcurrency: maxTransferConcurrency,
		InsecureSkipVerify:     insecureSkipVerify,
		DropFilePrefixes:       dropPrefixes,
		SlowRequestThreshold:   slowRequestThreshold.Seconds(),
	})
	if err != nil {
		logger.Fatalf("Failed to create service: %s", err)
	}
	if err := svc.Open(svcCtx); err != nil {
		logger.Fatalf("Failed to start service: %s", err)
	}

	l, err := net.Listen("tcp", ":9090")
	if err != nil {
		logger.Fatalf("Failed to listen: %s", err)
	}
	if maxConns > 0 {
		logger.Infof("Limiting connections to %d", maxConns)
		l = limiter.LimitListener(l, maxConns)
	}
	defer l.Close()

	logger.Infof("Listening at %s", ":9090")
	mux := http.NewServeMux()
	mux.HandleFunc("/transfer", svc.HandleTransfer)
	mux.HandleFunc("/readyz", svc.HandleReady)

	logger.Infof("Metrics Listening at %s", ":9091")
	metricsMux := http.NewServeMux()
	metricsMux.Handle("/metrics", promhttp.Handler())
	metricsMux.HandleFunc("/debug/pprof/", pprof.Index)
	metricsMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	metricsMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
	metricsMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	metricsMux.HandleFunc("/debug/pprof/trace", pprof.Trace)
	metricsMux.HandleFunc("/debug/store", svc.HandleDebugStore)

	srv := &http.Server{
		Handler: mux,
		// Close idle connections fairly often to establish new connections through the load balancer
		// so that long-lived connections don't stay pinned to the same node indefinitely.
		IdleTimeout: 5 * time.Minute,
	}
	srv.ErrorLog = newLogger()

	go func() {
		// Under high connection load and when the server is doing a lot of IO, this
		// can cause the server to be unresponsive.  This pins the accept goroutine
		// to a single CPU to reduce context switching and improve performance.
		runtime.LockOSThread()
		if err := pinToCPU(0); err != nil {
			logger.Warnf("Failed to pin to CPU: %s", err)
		}

		if err := srv.ServeTLS(l, cacert, key); err != nil && err != http.ErrServerClosed {
			logger.Errorf(err.Error())
		}
	}()

	metricsSrv := &http.Server{Addr: ":9091", Handler: metricsMux}
	metricsSrv.ErrorLog = newLogger()
	go func() {
		if err := metricsSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			logger.Errorf(err.Error())
		}
	}()

	// Capture SIGINT and SIGTERM to trigger a shutdown and upload/transfer of all pending segments.
	// This is best-effort, if the process is killed with SIGKILL or the shutdown takes too long
	// the segments will be delayed until the process is restarted.
	sc := make(chan os.Signal, 1)
	signal.Notify(sc, os.Interrupt, syscall.SIGTERM)
	go func() {
		sig := <-sc
		logger.Infof("Received signal %s, uploading pending segments...", sig.String())

		// Stop receiving new samples
		if err := srv.Shutdown(context.Background()); err != nil {
			logger.Errorf("Failed to shutdown server: %s", err)
		}

		// Disable writes for internal processes (metrics)
		if err := svc.DisableWrites(); err != nil {
			logger.Errorf("Failed to disable writes: %s", err)
		}

		// Trigger shutdown of any pending background processes
		cancel()

		if err := metricsSrv.Shutdown(context.Background()); err != nil {
			logger.Errorf("Failed to shutdown metrics server: %s", err)
		}

		// Close the service, flushing any remaining pending segments
		err := svc.Close()
		if err != nil {
			logger.Errorf(err.Error())
		}
	}()

	// Only start the shutdown runner if running in a cluster
	if _, err := rest.InClusterConfig(); err == nil {
		go func() {
			sd := runner.NewShutDownRunner(k8scli, srv, svc)
			scheduler.RunForever(svcCtx, time.Minute, sd)
		}()
	}

	<-svcCtx.Done()
	return nil
}
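
The shutdown choreography near the end is the heart of this function: stop accepting new requests, disable internal writers, cancel background work, then close the service to flush what remains. A minimal standalone sketch of the same ordering, using only the standard library (the address and log messages here are illustrative, not part of this package):

package main

import (
	"context"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	srv := &http.Server{Addr: ":9090"}
	go func() {
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatal(err)
		}
	}()

	sc := make(chan os.Signal, 1)
	signal.Notify(sc, os.Interrupt, syscall.SIGTERM)
	sig := <-sc
	log.Printf("received %s, shutting down", sig)

	// 1. Stop accepting new requests; in-flight requests are allowed to finish.
	if err := srv.Shutdown(context.Background()); err != nil {
		log.Printf("shutdown: %v", err)
	}

	// 2. Cancel background work. The real function also disables writes and
	//    closes the service here, flushing pending segments.
	cancel()

	// 3. Wait for teardown; the real function blocks on svcCtx.Done().
	<-ctx.Done()
}

The ordering matters: shutting the listener down first guarantees no new segments arrive while the final flush runs.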