init/fluent_bit_init_process.go (282 lines of code) (raw):
package main
import (
"encoding/json"
"io"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"reflect"
"regexp"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/arn"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/sirupsen/logrus"
)
// Static paths used by the init process.
const (
// s3FileDirectoryPath is the directory that config files downloaded from S3
// are written to. File names are appended by plain string concatenation, so
// the trailing slash is required.
s3FileDirectoryPath = "/init/fluent-bit-init-s3-files/"
// mainConfigFile is the main config file generated by this process; it is
// filled with @INCLUDE lines and handed to Fluent Bit with "-c".
mainConfigFile = "/init/fluent-bit-init.conf"
// originalMainConfigFile is the pre-existing Fluent Bit main config, pulled
// into mainConfigFile through an @INCLUDE line.
originalMainConfigFile = "/fluent-bit/etc/fluent-bit.conf"
// invokeFile is the shell script generated by this process: it receives the
// metadata "export" lines followed by the final Fluent Bit command.
invokeFile = "/init/invoke_fluent_bit.sh"
)
var (
	// baseCommand is the Fluent Bit command line written to the invoke script.
	// createCommand and updateCommand append flags to it before
	// modifyInvokeFile writes it out.
	baseCommand = "exec /fluent-bit/bin/fluent-bit -e /fluent-bit/firehose.so -e /fluent-bit/cloudwatch.so -e /fluent-bit/kinesis.so"

	// s3Client is the shared S3 client, created lazily by createS3Client.
	s3Client *s3.S3
	// s3ClientCreated reports whether s3Client has been initialised.
	s3ClientCreated bool

	// metadataRegion is the region taken from the ECS task metadata; it stays
	// empty when no metadata was retrieved.
	metadataRegion string
)
// HTTPClient is the minimal HTTP client behaviour getECSTaskMetadata needs:
// a single GET request. main passes an *http.Client.
// NOTE(review): presumably kept as an interface so a fake client can be
// substituted in tests — confirm against the test file.
type HTTPClient interface {
// Get issues a GET request to url and returns the response.
Get(url string) (*http.Response, error)
}
// S3Downloader is the download behaviour downloadS3ConfigFile needs.
// createS3Downloader returns an s3manager downloader as this interface.
type S3Downloader interface {
// Download writes the object described by input into w and returns the
// int64 and error produced by the underlying downloader.
Download(w io.WriterAt, input *s3.GetObjectInput, options ...func(*s3manager.Downloader)) (int64, error)
}
// ECSTaskMetadata holds the ECS task metadata decoded from the task metadata
// endpoint response, plus values derived from it in getECSTaskMetadata.
// All values in the structure are empty strings by default.
//
// The Go field names are written like environment variable names on purpose:
// setECSTaskMetadata reflects over this struct and writes every non-empty
// field to the invoke script as "export <FieldName>=<value>". Renaming a field
// therefore renames the exported variable.
type ECSTaskMetadata struct {
AWS_REGION string `json:"AWSRegion"` // overwritten with the region parsed from the task ARN
AWS_AVAILABILITY_ZONE string `json:"AvailabilityZone"`
ECS_CLUSTER string `json:"Cluster"` // Cluster name; reduced to the name when the endpoint returns an ARN
ECS_TASK_ARN string `json:"TaskARN"`
ECS_TASK_ID string `json:"TaskID"` // overwritten with the last "/" segment of the task ARN resource
ECS_FAMILY string `json:"Family"`
ECS_LAUNCH_TYPE string `json:"LaunchType"` // Task launch type will be an empty string if container agent is under version 1.45.0
ECS_REVISION string `json:"Revision"` // Revision number
ECS_TASK_DEFINITION string `json:"TaskDefinition"` // TaskDefinition = "family:revision", assembled in getECSTaskMetadata
}
// getECSTaskMetadata queries the ECS task metadata endpoint V4 and returns the
// decoded task metadata.
//
// When ECS_CONTAINER_METADATA_URI_V4 is unset or empty, a warning is logged
// and a zero-valued ECSTaskMetadata is returned. Any failure while fetching,
// reading, decoding or parsing the metadata is fatal.
//
// On top of the decoded fields, the result carries values derived here: the
// task ID (last "/"-separated segment of the task ARN resource), the region
// (taken from the task ARN) and the task definition ("family:revision"). A
// cluster value containing "/" is treated as an ARN and reduced to the cluster
// name. As a side effect the package-level metadataRegion is set to the task
// region so the S3 client can use it.
func getECSTaskMetadata(httpClient HTTPClient) ECSTaskMetadata {
	var metadata ECSTaskMetadata

	endpoint := os.Getenv("ECS_CONTAINER_METADATA_URI_V4")
	if endpoint == "" {
		logrus.Warnln("[FluentBit Init Process] Unable to get ECS Metadata, ignore this warning if not running on ECS")
		return metadata
	}

	res, err := httpClient.Get(endpoint + "/task")
	if err != nil {
		logrus.Fatalf("[FluentBit Init Process] Failed to get ECS Metadata via HTTP Get: %s\n", err)
	}
	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		logrus.Fatalf("[FluentBit Init Process] Failed to read ECS Metadata from HTTP response: %s\n", err)
	}
	res.Body.Close()

	if err = json.Unmarshal(body, &metadata); err != nil {
		logrus.Fatalf("[FluentBit Init Process] Failed to unmarshal ECS metadata: %s\n", err)
	}

	taskARN, err := arn.Parse(metadata.ECS_TASK_ARN)
	if err != nil {
		logrus.Fatalf("[FluentBit Init Process] Failed to parse ECS TaskARN: %s %s\n", metadata.ECS_TASK_ARN, err)
	}

	// LastIndex yields -1 when there is no "/", so the slice then starts at 0
	// and keeps the whole resource.
	taskResource := taskARN.Resource
	metadata.ECS_TASK_ID = taskResource[strings.LastIndex(taskResource, "/")+1:]
	metadata.AWS_REGION = taskARN.Region
	metadata.ECS_TASK_DEFINITION = metadata.ECS_FAMILY + ":" + metadata.ECS_REVISION

	// Per the ECS task metadata docs, Cluster can be an ARN or the name.
	if strings.Contains(metadata.ECS_CLUSTER, "/") {
		clusterARN, err := arn.Parse(metadata.ECS_CLUSTER)
		if err != nil {
			logrus.Fatalf("[FluentBit Init Process] Failed to parse ECS Cluster ARN: %s %s\n", metadata.ECS_CLUSTER, err)
		}
		clusterResource := clusterARN.Resource
		metadata.ECS_CLUSTER = clusterResource[strings.LastIndex(clusterResource, "/")+1:]
	}

	// Remember the region globally for the S3 client.
	metadataRegion = metadata.AWS_REGION
	return metadata
}
// setECSTaskMetadata appends the ECS task metadata to the invoke script at
// filePath as shell "export" lines, so Fluent Bit starts with them in its
// environment.
//
// It first writes "export FLB_AWS_USER_AGENT=ecs-init"; a failure there only
// produces a warning. It then writes one "export <FieldName>=<value>" line per
// non-empty field of metadata, using the struct's Go field names as variable
// names; a failure there is fatal. The file must already exist because it is
// opened through openFile.
func setECSTaskMetadata(metadata ECSTaskMetadata, filePath string) {
	script := openFile(filePath)
	defer script.Close()

	// Set FLB_AWS_USER_AGENT to "ecs-init" so usage of the init image can be
	// identified.
	initUsage := "export FLB_AWS_USER_AGENT=ecs-init\n"
	if _, err := script.WriteString(initUsage); err != nil {
		logrus.Errorln(err)
		// Trim only the trailing newline; slicing off two bytes would also
		// drop the last character of the statement from the log.
		logrus.Warnf("[FluentBit Init Process] Cannot write %s in the invoke_fluent_bit.sh\n", strings.TrimSuffix(initUsage, "\n"))
	}

	t := reflect.TypeOf(metadata)
	v := reflect.ValueOf(metadata)
	for i := 0; i < t.NumField(); i++ {
		// Every field of ECSTaskMetadata is a string; empty ones are skipped so
		// no empty variables are exported.
		value := v.Field(i).Interface().(string)
		if value == "" {
			continue
		}
		writeContent := "export " + t.Field(i).Name + "=" + value + "\n"
		if _, err := script.WriteString(writeContent); err != nil {
			logrus.Errorln(err)
			logrus.Fatalf("[FluentBit Init Process] Cannot write %s in the invoke_fluent_bit.sh\n", strings.TrimSuffix(writeContent, "\n"))
		}
	}
}
// createCommand appends the "-c <filePath>" flag to *command so Fluent Bit
// loads filePath as its main config file. The string is modified in place.
func createCommand(command *string, filePath string) {
	*command += " -c " + filePath
}
// getAllConfigFiles scans the process environment for variables that name
// extra Fluent Bit config files and processes each of them.
//
// A variable whose name contains "aws_fluent_bit_init_s3" (or "..._S3") is
// expected to hold an S3 object ARN: the object is downloaded into
// s3FileDirectoryPath and the downloaded copy is processed. A variable whose
// name contains "aws_fluent_bit_init_file" (or "..._File") is expected to hold
// the path of a config file already present in the image, which is processed
// directly. The match is not anchored, so the marker may appear anywhere in
// the variable name. Variables are handled in the order os.Environ returns
// them. An entry without "=" is fatal.
func getAllConfigFiles() {
	// The patterns never change, so compile them once instead of once per
	// environment variable.
	s3Regex := regexp.MustCompile("aws_fluent_bit_init_[sS]3")
	fileRegex := regexp.MustCompile("aws_fluent_bit_init_[fF]ile")

	for _, env := range os.Environ() {
		// Split on the first "=" only: values may themselves contain "=".
		envKV := strings.SplitN(env, "=", 2)
		if len(envKV) != 2 {
			logrus.Fatalf("[FluentBit Init Process] Unrecognizable environment variables: %s\n", env)
		}
		envKey, envValue := envKV[0], envKV[1]

		// The value is an S3 ARN: download the config file first, then
		// process the local copy, which is stored under the object's base name.
		if s3Regex.MatchString(envKey) {
			s3FilePath := getS3ConfigFile(envValue)
			s3FileName := strings.Split(s3FilePath, "/")
			processConfigFile(s3FileDirectoryPath + s3FileName[len(s3FileName)-1])
		}

		// The value is the path of a built-in config file: process it directly.
		if fileRegex.MatchString(envKey) {
			processConfigFile(envValue)
		}
	}
}
// processConfigFile wires the config file at path into the Fluent Bit start-up.
//
// A file containing a "[PARSER]" or "[MULTILINE_PARSER]" section is treated as
// a parser file and added to the command line via updateCommand. Any other
// file is added to the generated main config with an @INCLUDE line. Failing to
// read the file is fatal.
func processConfigFile(path string) {
	raw, err := ioutil.ReadFile(path)
	if err != nil {
		logrus.Errorln(err)
		logrus.Fatalf("[FluentBit Init Process] Cannot open file: %s\n", path)
	}

	text := string(raw)
	isParserFile := strings.Contains(text, "[PARSER]") || strings.Contains(text, "[MULTILINE_PARSER]")
	if !isParserFile {
		// Not a parser config file: pull it in with @INCLUDE.
		writeInclude(path, mainConfigFile)
		return
	}

	// Parser config file: it has to be passed on the command line instead.
	updateCommand(path)
}
// getS3ConfigFile downloads the S3 object named by the ARN in userInput into
// s3FileDirectoryPath and returns the object's key (the path inside the
// bucket, not the local path).
//
// userInput is expected to look like "arn:aws:s3:::user-bucket/s3_parser.conf".
// The bucket's region is looked up with the shared S3 client (created on first
// use) so the download can run against the right region. An unparsable ARN, an
// ARN without an object key, or a failed region lookup is fatal.
func getS3ConfigFile(userInput string) string {
	// Lazily create the shared S3 client used for the region lookup.
	if !s3ClientCreated {
		createS3Client()
	}

	// e.g. "arn:aws:s3:::user-bucket/s3_parser.conf"
	s3ARN, err := arn.Parse(userInput)
	if err != nil {
		logrus.Errorln(err)
		logrus.Fatalf("[FluentBit Init Process] Could not parse arn: %s\n", userInput)
	}

	// The resource is "<bucket>/<key>"; the key may itself contain "/".
	bucketAndFile := strings.SplitN(s3ARN.Resource, "/", 2)
	if len(bucketAndFile) != 2 {
		logrus.Fatalf("[FluentBit Init Process] Could not parse arn: %s\n", userInput)
	}
	bucketName := bucketAndFile[0]
	s3FilePath := bucketAndFile[1]

	// Get the bucket region.
	output, err := s3Client.GetBucketLocation(&s3.GetBucketLocationInput{
		Bucket: aws.String(bucketName),
	})
	if err != nil {
		logrus.Errorln(err)
		logrus.Fatalf("[FluentBit Init Process] Cannot get bucket region of %s + %s, you must be the bucket owner to implement this operation\n", bucketName, s3FilePath)
	}

	// GetBucketLocation does not always return a region ID: buckets in
	// us-east-1 report an empty LocationConstraint and legacy buckets may report
	// "EU". NormalizeBucketLocation maps both to a real region ID.
	// https://docs.aws.amazon.com/sdk-for-go/api/service/s3/#NormalizeBucketLocation
	bucketRegion := s3.NormalizeBucketLocation(aws.StringValue(output.LocationConstraint))

	// Download the file from S3 into "/init/fluent-bit-init-s3-files/" using a
	// downloader bound to the bucket's region.
	s3Downloader := createS3Downloader(bucketRegion)
	downloadS3ConfigFile(s3Downloader, s3FilePath, bucketName, s3FileDirectoryPath)

	return s3FilePath
}
// create a S3 client as the global S3 client for reuse
func createS3Client() {
region := "us-east-1"
if metadataRegion != "" {
region = metadataRegion
}
s3Client = s3.New(session.Must(session.NewSession(&aws.Config{
// if not specify region here, missingregion error will raise when get bucket location
Region: aws.String(region),
})))
s3ClientCreated = true
}
// createS3Downloader returns an S3 downloader whose session is bound to
// bucketRegion. Failing to create the session is fatal.
func createS3Downloader(bucketRegion string) S3Downloader {
	// The session region has to be specified explicitly.
	cfg := &aws.Config{Region: aws.String(bucketRegion)}

	sess, err := session.NewSession(cfg)
	if err != nil {
		logrus.Errorln(err)
		logrus.Fatalln("[FluentBit Init Process] Cannot creat a new session")
	}

	return s3manager.NewDownloader(sess)
}
// downloadS3ConfigFile downloads the object with key s3FilePath from bucket
// bucketName into s3FileDirectory, naming the local file after the last
// "/"-separated segment of the key. s3FileDirectory is used as a plain string
// prefix, so it must end with a path separator.
//
// The download is attempted twice; if the second attempt also fails the
// process exits.
func downloadS3ConfigFile(s3Downloader S3Downloader, s3FilePath, bucketName, s3FileDirectory string) {
	segments := strings.Split(s3FilePath, "/")
	fileName := segments[len(segments)-1]

	fileFromS3 := createFile(s3FileDirectory+fileName, false)
	defer fileFromS3.Close()

	input := &s3.GetObjectInput{
		Bucket: aws.String(bucketName),
		Key:    aws.String(s3FilePath),
	}

	if _, err := s3Downloader.Download(fileFromS3, input); err != nil {
		// Log the object key itself; formatting the split segments with %s
		// would print a bracketed slice such as "[dir file.conf]".
		logrus.Warnf("[FluentBit Init Process] Cannot download %s from s3, retrying...\n", s3FilePath)
		if _, err := s3Downloader.Download(fileFromS3, input); err != nil {
			logrus.Errorln(err)
			logrus.Fatalf("[FluentBit Init Process] Cannot download %s from s3\n", s3FilePath)
		}
	}
}
// writeInclude appends an "@INCLUDE <configFilePath>" line to the main config
// file at mainConfigFilePath. The main config file must already exist because
// it is opened through openFile. A write failure is fatal.
func writeInclude(configFilePath, mainConfigFilePath string) {
	file := openFile(mainConfigFilePath)
	defer file.Close()

	writeContent := "@INCLUDE " + configFilePath + "\n"
	if _, err := file.WriteString(writeContent); err != nil {
		logrus.Errorln(err)
		// Trim only the trailing newline; slicing off two bytes would also
		// drop the last character of the included path from the log.
		logrus.Fatalf("[FluentBit Init Process] Cannot write %s in main config file: %s\n", strings.TrimSuffix(writeContent, "\n"), mainConfigFilePath)
	}
}
// updateCommand appends "-R <parserFilePath>" to the package-level Fluent Bit
// command so the parser config file is loaded, and logs the resulting command.
func updateCommand(parserFilePath string) {
	baseCommand += " -R " + parserFilePath
	logrus.Infoln("[FluentBit Init Process] Command is change to -> " + baseCommand)
}
// modifyInvokeFile appends the final Fluent Bit command (baseCommand) to the
// invoke script at filePath. The script already holds the ECS task metadata
// exports, so this line is what actually starts Fluent Bit. A write failure is
// fatal.
func modifyInvokeFile(filePath string) {
	script := openFile(filePath)
	defer script.Close()

	if _, err := script.WriteString(baseCommand); err != nil {
		logrus.Errorln(err)
		logrus.Fatalf("[FluentBit Init Process] Cannot write %s in invoke_fluent_bit.sh\n", baseCommand)
	}
}
// createFile creates (or truncates) the file at filePath, creating its parent
// directories with mode 0700 first, and returns the file handle.
//
// When AutoClose is true the handle is closed before it is returned, so the
// caller gets back an already-closed file and must not write to it. Any
// failure to create the directory or the file is fatal.
func createFile(filePath string, AutoClose bool) *os.File {
	dir := filepath.Dir(filePath)
	if err := os.MkdirAll(dir, 0700); err != nil {
		logrus.Errorln(err)
		logrus.Fatalf("[FluentBit Init Process] Cannot create the Directory: %s\n", dir)
	}

	file, err := os.Create(filePath)
	if err != nil {
		logrus.Errorln(err)
		logrus.Fatalf("[FluentBit Init Process] Cannot create the file: %s\n", filePath)
	}

	if AutoClose {
		// The caller only needed the file to exist.
		file.Close()
	}
	return file
}
// openFile opens the existing file at filePath for appending (write-only) and
// returns the handle; the caller is responsible for closing it. The file is
// not created if it is missing. Failing to open it is fatal.
func openFile(filePath string) *os.File {
	const appendOnly = os.O_APPEND | os.O_WRONLY

	file, err := os.OpenFile(filePath, appendOnly, 0700)
	if err != nil {
		logrus.Errorln(err)
		logrus.Fatalf("[FluentBit Init Process] Unable to read %s\n", filePath)
	}
	return file
}
// main generates the files Fluent Bit is started from: the invoke script
// (metadata exports followed by the Fluent Bit command) and the main config
// file (@INCLUDE lines for the original config and any extra config files).
func main() {
	// Start from an empty invoke_fluent_bit.sh. It will declare the ECS task
	// metadata as environment variables and finally invoke Fluent Bit.
	createFile(invokeFile, true)

	// Fetch the ECS task metadata (this also records the region for the S3
	// client) and export it as env vars in invoke_fluent_bit.sh.
	setECSTaskMetadata(getECSTaskMetadata(&http.Client{}), invokeFile)

	// Create the main config file Fluent Bit will be invoked with and make it
	// include the original main config file.
	createFile(mainConfigFile, true)
	writeInclude(originalMainConfigFile, mainConfigFile)

	// Point the Fluent Bit command at the new main config file with "-c".
	createCommand(&baseCommand, mainConfigFile)

	// Process the built-in config files directly, and download config files
	// from S3 into "/init/fluent-bit-init-s3-files/" before processing them.
	getAllConfigFiles()

	// Write the Fluent Bit command into invoke_fluent_bit.sh. This must stay
	// the last step: any error above exits the process, so the command is never
	// written and Fluent Bit is not invoked.
	modifyInvokeFile(invokeFile)
}