func()

in router/pkg/metric/router_runtime_metrics.go [49:304]


func (r *RuntimeMetrics) Start() error {

	// lock prevents a race between batch observer and instrument registration.
	var (
		lock            sync.Mutex
		processCPUUsage otelmetric.Float64ObservableGauge
		heapAlloc       otelmetric.Int64ObservableUpDownCounter
		heapIdle        otelmetric.Int64ObservableUpDownCounter
		heapInuse       otelmetric.Int64ObservableUpDownCounter
		heapObjects     otelmetric.Int64ObservableUpDownCounter
		heapReleased    otelmetric.Int64ObservableUpDownCounter
		heapSys         otelmetric.Int64ObservableUpDownCounter
		liveObjects     otelmetric.Int64ObservableUpDownCounter
		goroutinesCount otelmetric.Int64ObservableUpDownCounter
		goVersion       otelmetric.Int64ObservableUpDownCounter

		gcCount      otelmetric.Int64ObservableCounter
		pauseTotalNs otelmetric.Int64ObservableCounter
		gcPauseNs    otelmetric.Int64Histogram

		lastNumGC    uint32
		lastMemStats time.Time
		memStats     goruntime.MemStats
	)

	lock.Lock()
	defer lock.Unlock()

	runtimeUptime, err := r.meter.Int64ObservableGauge(
		"process.uptime",
		otelmetric.WithUnit("s"),
		otelmetric.WithDescription("Seconds since application was initialized"),
	)
	if err != nil {
		return err
	}

	if processCPUUsage, err = r.meter.Float64ObservableGauge(
		"process.cpu.usage",
		otelmetric.WithUnit("percent"),
		otelmetric.WithDescription(
			"Total CPU usage of this process in percentage of host total CPU capacity",
		),
	); err != nil {
		return err
	}

	if heapAlloc, err = r.meter.Int64ObservableUpDownCounter(
		"process.runtime.go.mem.heap_alloc",
		otelmetric.WithUnit("By"),
		otelmetric.WithDescription("Bytes of allocated heap objects"),
	); err != nil {
		return err
	}

	if heapIdle, err = r.meter.Int64ObservableUpDownCounter(
		"process.runtime.go.mem.heap_idle",
		otelmetric.WithUnit("By"),
		otelmetric.WithDescription("Bytes in idle (unused) spans"),
	); err != nil {
		return err
	}

	if heapInuse, err = r.meter.Int64ObservableUpDownCounter(
		"process.runtime.go.mem.heap_inuse",
		otelmetric.WithUnit("By"),
		otelmetric.WithDescription("Bytes in in-use spans"),
	); err != nil {
		return err
	}

	if heapObjects, err = r.meter.Int64ObservableUpDownCounter(
		"process.runtime.go.mem.heap_objects",
		otelmetric.WithDescription("Number of allocated heap objects"),
	); err != nil {
		return err
	}

	// FYI see https://github.com/golang/go/issues/32284 to help
	// understand the meaning of this value.
	if heapReleased, err = r.meter.Int64ObservableUpDownCounter(
		"process.runtime.go.mem.heap_released",
		otelmetric.WithUnit("By"),
		otelmetric.WithDescription("Bytes of idle spans whose physical memory has been returned to the OS"),
	); err != nil {
		return err
	}

	if heapSys, err = r.meter.Int64ObservableUpDownCounter(
		"process.runtime.go.mem.heap_sys",
		otelmetric.WithUnit("By"),
		otelmetric.WithDescription("Bytes of heap memory obtained from the OS"),
	); err != nil {
		return err
	}

	if liveObjects, err = r.meter.Int64ObservableUpDownCounter(
		"process.runtime.go.mem.live_objects",
		otelmetric.WithDescription("Number of live objects is the number of cumulative Mallocs - Frees"),
	); err != nil {
		return err
	}

	if gcCount, err = r.meter.Int64ObservableCounter(
		"process.runtime.go.gc.count",
		otelmetric.WithDescription("Number of completed garbage collection cycles"),
	); err != nil {
		return err
	}

	if goroutinesCount, err = r.meter.Int64ObservableUpDownCounter(
		"process.runtime.go.goroutines.count",
		otelmetric.WithDescription("Number of goroutines that currently exist"),
	); err != nil {
		return err
	}

	if goVersion, err = r.meter.Int64ObservableUpDownCounter(
		"process.runtime.go.info",
		otelmetric.WithDescription("Information about the Go runtime environment"),
	); err != nil {
		return err
	}

	// Note that the following could be derived as a sum of
	// individual pauses, but we may lose individual pauses if the
	// observation interval is too slow.
	if pauseTotalNs, err = r.meter.Int64ObservableCounter(
		"process.runtime.go.gc.pause_total",
		otelmetric.WithUnit("ns"),
		otelmetric.WithDescription("Cumulative nanoseconds in GC stop-the-world pauses since the program started"),
	); err != nil {
		return err
	}

	if gcPauseNs, err = r.meter.Int64Histogram(
		"process.runtime.go.gc.pause",
		otelmetric.WithUnit("ns"),
		otelmetric.WithDescription("Amount of nanoseconds in GC stop-the-world pauses"),
	); err != nil {
		return err
	}

	serverUptime, err := r.meter.Int64ObservableGauge(
		"server.uptime",
		otelmetric.WithUnit("s"),
		otelmetric.WithDescription("Seconds since the server started. Resets between router config changes."),
	)
	if err != nil {
		return err
	}

	p, err := process.NewProcess(int32(os.Getpid()))
	if err != nil {
		return err
	}

	now := time.Now()

	rc, err := r.meter.RegisterCallback(
		func(ctx context.Context, o otelmetric.Observer) error {
			lock.Lock()
			defer lock.Unlock()

			/**
			* Process CPU usage. Support on Linux, Mac, and Windows but not on BSD.
			 */

			processCpuUsageInPercent, err := p.PercentWithContext(ctx, 0)

			// If the process CPU usage is not available, we just skip the observation.
			if err == nil {
				o.ObserveFloat64(processCPUUsage,
					processCpuUsageInPercent,
					otelmetric.WithAttributes(r.baseAttributes...),
				)
			}

			/**
			* Server uptime. Everytime the store is reloaded, the server uptime is reset.
			 */

			o.ObserveInt64(serverUptime,
				int64(time.Since(now).Seconds()),
				otelmetric.WithAttributes(r.baseAttributes...),
			)

			/*
			* Process uptime
			 */
			o.ObserveInt64(runtimeUptime, int64(time.Since(r.processStartTime).Seconds()),
				otelmetric.WithAttributes(r.baseAttributes...),
			)

			/**
			* Go runtime metrics
			 */

			o.ObserveInt64(goVersion, 1,
				otelmetric.WithAttributes(AttributeGoInfoVersion.String(goruntime.Version())),
				otelmetric.WithAttributes(r.baseAttributes...),
			)
			o.ObserveInt64(goroutinesCount, int64(goruntime.NumGoroutine()), otelmetric.WithAttributes(r.baseAttributes...))

			/**
			* Go runtime memory stats
			 */

			now := time.Now()
			if now.Sub(lastMemStats) >= DefaultMinimumReadMemStatsInterval {
				goruntime.ReadMemStats(&memStats)
				lastMemStats = now
			}

			o.ObserveInt64(heapAlloc, int64(memStats.HeapAlloc), otelmetric.WithAttributes(r.baseAttributes...))
			o.ObserveInt64(heapIdle, int64(memStats.HeapIdle), otelmetric.WithAttributes(r.baseAttributes...))
			o.ObserveInt64(heapInuse, int64(memStats.HeapInuse), otelmetric.WithAttributes(r.baseAttributes...))
			o.ObserveInt64(heapObjects, int64(memStats.HeapObjects), otelmetric.WithAttributes(r.baseAttributes...))
			o.ObserveInt64(heapReleased, int64(memStats.HeapReleased), otelmetric.WithAttributes(r.baseAttributes...))
			o.ObserveInt64(heapSys, int64(memStats.HeapSys), otelmetric.WithAttributes(r.baseAttributes...))
			o.ObserveInt64(liveObjects, int64(memStats.Mallocs-memStats.Frees), otelmetric.WithAttributes(r.baseAttributes...))
			o.ObserveInt64(gcCount, int64(memStats.NumGC), otelmetric.WithAttributes(r.baseAttributes...))
			o.ObserveInt64(pauseTotalNs, int64(memStats.PauseTotalNs), otelmetric.WithAttributes(r.baseAttributes...))

			computeGCPauses(ctx, gcPauseNs, memStats.PauseNs[:], lastNumGC, memStats.NumGC)

			lastNumGC = memStats.NumGC

			return nil
		},
		heapAlloc,
		heapIdle,
		heapInuse,
		heapObjects,
		heapReleased,
		heapSys,
		liveObjects,
		goroutinesCount,
		goVersion,

		gcCount,
		pauseTotalNs,

		processCPUUsage,
		runtimeUptime,
		serverUptime,
	)

	if err != nil {
		return err
	}

	r.instrumentRegistrations = append(r.instrumentRegistrations, rc)

	return nil
}