cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/otel/otel.go (289 lines of code) (raw):
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package otelgo
import (
"context"
"errors"
"net/http"
"net/url"
"strings"
"time"
"github.com/google/uuid"
"go.opentelemetry.io/contrib/detectors/gcp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
// Attributes carries the per-request labels attached to the proxy's metrics.
type Attributes struct {
	// Method is the name of the operation being recorded (exported as the "method" attribute).
	Method string

	// Status is the request outcome; RecordMetrics sets it to "OK" or "failure".
	// Only the request-count metric records it.
	Status string

	// QueryType is the kind of query executed, e.g. "select" or "insert"
	// (exported as the "query_type" attribute).
	QueryType string
}
// Attribute keys used to label the request-count and latency metrics.
// attributeKeyStatus is only attached to the request-count metric.
var (
	attributeKeyInstance  attribute.Key = "instance"
	attributeKeyDatabase  attribute.Key = "database"
	attributeKeyMethod    attribute.Key = "method"
	attributeKeyQueryType attribute.Key = "query_type"
	attributeKeyStatus    attribute.Key = "status"
)
// OTelConfig holds configuration for OpenTelemetry.
type OTelConfig struct {
	// TracerEndpoint is the OTLP/gRPC trace collector address (e.g. "host:port").
	// It must be non-empty and pass isValidEndpoint when OTEL is enabled.
	// The exporter connects without transport security (WithInsecure).
	TracerEndpoint string

	// MetricEndpoint is the OTLP/gRPC metric collector address. It is validated
	// the same way as TracerEndpoint and is also used without transport security.
	MetricEndpoint string

	// ServiceName populates the service.name resource attribute and names the
	// tracer and meter created by NewOpenTelemetry.
	ServiceName string

	// TraceSampleRatio is the trace-ID ratio used to sample root spans;
	// child spans follow their parent's sampling decision (ParentBased sampler).
	TraceSampleRatio float64

	// OTELEnabled turns telemetry on. When false, NewOpenTelemetry skips all
	// setup and the span/metric recording methods return early.
	OTELEnabled bool

	// Database is attached as the "database" attribute on every recorded metric.
	Database string

	// Instance is attached as the "instance" attribute on every recorded metric.
	Instance string

	// HealthCheckEnabled makes NewOpenTelemetry issue an HTTP GET to the
	// collector health endpoint at startup and fail unless it answers 200.
	HealthCheckEnabled bool

	// HealthCheckEp is the health-check address without a scheme ("http://" is
	// prepended), presumably "host:port" or "host:port/path" — TODO confirm.
	HealthCheckEp string

	// ServiceVersion populates the service.version resource attribute.
	ServiceVersion string
}
// Names of the metric instruments registered by NewOpenTelemetry.
const (
	// requestCountMetric is the counter of incoming query requests.
	requestCountMetric = "bigtable/cassandra_adapter/request_count"
	// latencyMetric is the histogram of request round-trip latencies, in milliseconds.
	latencyMetric = "bigtable/cassandra_adapter/roundtrip_latencies"
)
// OpenTelemetry provides methods to set up tracing and record metrics.
//
// Instances are created by NewOpenTelemetry. When Config.OTELEnabled is false
// the tracer and instruments are left unset, and the span/metric methods
// return early without touching them.
type OpenTelemetry struct {
	// Config is the configuration this instance was built from; its
	// OTELEnabled flag gates the span and metric methods.
	Config *OTelConfig

	// tracer creates spans in StartSpan; it is only set when OTEL is enabled.
	tracer trace.Tracer

	// requestCount backs the request_count counter metric.
	requestCount metric.Int64Counter

	// requestLatency backs the roundtrip_latencies histogram (milliseconds).
	requestLatency metric.Int64Histogram

	// logger is used for diagnostics during initialization.
	logger *zap.Logger
}
// NewOpenTelemetry initializes OpenTelemetry tracing and metrics components.
// It optionally probes the collector health-check endpoint, builds the tracer
// and meter providers, creates the request-count and latency instruments, and
// finally installs both providers as the global OpenTelemetry providers.
//
// When config.OTELEnabled is false it returns an instance with no tracer or
// instruments and a nil shutdown function; the recording methods on that
// instance return early because they check Config.OTELEnabled.
//
// If a step fails after a provider has been created, the providers created so
// far are shut down before the error is returned, and the globals are left
// untouched.
//
// Parameters:
//   - ctx: Context for managing OpenTelemetry lifecycle; it also bounds the
//     health-check request.
//   - config: Configuration struct for OpenTelemetry settings.
//   - logger: Logger instance for capturing OpenTelemetry logs.
//
// Returns:
//   - *OpenTelemetry: A configured instance of OpenTelemetry.
//   - func(context.Context) error: A shutdown function to clean up resources
//     (nil when OpenTelemetry is disabled).
//   - error: An error if initialization fails.
func NewOpenTelemetry(ctx context.Context, config *OTelConfig, logger *zap.Logger) (*OpenTelemetry, func(context.Context) error, error) {
	otelInst := &OpenTelemetry{Config: config, logger: logger}
	if !config.OTELEnabled {
		return otelInst, nil, nil
	}

	if config.HealthCheckEnabled {
		if err := checkCollectorHealth(ctx, config.HealthCheckEp); err != nil {
			return nil, nil, err
		}
		logger.Info("OTEL health check complete")
	}

	// releaseOnFailure shuts down components created before a later step failed,
	// so their exporters and background processing are not leaked.
	releaseOnFailure := func(shutdown func(context.Context) error) {
		if shutdownErr := shutdown(ctx); shutdownErr != nil {
			logger.Warn("error while shutting down OpenTelemetry components after failed initialization", zap.Error(shutdownErr))
		}
	}

	otelResource := buildOtelResource(ctx, config)

	tracerProvider, err := InitTracerProvider(ctx, config, otelResource)
	if err != nil {
		logger.Error("error while initializing the tracer provider", zap.Error(err))
		return nil, nil, err
	}

	meterProvider, err := InitMeterProvider(ctx, config, otelResource)
	if err != nil {
		logger.Error("error while initializing the meter provider", zap.Error(err))
		releaseOnFailure(tracerProvider.Shutdown)
		return nil, nil, err
	}
	shutdown := shutdownOpenTelemetryComponents([]func(context.Context) error{
		tracerProvider.Shutdown,
		meterProvider.Shutdown,
	})

	meter := meterProvider.Meter(config.ServiceName)
	otelInst.requestCount, err = meter.Int64Counter(requestCountMetric, metric.WithDescription("Records metric for number of query requests coming in"), metric.WithUnit("1"))
	if err != nil {
		logger.Error("error during registering instrument for metric bigtable/cassandra_adapter/request_count", zap.Error(err))
		releaseOnFailure(shutdown)
		return nil, nil, err
	}
	// NOTE(review): latencies are recorded as whole milliseconds (int64), so the
	// sub-millisecond boundaries below can only ever receive 0 — confirm these
	// buckets are intended for an "ms" unit.
	otelInst.requestLatency, err = meter.Int64Histogram(latencyMetric,
		metric.WithDescription("Records latency for all query operations"),
		metric.WithExplicitBucketBoundaries(0.0, 0.0010, 0.0013, 0.0016, 0.0020, 0.0024, 0.0031, 0.0038, 0.0048, 0.0060,
			0.0075, 0.0093, 0.0116, 0.0146, 0.0182, 0.0227, 0.0284, 0.0355, 0.0444, 0.0555, 0.0694, 0.0867,
			0.1084, 0.1355, 0.1694, 0.2118, 0.2647, 0.3309, 0.4136, 0.5170, 0.6462, 0.8078, 1.0097, 1.2622,
			1.5777, 1.9722, 2.4652, 3.0815, 3.8519, 4.8148, 6.0185, 7.5232, 9.4040, 11.7549, 14.6937, 18.3671,
			22.9589, 28.6986, 35.8732, 44.8416, 56.0519, 70.0649, 87.5812, 109.4764, 136.8456, 171.0569, 213.8212,
			267.2765, 334.0956, 417.6195, 522.0244, 652.5304),
		metric.WithUnit("ms"))
	if err != nil {
		logger.Error("error during registering instrument for metric bigtable/cassandra_adapter/roundtrip_latencies", zap.Error(err))
		releaseOnFailure(shutdown)
		return nil, nil, err
	}

	// Publish the providers globally only after initialization fully succeeded,
	// so a failed call never leaves shut-down providers installed as globals.
	otel.SetTracerProvider(tracerProvider)
	otel.SetMeterProvider(meterProvider)
	otelInst.tracer = tracerProvider.Tracer(config.ServiceName)

	return otelInst, shutdown, nil
}

// checkCollectorHealth issues one HTTP GET to "http://"+endpoint and returns an
// error unless the response status is 200 OK. The request honours ctx and a
// fixed client timeout, and the response body is always closed so the
// connection is released.
func checkCollectorHealth(ctx context.Context, endpoint string) error {
	const healthCheckTimeout = 10 * time.Second

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://"+endpoint, nil)
	if err != nil {
		return err
	}
	client := &http.Client{Timeout: healthCheckTimeout}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return errors.New("OTEL collector service is not up and running")
	}
	return nil
}
// shutdownOpenTelemetryComponents aggregates multiple shutdown functions into a
// single callable function.
//
// Every shutdown function is invoked in slice order, even if an earlier one
// fails, so one failing component does not stop the others from releasing
// their resources.
//
// Parameters:
//   - shutdownFuncs: A slice of shutdown functions for OpenTelemetry components.
//
// Returns:
//   - func(context.Context) error: A single shutdown function. It returns nil
//     when every component shut down cleanly. Otherwise it returns an error
//     joining all component failures, which can be inspected with errors.Is or
//     errors.As.
func shutdownOpenTelemetryComponents(shutdownFuncs []func(context.Context) error) func(context.Context) error {
	return func(ctx context.Context) error {
		var errs []error
		for _, shutdownFunc := range shutdownFuncs {
			if err := shutdownFunc(ctx); err != nil {
				errs = append(errs, err)
			}
		}
		// errors.Join returns nil for an empty slice.
		return errors.Join(errs...)
	}
}
// InitTracerProvider builds an OpenTelemetry TracerProvider that exports spans
// over OTLP/gRPC (without transport security) to config.TracerEndpoint.
//
// Root spans are sampled by trace-ID ratio (config.TraceSampleRatio); spans
// with a parent follow the parent's sampling decision.
//
// Parameters:
//   - ctx: Context used while creating the exporter.
//   - config: OpenTelemetry settings; TracerEndpoint must be a non-empty, valid endpoint.
//   - resource: Resource describing this service, attached to every span.
//
// Returns:
//   - *sdktrace.TracerProvider: The configured provider.
//   - error: Non-nil if the endpoint is empty or malformed, or exporter creation fails.
func InitTracerProvider(ctx context.Context, config *OTelConfig, resource *resource.Resource) (*sdktrace.TracerProvider, error) {
	endpoint := config.TracerEndpoint
	switch {
	case endpoint == "":
		return nil, errors.New("tracer endpoint cannot be empty")
	case !isValidEndpoint(endpoint):
		return nil, errors.New("invalid tracer endpoint format")
	}

	exporter, err := otlptracegrpc.New(ctx,
		otlptracegrpc.WithEndpoint(endpoint),
		otlptracegrpc.WithInsecure(),
	)
	if err != nil {
		return nil, err
	}

	sampler := sdktrace.ParentBased(sdktrace.TraceIDRatioBased(config.TraceSampleRatio))
	return sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(resource),
		sdktrace.WithSampler(sampler),
	), nil
}
// InitMeterProvider initializes an OpenTelemetry MeterProvider for collecting
// application metrics. Metrics are exported periodically over OTLP/gRPC
// (without transport security) to config.MetricEndpoint. Instruments whose
// names match "rpc.client.*" are dropped, so gRPC client metrics are not
// exported.
//
// Parameters:
//   - ctx: Context used while creating the exporter.
//   - config: OpenTelemetry settings; MetricEndpoint must be a non-empty, valid endpoint.
//   - resource: Resource describing this service, attached to every metric.
//
// Returns:
//   - *sdkmetric.MeterProvider: A configured MeterProvider instance.
//   - error: Non-nil if the endpoint is empty or malformed, or exporter creation fails.
func InitMeterProvider(ctx context.Context, config *OTelConfig, resource *resource.Resource) (*sdkmetric.MeterProvider, error) {
	if config.MetricEndpoint == "" {
		return nil, errors.New("metric endpoint cannot be empty")
	}
	// Basic validation for incorrect endpoint format.
	if !isValidEndpoint(config.MetricEndpoint) {
		return nil, errors.New("invalid metric endpoint format")
	}

	exporter, err := otlpmetricgrpc.New(ctx,
		otlpmetricgrpc.WithEndpoint(config.MetricEndpoint),
		otlpmetricgrpc.WithInsecure(),
	)
	if err != nil {
		return nil, err
	}

	// The view's Name criterion supports '*' wildcards; AggregationDrop
	// discards every matching instrument's data.
	dropGRPCClientMetrics := sdkmetric.NewView(
		sdkmetric.Instrument{Name: "rpc.client.*"},
		sdkmetric.Stream{Aggregation: sdkmetric.AggregationDrop{}},
	)

	mp := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter)),
		sdkmetric.WithResource(resource),
		sdkmetric.WithView(dropGRPCClientMetrics),
	)
	return mp, nil
}
// buildOtelResource assembles the OpenTelemetry resource that identifies this
// service.
//
// It first tries to build a resource containing:
//   - GCP-detected platform attributes;
//   - the SDK's telemetry.sdk.* attributes;
//   - service name, version and a random service.instance.id.
//
// If that construction reports an error, it returns a plain resource carrying
// only the service attributes.
//
// Parameters:
//   - ctx: Context used while running resource detection.
//   - config: Supplies ServiceName and ServiceVersion.
//
// Returns:
//   - *resource.Resource: The detected resource, or the service-only fallback.
func buildOtelResource(ctx context.Context, config *OTelConfig) *resource.Resource {
	// serviceAttrs returns the service identity attributes; every call mints a
	// fresh random service.instance.id.
	serviceAttrs := func() []attribute.KeyValue {
		return []attribute.KeyValue{
			semconv.ServiceNameKey.String(config.ServiceName),
			semconv.ServiceInstanceIDKey.String(uuid.New().String()),
			semconv.ServiceVersionKey.String(config.ServiceVersion),
		}
	}

	detected, err := resource.New(ctx,
		resource.WithSchemaURL(semconv.SchemaURL),
		resource.WithDetectors(gcp.NewDetector()),
		// Adds only the telemetry.sdk.* attributes, not the SDK's full default detector set.
		resource.WithTelemetrySDK(),
		resource.WithAttributes(serviceAttrs()...),
	)
	if err == nil {
		return detected
	}

	// Detection failed: fall back to a resource with just the service identity.
	return resource.NewWithAttributes(semconv.SchemaURL, serviceAttrs()...)
}
// StartSpan starts a new trace span named name, carrying attrs, as a child of
// any span already in ctx.
//
// When OpenTelemetry is disabled, no span is created: ctx is returned unchanged
// together with a nil span.
//
// Parameters:
//   - ctx: The current execution context.
//   - name: The name of the span to be created.
//   - attrs: Attributes to associate with the span.
//
// Returns:
//   - context.Context: A context holding the new span (or ctx itself when disabled).
//   - trace.Span: The created span, or nil when disabled.
func (o *OpenTelemetry) StartSpan(ctx context.Context, name string, attrs []attribute.KeyValue) (context.Context, trace.Span) {
	if o.Config.OTELEnabled {
		return o.tracer.Start(ctx, name, trace.WithAttributes(attrs...))
	}
	return ctx, nil
}
// RecordError sets the final status of span from err. It does nothing when
// OpenTelemetry is disabled.
//
// A non-nil err is recorded as an error event on the span, and the span status
// becomes Error with err's message. A nil err sets the status to Ok.
//
// Parameters:
//   - span: The active trace span whose status is being set.
//   - err: The error to record, or nil on success.
func (o *OpenTelemetry) RecordError(span trace.Span, err error) {
	if !o.Config.OTELEnabled {
		return
	}
	if err == nil {
		span.SetStatus(codes.Ok, "")
		return
	}
	span.RecordError(err)
	span.SetStatus(codes.Error, err.Error())
}
// EndSpan ends span. It is a no-op when OpenTelemetry is disabled; in that
// case StartSpan returned a nil span, so span must not be touched.
//
// Parameters:
//   - span: The span to be ended.
func (o *OpenTelemetry) EndSpan(span trace.Span) {
	if o.Config.OTELEnabled {
		span.End()
	}
}
// RecordMetrics records one request on the request-count metric and its
// latency on the latency histogram.
//
// The request is labelled "OK" when err is nil and "failure" otherwise. The
// status label is attached to the count only, not to the latency.
//
// Parameters:
//   - ctx: The execution context for OpenTelemetry.
//   - method: The name of the method being recorded.
//   - startTime: The start time of the request, used for latency calculation.
//   - queryType: The type of query being executed (e.g. "select", "insert").
//   - err: The error encountered, if any; it decides the status label.
func (o *OpenTelemetry) RecordMetrics(ctx context.Context, method string, startTime time.Time, queryType string, err error) {
	var status string
	if err == nil {
		status = "OK"
	} else {
		status = "failure"
	}

	common := Attributes{Method: method, QueryType: queryType}
	withStatus := common
	withStatus.Status = status

	o.RecordRequestCountMetric(ctx, withStatus)
	o.RecordLatencyMetric(ctx, startTime, common)
}
// RecordLatencyMetric records the time elapsed since startTime, in whole
// milliseconds, on the round-trip latency histogram. It is a no-op when
// OpenTelemetry is disabled.
//
// Each measurement is tagged with the configured instance and database plus
// attrs.Method and attrs.QueryType. attrs.Status is not recorded.
//
// Parameters:
//   - ctx: The execution context.
//   - startTime: The time when the operation started, used for latency calculation.
//   - attrs: Method and query type to associate with the measurement.
func (o *OpenTelemetry) RecordLatencyMetric(ctx context.Context, startTime time.Time, attrs Attributes) {
	if !o.Config.OTELEnabled {
		return
	}
	elapsedMs := time.Since(startTime).Milliseconds()
	o.requestLatency.Record(ctx, elapsedMs, metric.WithAttributes(
		attributeKeyInstance.String(o.Config.Instance),
		attributeKeyDatabase.String(o.Config.Database),
		attributeKeyMethod.String(attrs.Method),
		attributeKeyQueryType.String(attrs.QueryType),
	))
}
// RecordRequestCountMetric adds one to the request-count metric. It is a no-op
// when OpenTelemetry is disabled.
//
// The increment is tagged with the configured instance and database plus
// attrs.Method, attrs.QueryType and attrs.Status.
//
// Parameters:
//   - ctx: The execution context.
//   - attrs: Attributes associated with the request (method, query type, status).
func (o *OpenTelemetry) RecordRequestCountMetric(ctx context.Context, attrs Attributes) {
	if !o.Config.OTELEnabled {
		return
	}
	o.requestCount.Add(ctx, 1, metric.WithAttributes(
		attributeKeyInstance.String(o.Config.Instance),
		attributeKeyDatabase.String(o.Config.Database),
		attributeKeyMethod.String(attrs.Method),
		attributeKeyQueryType.String(attrs.QueryType),
		attributeKeyStatus.String(attrs.Status),
	))
}
// AddAnnotation adds a named event to the span carried by ctx. If ctx holds no
// span, trace.SpanFromContext yields a no-op span and the event is discarded.
//
// Parameters:
//   - ctx: The execution context containing the span.
//   - event: The event name to add as an annotation.
func AddAnnotation(ctx context.Context, event string) {
	trace.SpanFromContext(ctx).AddEvent(event)
}
// AddAnnotationWithAttr adds a named event, carrying attr, to the span held by
// ctx. If ctx holds no span, trace.SpanFromContext yields a no-op span and the
// event is discarded.
//
// Parameters:
//   - ctx: The execution context containing the span.
//   - event: The event name to add as an annotation.
//   - attr: Attributes to attach to the event.
func AddAnnotationWithAttr(ctx context.Context, event string, attr []attribute.KeyValue) {
	eventAttrs := trace.WithAttributes(attr...)
	trace.SpanFromContext(ctx).AddEvent(event, eventAttrs)
}
// isValidEndpoint reports whether endpoint is syntactically usable as a
// collector address. Two forms are accepted:
//   - a URL containing "://", which must parse and carry both a non-empty host
//     name and an explicit port;
//   - a bare "host:port" with exactly one colon and a non-empty host and port.
//
// This is a syntactic check only. In the bare form the port is not required to
// be numeric. Bare bracketed IPv6 literals such as "[::1]:4317" are rejected
// because they contain more than one colon; the URL form
// "scheme://[::1]:4317" is accepted.
func isValidEndpoint(endpoint string) bool {
	if strings.Contains(endpoint, "://") {
		parsedURL, err := url.Parse(endpoint)
		if err != nil {
			return false
		}
		// Hostname() is the host with any port and brackets removed. Checking it,
		// rather than matching a literal "scheme://:" prefix, rejects an empty
		// host however the scheme is cased (url.Parse lowercases it) and even when
		// userinfo precedes it, e.g. "HTTP://:4317" or "http://user@:4317".
		return parsedURL.Hostname() != "" && parsedURL.Port() != ""
	}

	if strings.Count(endpoint, ":") != 1 {
		return false
	}
	host, port, _ := strings.Cut(endpoint, ":")
	return host != "" && port != ""
}