internal/telemetrygen/logs/exporter.go (98 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
// This file is forked from https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/790e18f1733e71debc7608aed98ace654ac76a60/cmd/telemetrygen/internal/logs/exporter.go,
// which is licensed under Apache-2 and Copyright The OpenTelemetry Authors.
//
// This file does not contain functional modifications.
package logs
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/elastic/apm-perf/internal/telemetrygen/common"
)
type exporter interface {
export(plog.Logs) error
}
func newExporter(cfg *Config) (exporter, error) {
// Exporter with HTTP
if cfg.UseHTTP {
if cfg.Insecure {
return &httpClientExporter{
client: http.DefaultClient,
cfg: cfg,
}, nil
}
creds, err := common.GetTLSCredentialsForHTTPExporter(cfg.CaFile, cfg.ClientAuth)
if err != nil {
return nil, fmt.Errorf("failed to get TLS credentials: %w", err)
}
return &httpClientExporter{
client: &http.Client{Transport: &http.Transport{TLSClientConfig: creds}},
cfg: cfg,
}, nil
}
// Exporter with GRPC
var err error
var clientConn *grpc.ClientConn
if cfg.Insecure {
clientConn, err = grpc.NewClient(cfg.Endpoint(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
} else {
creds, err := common.GetTLSCredentialsForGRPCExporter(cfg.CaFile, cfg.ClientAuth)
if err != nil {
return nil, fmt.Errorf("failed to get TLS credentials: %w", err)
}
clientConn, err = grpc.NewClient(cfg.Endpoint(), grpc.WithTransportCredentials(creds))
if err != nil {
return nil, err
}
}
return &gRPCClientExporter{client: plogotlp.NewGRPCClient(clientConn)}, nil
}
type gRPCClientExporter struct {
client plogotlp.GRPCClient
}
func (e *gRPCClientExporter) export(logs plog.Logs) error {
req := plogotlp.NewExportRequestFromLogs(logs)
if _, err := e.client.Export(context.Background(), req); err != nil {
return err
}
return nil
}
type httpClientExporter struct {
client *http.Client
cfg *Config
}
func (e *httpClientExporter) export(logs plog.Logs) error {
scheme := "https"
if e.cfg.Insecure {
scheme = "http"
}
path := e.cfg.HTTPPath
url := fmt.Sprintf("%s://%s%s", scheme, e.cfg.Endpoint(), path)
req := plogotlp.NewExportRequestFromLogs(logs)
body, err := req.MarshalProto()
if err != nil {
return fmt.Errorf("failed to marshal logs to protobuf: %w", err)
}
httpReq, err := http.NewRequestWithContext(context.Background(), "POST", url, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("failed to create logs HTTP request: %w", err)
}
for k, v := range e.cfg.Headers {
httpReq.Header.Set(k, v)
}
httpReq.Header.Set("Content-Type", "application/x-protobuf")
resp, err := e.client.Do(httpReq)
if err != nil {
return fmt.Errorf("failed to execute logs HTTP request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
var respData bytes.Buffer
_, _ = io.Copy(&respData, resp.Body)
return fmt.Errorf("log request failed with status %s (%s)", resp.Status, respData.String())
}
return nil
}