func newKinesisOutput()

in fluent-bit-kinesis.go [59:164]


// newKinesisOutput reads the configuration of one plugin instance, validates
// it, and builds the Kinesis output plugin from it.
//
// ctx is the opaque plugin context passed to output.FLBPluginConfigKey.
// pluginID tags every log and error message and is forwarded to
// kinesis.NewOutputPlugin.
//
// Each configuration value is logged at info level as it is read. A nil plugin
// and an error are returned when:
//   - stream or region is empty,
//   - partition_key is "log",
//   - experimental_concurrency is not an integer in [0, maximumConcurrency],
//   - experimental_concurrency_retries is not a non-negative integer,
//   - compression is non-empty and, after lower-casing, matches none of the
//     kinesis.Compression* constants.
//
// Otherwise the result of kinesis.NewOutputPlugin is returned as is.
func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin, error) {
	stream := output.FLBPluginConfigKey(ctx, "stream")
	logrus.Infof("[kinesis %d] plugin parameter stream = '%s'", pluginID, stream)
	region := output.FLBPluginConfigKey(ctx, "region")
	logrus.Infof("[kinesis %d] plugin parameter region = '%s'", pluginID, region)
	dataKeys := output.FLBPluginConfigKey(ctx, "data_keys")
	logrus.Infof("[kinesis %d] plugin parameter data_keys = '%s'", pluginID, dataKeys)
	partitionKey := output.FLBPluginConfigKey(ctx, "partition_key")
	logrus.Infof("[kinesis %d] plugin parameter partition_key = '%s'", pluginID, partitionKey)
	roleARN := output.FLBPluginConfigKey(ctx, "role_arn")
	logrus.Infof("[kinesis %d] plugin parameter role_arn = '%s'", pluginID, roleARN)
	kinesisEndpoint := output.FLBPluginConfigKey(ctx, "endpoint")
	logrus.Infof("[kinesis %d] plugin parameter endpoint = '%s'", pluginID, kinesisEndpoint)
	stsEndpoint := output.FLBPluginConfigKey(ctx, "sts_endpoint")
	logrus.Infof("[kinesis %d] plugin parameter sts_endpoint = '%s'", pluginID, stsEndpoint)
	appendNewline := output.FLBPluginConfigKey(ctx, "append_newline")
	logrus.Infof("[kinesis %d] plugin parameter append_newline = %s", pluginID, appendNewline)
	timeKey := output.FLBPluginConfigKey(ctx, "time_key")
	logrus.Infof("[kinesis %d] plugin parameter time_key = '%s'", pluginID, timeKey)
	timeKeyFmt := output.FLBPluginConfigKey(ctx, "time_key_format")
	logrus.Infof("[kinesis %d] plugin parameter time_key_format = '%s'", pluginID, timeKeyFmt)
	concurrency := output.FLBPluginConfigKey(ctx, "experimental_concurrency")
	logrus.Infof("[kinesis %d] plugin parameter experimental_concurrency = '%s'", pluginID, concurrency)
	concurrencyRetries := output.FLBPluginConfigKey(ctx, "experimental_concurrency_retries")
	logrus.Infof("[kinesis %d] plugin parameter experimental_concurrency_retries = '%s'", pluginID, concurrencyRetries)
	logKey := output.FLBPluginConfigKey(ctx, "log_key")
	logrus.Infof("[kinesis %d] plugin parameter log_key = '%s'", pluginID, logKey)
	aggregation := output.FLBPluginConfigKey(ctx, "aggregation")
	logrus.Infof("[kinesis %d] plugin parameter aggregation = '%s'", pluginID, aggregation)
	compression := output.FLBPluginConfigKey(ctx, "compression")
	logrus.Infof("[kinesis %d] plugin parameter compression = '%s'", pluginID, compression)
	replaceDots := output.FLBPluginConfigKey(ctx, "replace_dots")
	logrus.Infof("[kinesis %d] plugin parameter replace_dots = '%s'", pluginID, replaceDots)

	if stream == "" || region == "" {
		return nil, fmt.Errorf("[kinesis %d] stream and region are required configuration parameters", pluginID)
	}

	if partitionKey == "log" {
		return nil, fmt.Errorf("[kinesis %d] 'log' cannot be set as the partition key", pluginID)
	}

	if partitionKey == "" {
		logrus.Infof("[kinesis %d] no partition key provided. A random one will be generated.", pluginID)
	}

	// Boolean options are enabled only by the literal "true" in any letter
	// case; every other value, including an empty one, leaves them disabled.
	appendNL := strings.ToLower(appendNewline) == "true"
	isAggregate := strings.ToLower(aggregation) == "true"

	// The combination is allowed but discouraged, so this is a warning rather
	// than a failure.
	if isAggregate && partitionKey != "" {
		logrus.Warnf("[kinesis %d]  WARNING: The options 'aggregation' and  'partition_key' should not be used simultaneously", pluginID)
	}

	// An unset experimental_concurrency leaves concurrencyInt at zero.
	var concurrencyInt int
	if concurrency != "" {
		var err error
		concurrencyInt, err = strconv.Atoi(concurrency)
		if err != nil {
			// Return a contextual error like every other validation failure
			// in this function; %w keeps the strconv error reachable through
			// errors.Is / errors.As.
			return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value (%s) specified: %w", pluginID, concurrency, err)
		}
		if concurrencyInt < 0 {
			return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value (%s) specified, must be a non-negative number", pluginID, concurrency)
		}

		if concurrencyInt > maximumConcurrency {
			return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value (%s) specified, must be less than or equal to %d", pluginID, concurrency, maximumConcurrency)
		}

		if concurrencyInt > 0 {
			logrus.Warnf("[kinesis %d] WARNING: Enabling concurrency can lead to data loss.  If 'experimental_concurrency_retries' is reached data will be lost.", pluginID)
		}
	}

	// An unset experimental_concurrency_retries falls back to the default.
	var concurrencyRetriesInt int
	if concurrencyRetries != "" {
		var err error
		concurrencyRetriesInt, err = strconv.Atoi(concurrencyRetries)
		if err != nil {
			return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency_retries' value (%s) specified: %v", pluginID, concurrencyRetries, err)
		}
		if concurrencyRetriesInt < 0 {
			return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency_retries' value (%s) specified, must be a non-negative number", pluginID, concurrencyRetries)
		}
	} else {
		concurrencyRetriesInt = defaultConcurrentRetries
	}

	// Compression names are matched case-insensitively; an empty value means
	// no compression.
	var comp kinesis.CompressionType
	compressionLower := strings.ToLower(compression)
	switch {
	case compressionLower == string(kinesis.CompressionZlib):
		comp = kinesis.CompressionZlib
	case compressionLower == string(kinesis.CompressionGzip):
		comp = kinesis.CompressionGzip
	case compressionLower == string(kinesis.CompressionNone) || compression == "":
		comp = kinesis.CompressionNone
	default:
		return nil, fmt.Errorf("[kinesis %d] Invalid 'compression' value (%s) specified, must be 'zlib', 'gzip', 'none', or undefined", pluginID, compression)
	}

	return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, replaceDots, concurrencyInt, concurrencyRetriesInt, isAggregate, appendNL, comp, pluginID)
}