projects/alloydb-autoscaler/src/autoscaler-core/common/open-telemetry-meter-provider.ts (293 lines of code) (raw):

/** * Copyright 2024 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License */ /** @fileoverview Provides functionality to export metrics to Open Telemetry. */ import {MetricExporter as GcpMetricExporter} from '@google-cloud/opentelemetry-cloud-monitoring-exporter'; import {GcpDetectorSync} from '@google-cloud/opentelemetry-resource-util'; import { diag, DiagLogLevel, Exception as OpenTelemetryException, MeterProvider as IMeterProvider, } from '@opentelemetry/api'; import { loggingErrorHandler as openTelemetryLoggingErrorHandler, setGlobalErrorHandler as openTelemetrySetGlobalErrorHandler, } from '@opentelemetry/core'; import {OTLPMetricExporter} from '@opentelemetry/exporter-metrics-otlp-grpc'; import {IResource, Resource} from '@opentelemetry/resources'; import {SEMRESATTRS_K8S_POD_NAME} from '@opentelemetry/semantic-conventions'; import { MeterProvider, PeriodicExportingMetricReader, PushMetricExporter, } from '@opentelemetry/sdk-metrics'; import { promiseWithResolvers, PromiseWithResolvers, sleep, } from './promise-wrapper'; import {DiagToPinoLogger} from './diag-logger-wrapper'; import {createLogger} from './logger'; import pino from 'pino'; /** Exporting modes for OpenTelemetry. */ export enum ExporterMode { GCM_ONLY_FLUSHING, OTEL_PERIODIC, OTEL_ONLY_FLUSHING, } type ExporterParams = { PERIODIC_EXPORT_INTERVAL: number; FLUSH_MIN_INTERVAL: number; FLUSH_MAX_ATTEMPTS: number; FLUSH_ENABLED: boolean; }; const EXPORTER_PARAMETERS: Record<ExporterMode, ExporterParams> = { // GCM direct pushing is only done in Cloud functions deployments, where // we only flush directly. // [ExporterMode.GCM_ONLY_FLUSHING]: { PERIODIC_EXPORT_INTERVAL: 0x7fffffff, // approx 24 days in milliseconds FLUSH_MIN_INTERVAL: 10_000, FLUSH_MAX_ATTEMPTS: 6, FLUSH_ENABLED: true, }, // OTEL collector cannot handle receiving metrics from a single process // more frequently than its batching timeout, as it does not aggregate // them and reports the multiple metrics to the upstream metrics management // interface (eg GCM) which will then cause Duplicate TimeSeries errors. // // So when using flushing, disable periodic export, and when using periodic // export, disable flushing! // // OTEL collector mode is set by specifying the environment variable // OTEL_COLLECTOR_URL which is the address of the collector, // and whether to use flushing or periodic export is determined // by the environment variable OTEL_IS_LONG_RUNNING_PROCESS [ExporterMode.OTEL_ONLY_FLUSHING]: { PERIODIC_EXPORT_INTERVAL: 0x7fffffff, // approx 24 days in milliseconds FLUSH_MIN_INTERVAL: 15_000, FLUSH_MAX_ATTEMPTS: 6, FLUSH_ENABLED: true, }, [ExporterMode.OTEL_PERIODIC]: { PERIODIC_EXPORT_INTERVAL: 20_000, // OTEL collector batches every 10s FLUSH_MIN_INTERVAL: 0, FLUSH_MAX_ATTEMPTS: 0, FLUSH_ENABLED: false, }, }; /** Provides a Meter to interact with OpenTelemetry */ export interface IOpenTelemetryMeterProvider { getMeterProvider(): Promise<IMeterProvider>; tryFlush(): Promise<void>; setTryFlushIsEnabled(isTryFlushEnabled: boolean): void; } /** Exporter to interact with OpenTelemetry. */ export class OpenTelemetryMeterProvider implements IOpenTelemetryMeterProvider { protected static resourceAttributes: Record<string, string> = {}; protected static exporterMode: ExporterMode | null; protected static exporterParams: ExporterParams | null; protected static meterProvider: MeterProvider | null; protected static isInitialized: boolean = false; protected static pendingInit: PromiseWithResolvers<null> | null = null; // Creating a default, but this is will be overriden when initialized. private static diagPinoLogger: DiagToPinoLogger = new DiagToPinoLogger( createLogger() ); protected static openTelemetryErrorCount: number = 0; protected static pendingFlush: PromiseWithResolvers<null> | null = null; protected static isTryFlushEnabled: boolean = true; protected static lastForceFlushTime: number = 0; /** Initializes OpenTelemetryMeterProvider. */ constructor(logger: pino.Logger) { OpenTelemetryMeterProvider.initializeTelemetry(); OpenTelemetryMeterProvider.diagPinoLogger = new DiagToPinoLogger(logger); } /** * Gets the MeterProvider to interact with OpenTelemetry. * @return OpenTelemetry meter provider. */ async getMeterProvider(): Promise<MeterProvider> { if (!OpenTelemetryMeterProvider.meterProvider) { await OpenTelemetryMeterProvider.initializeTelemetry(); } if (!OpenTelemetryMeterProvider.meterProvider) { throw new Error('OpenTelemetry initialization failed.'); } return OpenTelemetryMeterProvider.meterProvider; } /** * Specify whether the tryFlush function should try to flush or not. * * In long-running processes, disabling flushing will give better results * while in short-lived processes, without flushing, counters may not * be reported to cloud monitoring. */ setTryFlushIsEnabled(isTryFlushEnabled: boolean) { OpenTelemetryMeterProvider.isTryFlushEnabled = isTryFlushEnabled; } /** * Sets the diag logger to a custom one. * @param logger DiagToPinoLogger to use for logging. */ static setLogger(logger: DiagToPinoLogger) { OpenTelemetryMeterProvider.diagPinoLogger = logger; } /** * Try to flush any as-yet-unsent counters to cloud montioring. * if setTryFlushEnabled(false) has been called, this function is a no-op. * * Will only actually call forceFlush once every MIN_FORCE_FLUSH_INTERVAL * seconds. It will retry if an error is detected during flushing. * * (Note on transient errors: in a long running process, these are not an * issue as periodic export will succeed next time, but in short-lived * processes there is not a 'next time', so we need to check for errors * and retry) */ async tryFlush() { if (OpenTelemetryMeterProvider.pendingInit) { await OpenTelemetryMeterProvider.pendingInit.promise; } if ( !OpenTelemetryMeterProvider.isTryFlushEnabled || !OpenTelemetryMeterProvider.exporterParams?.FLUSH_ENABLED ) { // Flushing disabled, do nothing! return; } // Avoid simultaneous flushing. if (OpenTelemetryMeterProvider.pendingFlush) { await OpenTelemetryMeterProvider.pendingFlush.promise; return; } OpenTelemetryMeterProvider.pendingFlush = promiseWithResolvers(); if ( !OpenTelemetryMeterProvider.exporterParams || !OpenTelemetryMeterProvider.meterProvider ) { // This should not happen since we are awaiting the init. // Done to avoid typing issues. throw new Error('Counters have not been initialized.'); } try { await OpenTelemetryMeterProvider.waitUntilNextFlushTime(); // OpenTelemetry's forceFlush() will always succeed, even if the backend // fails and reports an error... // // So we use the OpenTelemetry Global Error Handler installed above // to keep a count of the number of errors reported, and if an error // is reported during a flush, we wait a while and try again. // Not perfect, but the best we can do. // // To avoid end-users seeing these errors, we supress error messages // until the very last flush attempt. // // Note that if the OpenTelemetry metrics are exported to Google Cloud // Monitoring, the first time a counter is used, it will fail to be // exported and will need to be retried. let attempts = OpenTelemetryMeterProvider.exporterParams.FLUSH_MAX_ATTEMPTS; while (attempts > 0) { const beforeFlushErrorCount = OpenTelemetryMeterProvider.openTelemetryErrorCount; // Suppress OTEL Diag error messages on all but the last flush attempt. OpenTelemetryMeterProvider.diagPinoLogger.suppressErrors = attempts > 1; await OpenTelemetryMeterProvider.meterProvider.forceFlush(); OpenTelemetryMeterProvider.diagPinoLogger.suppressErrors = false; OpenTelemetryMeterProvider.lastForceFlushTime = Date.now(); const afterFlushErrorCount = OpenTelemetryMeterProvider.openTelemetryErrorCount; if (beforeFlushErrorCount === afterFlushErrorCount) { // Success! return; } OpenTelemetryMeterProvider.diagPinoLogger.warn( 'Opentelemetry reported errors during flushing, retrying.' ); await sleep( OpenTelemetryMeterProvider.exporterParams.FLUSH_MIN_INTERVAL ); attempts--; } if (attempts <= 0) { OpenTelemetryMeterProvider.diagPinoLogger.error( 'Failed to flush counters after ' + OpenTelemetryMeterProvider.exporterParams.FLUSH_MAX_ATTEMPTS + 'attempts - see OpenTelemetry logging' ); } } catch (e) { OpenTelemetryMeterProvider.diagPinoLogger.error( `Error while flushing counters: ${e}` ); } finally { OpenTelemetryMeterProvider.pendingFlush.resolve(null); OpenTelemetryMeterProvider.pendingFlush = null; } } /** * Initialize the OpenTelemetry metric exporters. * * If called more than once, will wait for the first call to complete. */ static async initializeTelemetry(): Promise<void> { if (OpenTelemetryMeterProvider.isInitialized) return; if (OpenTelemetryMeterProvider.pendingInit) { await OpenTelemetryMeterProvider.pendingInit.promise; return; } OpenTelemetryMeterProvider.pendingInit = promiseWithResolvers(); OpenTelemetryMeterProvider.initializeOpenTelemetryLogging(); try { OpenTelemetryMeterProvider.diagPinoLogger.debug('initializing metrics'); const resources = await OpenTelemetryMeterProvider.getResources(); const [exporter, exporterMode] = OpenTelemetryMeterProvider.getExporterAndMode(); OpenTelemetryMeterProvider.exporterMode = exporterMode; OpenTelemetryMeterProvider.exporterParams = EXPORTER_PARAMETERS[exporterMode]; await OpenTelemetryMeterProvider.setMeterProvider(resources, exporter); } catch (e) { OpenTelemetryMeterProvider.pendingInit.reject(e); } OpenTelemetryMeterProvider.isInitialized = true; OpenTelemetryMeterProvider.pendingInit.resolve(null); } /** Sets up OpenTelemetry client libraries for logging. */ private static initializeOpenTelemetryLogging() { diag.setLogger(OpenTelemetryMeterProvider.diagPinoLogger, { logLevel: DiagLogLevel.INFO, suppressOverrideMessage: true, }); openTelemetrySetGlobalErrorHandler((error: OpenTelemetryException) => { OpenTelemetryMeterProvider.openTelemetryErrorCount++; // Delegate to Otel's own error handler for stringification. openTelemetryLoggingErrorHandler()(error); }); } /** Gets resources. */ private static async getResources(): Promise<IResource> { const resources = new GcpDetectorSync() .detect() .merge(new Resource(OpenTelemetryMeterProvider.resourceAttributes)); if (resources.waitForAsyncAttributes) { await resources.waitForAsyncAttributes(); } if (process.env.KUBERNETES_SERVICE_HOST) { if (process.env.K8S_POD_NAME) { OpenTelemetryMeterProvider.resourceAttributes[ SEMRESATTRS_K8S_POD_NAME ] = process.env.K8S_POD_NAME; } else { OpenTelemetryMeterProvider.diagPinoLogger.warn( 'WARNING: running under Kubernetes, but K8S_POD_NAME ' + 'environment variable is not set. ' + 'This may lead to Send TimeSeries errors' ); } } return resources; } /** Gets the exporter and its mode. */ private static getExporterAndMode(): [PushMetricExporter, ExporterMode] { let exporterMode; if (process.env.OTEL_COLLECTOR_URL) { switch (process.env.OTEL_IS_LONG_RUNNING_PROCESS) { case 'true': exporterMode = ExporterMode.OTEL_PERIODIC; break; case 'false': exporterMode = ExporterMode.OTEL_ONLY_FLUSHING; break; default: throw new Error( 'Invalid value for env var OTEL_IS_LONG_RUNNING_PROCESS: ' + `"${process.env.OTEL_IS_LONG_RUNNING_PROCESS}"` ); } OpenTelemetryMeterProvider.diagPinoLogger.info( `Counters mode: ${exporterMode} OTEL collector: ` + process.env.OTEL_COLLECTOR_URL ); return [ new OTLPMetricExporter({ url: process.env.OTEL_COLLECTOR_URL, // @ts-expect-error: CompressionAlgorithm.NONE is not exported. compression: 'none', }), exporterMode, ]; } exporterMode = ExporterMode.GCM_ONLY_FLUSHING; OpenTelemetryMeterProvider.diagPinoLogger.info( `Counters mode: ${exporterMode} using GCP monitoring` ); return [ new GcpMetricExporter({prefix: 'workload.googleapis.com'}), exporterMode, ]; } /** Creates the meter provider for open telemetry. */ private static async setMeterProvider( resources: IResource, exporter: PushMetricExporter ) { if ( // Do not use bare !OpenTelemetryMeterProvider.exporterMode since enum // could be 0 while holding the first value. OpenTelemetryMeterProvider.exporterMode === null || OpenTelemetryMeterProvider.exporterMode === undefined || !OpenTelemetryMeterProvider.exporterParams ) { // This should not happen since exporter mode will be called and defined // first. This avoids null type errors. throw new Error('Exporter mode must be defined first.'); } OpenTelemetryMeterProvider.meterProvider = new MeterProvider({ resource: resources, readers: [ new PeriodicExportingMetricReader({ exportIntervalMillis: OpenTelemetryMeterProvider.exporterParams.PERIODIC_EXPORT_INTERVAL, exportTimeoutMillis: OpenTelemetryMeterProvider.exporterParams.PERIODIC_EXPORT_INTERVAL, exporter: exporter, }), ], }); } /** Waits until the next force flush time. */ private static async waitUntilNextFlushTime() { if ( OpenTelemetryMeterProvider.exporterParams === null || OpenTelemetryMeterProvider.exporterParams === undefined ) { // This should not happen since we are awaiting the init. // Done to avoid typing issues. throw new Error('Counters have not been initialized.'); } // If flushed recently, wait for the min interval to pass. const millisUntilNextForceFlush = OpenTelemetryMeterProvider.lastForceFlushTime + OpenTelemetryMeterProvider.exporterParams.FLUSH_MIN_INTERVAL - Date.now(); if (millisUntilNextForceFlush > 0) { // Wait until we can force flush again! OpenTelemetryMeterProvider.diagPinoLogger.debug( 'Counters.tryFlush() waiting until flushing again' ); await sleep(millisUntilNextForceFlush); } } }