x-pack/filebeat/input/awss3/input.go (47 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package awss3 import ( "fmt" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/statestore" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/go-concert/unison" ) const inputName = "aws-s3" func Plugin(store statestore.States) v2.Plugin { return v2.Plugin{ Name: inputName, Stability: feature.Stable, Deprecated: false, Info: "Collect logs from s3", Manager: &s3InputManager{store: store}, } } type s3InputManager struct { store statestore.States } func (im *s3InputManager) Init(grp unison.Group) error { return nil } func (im *s3InputManager) Create(cfg *conf.C) (v2.Input, error) { config := defaultConfig() if err := cfg.Unpack(&config); err != nil { return nil, err } awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig) if err != nil { return nil, fmt.Errorf("initializing AWS config: %w", err) } if config.RegionName != "" { // The awsConfig now contains the region from the credential profile or default region // if the region is explicitly set in the config, then it wins awsConfig.Region = config.RegionName } if config.QueueURL != "" { return newSQSReaderInput(config, awsConfig), nil } if config.BucketARN != "" || config.AccessPointARN != "" || config.NonAWSBucketName != "" { return newS3PollerInput(config, awsConfig, im.store) } return nil, fmt.Errorf("configuration has no SQS queue URL and no S3 bucket ARN") } // boolPtr returns a pointer to b. func boolPtr(b bool) *bool { return &b }