systemtest/cmd/sendotlp/main.go (319 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package main
import (
"context"
"crypto/tls"
"errors"
"flag"
"fmt"
"net"
"net/url"
"os"
"time"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.8.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zapgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
grpcinsecure "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
)
var (
endpoint = flag.String(
"endpoint",
getenvDefault("OTEL_EXPORTER_OTLP_ENDPOINT", "http://127.0.0.1:8200"),
"target URL to which sendotlp will send spans and metrics ($OTEL_EXPORTER_OTLP_ENDPOINT)",
)
secretToken = flag.String(
"secret-token",
"",
"Elastic APM secret token. Recommended over $OTEL_EXPORTER_OTLP_HEADERS as logs exporter does not respect $OTEL_EXPORTER_OTLP_HEADERS. Note: setting this overrides $OTEL_EXPORTER_OTLP_HEADERS",
)
apiKey = flag.String(
"api-key",
"",
"Elastic APM API key. Recommended over $OTEL_EXPORTER_OTLP_HEADERS as logs exporter does not respect $OTEL_EXPORTER_OTLP_HEADERS. Note: setting this overrides $OTEL_EXPORTER_OTLP_HEADERS and -secretToken",
)
logLevel = zap.LevelFlag(
"loglevel", zapcore.InfoLevel,
"set log level to one of: DEBUG, INFO (default), WARN, ERROR, DPANIC, PANIC, FATAL",
)
protocol = flag.String(
"protocol",
getenvDefault("OTEL_EXPORTER_OTLP_PROTOCOL", "grpc"),
"set transport protocol to one of: grpc (default), http/protobuf",
)
insecure = flag.Bool(
"insecure",
false,
"skip the server's TLS certificate verification",
)
)
func getenvDefault(key, defaultVal string) string {
val := os.Getenv(key)
if val != "" {
return val
}
return defaultVal
}
func main() {
flag.Parse()
zapcfg := zap.NewProductionConfig()
zapcfg.EncoderConfig.EncodeTime = zapcore.RFC3339TimeEncoder
zapcfg.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
zapcfg.Encoding = "console"
zapcfg.Level = zap.NewAtomicLevelAt(*logLevel)
logger, err := zapcfg.Build()
if err != nil {
panic(err)
}
defer logger.Sync()
grpclog.SetLogger(zapgrpc.NewLogger(logger, zapgrpc.WithDebug()))
if err := Main(context.Background(), logger.Sugar()); err != nil {
logger.Fatal("error sending data", zap.Error(err))
}
}
func Main(ctx context.Context, logger *zap.SugaredLogger) (result error) {
endpointURL, err := url.Parse(*endpoint)
if err != nil {
return fmt.Errorf("failed to parse endpoint: %w", err)
}
switch endpointURL.Scheme {
case "http":
if endpointURL.Port() == "" {
endpointURL.Host = net.JoinHostPort(endpointURL.Host, "80")
}
case "https":
if endpointURL.Port() == "" {
endpointURL.Host = net.JoinHostPort(endpointURL.Host, "443")
}
default:
return fmt.Errorf("endpoint must be prefixed with http:// or https://")
}
otlpExporters, err := newOTLPExporters(ctx, endpointURL)
if err != nil {
return err
}
tracerProvider := sdktrace.NewTracerProvider(sdktrace.WithSyncer(otlpExporters.trace))
defer func() {
if err := tracerProvider.Shutdown(ctx); err != nil {
result = errors.Join(result,
fmt.Errorf("error shutting down tracer provider: %w", err),
)
}
}()
meterProvider := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(sdkmetric.NewPeriodicReader(
otlpExporters.metric,
sdkmetric.WithInterval(time.Hour),
)),
sdkmetric.WithView(sdkmetric.NewView(
sdkmetric.Instrument{Name: "*"},
sdkmetric.Stream{
Aggregation: sdkmetric.AggregationExplicitBucketHistogram{
Boundaries: []float64{1, 10, 100, 1000, 10000},
},
},
)),
)
defer func() {
if err := meterProvider.Shutdown(ctx); err != nil {
result = errors.Join(result,
fmt.Errorf("error shutting down meter provider: %w", err),
)
}
}()
logger.Infof("sending OTLP data to %s (%s)", endpointURL.String(), *protocol)
// Generate some data. Metrics are sent when the controller is stopped, traces and logs
// are sent immediately.
if err := generateMetrics(ctx, meterProvider.Meter("sendotlp")); err != nil {
return err
}
if err := generateSpans(ctx, tracerProvider.Tracer("sendotlp")); err != nil {
return err
}
if err := generateLogs(ctx, otlpExporters.log); err != nil {
return err
}
return nil
}
func generateSpans(ctx context.Context, tracer trace.Tracer) error {
ctx, parent := tracer.Start(ctx, "parent")
defer parent.End()
_, child1 := tracer.Start(ctx, "child1")
time.Sleep(10 * time.Millisecond)
child1.AddEvent("an arbitrary event")
child1.End()
_, child2 := tracer.Start(ctx, "child2")
time.Sleep(10 * time.Millisecond)
child2.RecordError(errors.New("an exception occurred"))
child2.End()
return nil
}
func generateMetrics(ctx context.Context, meter metric.Meter) error {
counter, err := meter.Float64Counter("float64_counter")
if err != nil {
return err
}
counter.Add(ctx, 1)
hist, err := meter.Int64Histogram("int64_histogram")
if err != nil {
return err
}
hist.Record(ctx, 1)
hist.Record(ctx, 10)
hist.Record(ctx, 10)
hist.Record(ctx, 100)
hist.Record(ctx, 100)
hist.Record(ctx, 100)
return nil
}
func generateLogs(ctx context.Context, logger otlplogExporter) error {
logs := plog.NewLogs()
rl := logs.ResourceLogs().AppendEmpty()
attribs := rl.Resource().Attributes()
attribs.PutStr(
string(semconv.ServiceNameKey),
getenvDefault("OTEL_SERVICE_NAME", "unknown_service"),
)
sl := rl.ScopeLogs().AppendEmpty().LogRecords()
record := sl.AppendEmpty()
record.Body().SetStr("test record")
record.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
record.SetSeverityNumber(plog.SeverityNumberFatal)
record.SetSeverityText("fatal")
return logger.Export(ctx, logs)
}
type otlpExporters struct {
cleanup func(context.Context) error
trace *otlptrace.Exporter
metric sdkmetric.Exporter
log otlplogExporter
}
func newOTLPExporters(ctx context.Context, endpointURL *url.URL) (*otlpExporters, error) {
switch *protocol {
case "grpc":
return newOTLPGRPCExporters(ctx, endpointURL)
case "http/protobuf":
return newOTLPHTTPExporters(ctx, endpointURL)
default:
return nil, fmt.Errorf("invalid protocol %q", *protocol)
}
}
func newOTLPGRPCExporters(ctx context.Context, endpointURL *url.URL) (*otlpExporters, error) {
var transportCredentials credentials.TransportCredentials
switch endpointURL.Scheme {
case "http":
// If http:// is specified, then use insecure (plaintext).
transportCredentials = grpcinsecure.NewCredentials()
case "https":
transportCredentials = credentials.NewTLS(&tls.Config{InsecureSkipVerify: *insecure})
}
grpcConn, err := grpc.DialContext(ctx, endpointURL.Host, grpc.WithTransportCredentials(transportCredentials))
if err != nil {
return nil, err
}
cleanup := func(context.Context) error {
return grpcConn.Close()
}
traceOptions := []otlptracegrpc.Option{otlptracegrpc.WithGRPCConn(grpcConn)}
metricOptions := []otlpmetricgrpc.Option{otlpmetricgrpc.WithGRPCConn(grpcConn)}
var headers map[string]string
if *apiKey != "" {
// If -api-key is specified then we set headers explicitly,
// overriding anything set in $OTEL_EXPORTER_OTLP_HEADERS and -secret-token.
headers = map[string]string{"Authorization": "ApiKey " + *apiKey}
} else if *secretToken != "" {
// If -secret-token is specified then we set headers explicitly,
// overriding anything set in $OTEL_EXPORTER_OTLP_HEADERS.
headers = map[string]string{"Authorization": "Bearer " + *secretToken}
}
if headers != nil {
traceOptions = append(traceOptions, otlptracegrpc.WithHeaders(headers))
metricOptions = append(metricOptions, otlpmetricgrpc.WithHeaders(headers))
}
otlpTraceExporter, err := otlptracegrpc.New(ctx, traceOptions...)
if err != nil {
cleanup(ctx)
return nil, err
}
otlpMetricExporter, err := otlpmetricgrpc.New(ctx, metricOptions...)
if err != nil {
cleanup(ctx)
return nil, err
}
return &otlpExporters{
cleanup: cleanup,
trace: otlpTraceExporter,
metric: otlpMetricExporter,
log: &otlploggrpcExporter{
client: plogotlp.NewGRPCClient(grpcConn),
headers: headers, // logs exporter does not respect OTEL_EXPORTER_OTLP_HEADERS
},
}, nil
}
func newOTLPHTTPExporters(ctx context.Context, endpointURL *url.URL) (*otlpExporters, error) {
tlsConfig := &tls.Config{InsecureSkipVerify: *insecure}
traceOptions := []otlptracehttp.Option{
otlptracehttp.WithEndpoint(endpointURL.Host),
otlptracehttp.WithTLSClientConfig(tlsConfig),
}
metricOptions := []otlpmetrichttp.Option{
otlpmetrichttp.WithEndpoint(endpointURL.Host),
otlpmetrichttp.WithTLSClientConfig(tlsConfig),
}
if endpointURL.Scheme == "http" {
traceOptions = append(traceOptions, otlptracehttp.WithInsecure())
metricOptions = append(metricOptions, otlpmetrichttp.WithInsecure())
}
if *secretToken != "" {
// If -secret-token is specified then we set headers explicitly,
// overriding anything set in $OTEL_EXPORTER_OTLP_HEADERS.
headers := map[string]string{"Authorization": "Bearer " + *secretToken}
traceOptions = append(traceOptions, otlptracehttp.WithHeaders(headers))
metricOptions = append(metricOptions, otlpmetrichttp.WithHeaders(headers))
}
cleanup := func(context.Context) error { return nil }
otlpTraceExporter, err := otlptracehttp.New(ctx, traceOptions...)
if err != nil {
cleanup(ctx)
return nil, err
}
otlpMetricExporter, err := otlpmetrichttp.New(ctx, metricOptions...)
if err != nil {
cleanup(ctx)
return nil, err
}
return &otlpExporters{
cleanup: cleanup,
trace: otlpTraceExporter,
metric: otlpMetricExporter,
log: &otlploghttpExporter{},
}, nil
}
type otlplogExporter interface {
Export(ctx context.Context, logs plog.Logs) error
}
// otlploggrpcExporter is a simple synchronous log exporter using GRPC
type otlploggrpcExporter struct {
client plogotlp.GRPCClient
headers map[string]string
}
func (e *otlploggrpcExporter) Export(ctx context.Context, logs plog.Logs) error {
req := plogotlp.NewExportRequestFromLogs(logs)
md := metadata.New(e.headers)
ctx = metadata.NewOutgoingContext(ctx, md)
_, err := e.client.Export(ctx, req)
if err != nil {
return err
}
// TODO: parse response for error
return nil
}
// otlploghttpExporter is a simple synchronous log exporter using protobuf over HTTP
type otlploghttpExporter struct {
}
func (e *otlploghttpExporter) Export(ctx context.Context, logs plog.Logs) error {
// TODO: implement
return nil
}