x-pack/filebeat/input/awss3/input.go (47 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package awss3
import (
"fmt"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/statestore"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/go-concert/unison"
)
// inputName is the identifier used as the Name of the v2.Plugin returned by
// Plugin.
const (
	inputName = "aws-s3"
)
// Plugin builds the v2.Plugin descriptor for the aws-s3 input.
//
// The returned plugin is named inputName, is marked feature.Stable and not
// deprecated, and uses an s3InputManager holding the given store as its
// Manager.
func Plugin(store statestore.States) v2.Plugin {
	manager := &s3InputManager{store: store}

	var plugin v2.Plugin
	plugin.Name = inputName
	plugin.Stability = feature.Stable
	plugin.Deprecated = false
	plugin.Info = "Collect logs from s3"
	plugin.Manager = manager
	return plugin
}
// s3InputManager is the manager that Plugin installs on the aws-s3 plugin.
// Its Create method turns an input configuration into a runnable input.
type s3InputManager struct {
	// store is the state store received from Plugin; Create forwards it to
	// newS3PollerInput when an S3 polling input is built.
	store statestore.States
}
// Init is a no-op for this manager: grp is ignored and nil is always returned.
func (im *s3InputManager) Init(grp unison.Group) error { return nil }
// Create unpacks cfg on top of defaultConfig() and builds the matching input.
//
// Selection order:
//   - a non-empty QueueURL yields the input from newSQSReaderInput;
//   - otherwise a non-empty BucketARN, AccessPointARN or NonAWSBucketName
//     yields the result of newS3PollerInput, which also receives im.store;
//   - otherwise an error is returned.
//
// An error is also returned when cfg cannot be unpacked (passed through
// unchanged) or when the AWS configuration cannot be initialized (wrapped).
func (im *s3InputManager) Create(cfg *conf.C) (v2.Input, error) {
	inputCfg := defaultConfig()
	err := cfg.Unpack(&inputCfg)
	if err != nil {
		return nil, err
	}

	awsCfg, err := awscommon.InitializeAWSConfig(inputCfg.AWSConfig)
	if err != nil {
		return nil, fmt.Errorf("initializing AWS config: %w", err)
	}

	// At this point awsCfg carries the region from the credential profile or
	// the default region. A region set explicitly in the input configuration
	// takes precedence over it.
	if region := inputCfg.RegionName; region != "" {
		awsCfg.Region = region
	}

	switch {
	case inputCfg.QueueURL != "":
		return newSQSReaderInput(inputCfg, awsCfg), nil
	case inputCfg.BucketARN != "",
		inputCfg.AccessPointARN != "",
		inputCfg.NonAWSBucketName != "":
		return newS3PollerInput(inputCfg, awsCfg, im.store)
	default:
		return nil, fmt.Errorf("configuration has no SQS queue URL and no S3 bucket ARN")
	}
}
// boolPtr returns a pointer to a freshly allocated bool holding the value b.
// Each call yields a distinct pointer, so writing through the result does not
// affect the caller's variable or any other pointer obtained from boolPtr.
func boolPtr(b bool) *bool {
	p := new(bool)
	*p = b
	return p
}