exporter/fileexporter/factory.go (169 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package fileexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter"
import (
"context"
"io"
"os"
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper"
"go.opentelemetry.io/collector/exporter/xexporter"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"gopkg.in/natefinch/lumberjack.v2"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
)
const (
// the number of old log files to retain
defaultMaxBackups = 100
// the format of encoded telemetry data
formatTypeJSON = "json"
formatTypeProto = "proto"
// the type of compression codec
compressionZSTD = "zstd"
defaultMaxOpenFiles = 100
defaultResourceAttribute = "fileexporter.path_segment"
)
type FileExporter interface {
component.Component
consumeTraces(_ context.Context, td ptrace.Traces) error
consumeMetrics(_ context.Context, md pmetric.Metrics) error
consumeLogs(_ context.Context, ld plog.Logs) error
consumeProfiles(_ context.Context, pd pprofile.Profiles) error
}
// NewFactory creates a factory for OTLP exporter.
func NewFactory() exporter.Factory {
return xexporter.NewFactory(
metadata.Type,
createDefaultConfig,
xexporter.WithTraces(createTracesExporter, metadata.TracesStability),
xexporter.WithMetrics(createMetricsExporter, metadata.MetricsStability),
xexporter.WithLogs(createLogsExporter, metadata.LogsStability),
xexporter.WithProfiles(createProfilesExporter, metadata.ProfilesStability))
}
func createDefaultConfig() component.Config {
return &Config{
FormatType: formatTypeJSON,
Rotation: &Rotation{MaxBackups: defaultMaxBackups},
GroupBy: &GroupBy{
ResourceAttribute: defaultResourceAttribute,
MaxOpenFiles: defaultMaxOpenFiles,
},
}
}
func createTracesExporter(
ctx context.Context,
set exporter.Settings,
cfg component.Config,
) (exporter.Traces, error) {
fe := getOrCreateFileExporter(cfg, set.Logger)
return exporterhelper.NewTraces(
ctx,
set,
cfg,
fe.consumeTraces,
exporterhelper.WithStart(fe.Start),
exporterhelper.WithShutdown(fe.Shutdown),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
)
}
func createMetricsExporter(
ctx context.Context,
set exporter.Settings,
cfg component.Config,
) (exporter.Metrics, error) {
fe := getOrCreateFileExporter(cfg, set.Logger)
return exporterhelper.NewMetrics(
ctx,
set,
cfg,
fe.consumeMetrics,
exporterhelper.WithStart(fe.Start),
exporterhelper.WithShutdown(fe.Shutdown),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
)
}
func createLogsExporter(
ctx context.Context,
set exporter.Settings,
cfg component.Config,
) (exporter.Logs, error) {
fe := getOrCreateFileExporter(cfg, set.Logger)
return exporterhelper.NewLogs(
ctx,
set,
cfg,
fe.consumeLogs,
exporterhelper.WithStart(fe.Start),
exporterhelper.WithShutdown(fe.Shutdown),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
)
}
func createProfilesExporter(
ctx context.Context,
set exporter.Settings,
cfg component.Config,
) (xexporter.Profiles, error) {
fe := getOrCreateFileExporter(cfg, set.Logger)
return xexporterhelper.NewProfilesExporter(
ctx,
set,
cfg,
fe.consumeProfiles,
exporterhelper.WithStart(fe.Start),
exporterhelper.WithShutdown(fe.Shutdown),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
)
}
// getOrCreateFileExporter creates a FileExporter and caches it for a particular configuration,
// or returns the already cached one. Caching is required because the factory is asked trace and
// metric receivers separately when it gets CreateTraces() and CreateMetrics()
// but they must not create separate objects, they must use one Exporter object per configuration.
func getOrCreateFileExporter(cfg component.Config, logger *zap.Logger) FileExporter {
conf := cfg.(*Config)
fe := exporters.GetOrAdd(cfg, func() component.Component {
return newFileExporter(conf, logger)
})
c := fe.Unwrap()
return c.(FileExporter)
}
func newFileExporter(conf *Config, logger *zap.Logger) FileExporter {
if conf.GroupBy == nil || !conf.GroupBy.Enabled {
return &fileExporter{
conf: conf,
}
}
return &groupingFileExporter{
conf: conf,
logger: logger,
}
}
func newFileWriter(path string, shouldAppend bool, rotation *Rotation, flushInterval time.Duration, export exportFunc) (*fileWriter, error) {
var wc io.WriteCloser
if rotation == nil {
fileFlags := os.O_RDWR | os.O_CREATE
if shouldAppend {
fileFlags |= os.O_APPEND
} else {
fileFlags |= os.O_TRUNC
}
f, err := os.OpenFile(path, fileFlags, 0o644)
if err != nil {
return nil, err
}
wc = newBufferedWriteCloser(f)
} else {
wc = &lumberjack.Logger{
Filename: path,
MaxSize: rotation.MaxMegabytes,
MaxAge: rotation.MaxDays,
MaxBackups: rotation.MaxBackups,
LocalTime: rotation.LocalTime,
}
}
return &fileWriter{
path: path,
file: wc,
exporter: export,
flushInterval: flushInterval,
}, nil
}
// This is the map of already created File exporters for particular configurations.
// We maintain this map because the Factory is asked trace and metric receivers separately
// when it gets CreateTraces() and CreateMetrics() but they must not
// create separate objects, they must use one Exporter object per configuration.
var exporters = sharedcomponent.NewSharedComponents()