public IEventSink CreateInstance()

in Amazon.KinesisTap.AWS/AWSEventSinkFactory.cs [104:344]


        /// <summary>
        /// Creates the event sink that corresponds to <paramref name="sinkType"/>.
        /// </summary>
        /// <remarks>
        /// For the CloudWatch Logs, CloudWatch, Kinesis Firehose and Kinesis Stream sink types, when the
        /// "FailoverEnabled" configuration value parses to <c>true</c>, creation is delegated (recursively)
        /// to the failover variant selected by the "FailoverStrategy" configuration value. Any strategy value
        /// other than the priority, weighted load-balance or round-trip-time constants (including a missing
        /// value) selects the load-balance variant. The failover variants can also be requested directly
        /// through their own sink type names.
        /// </remarks>
        /// <param name="sinkType">
        /// Sink type name. It is lower-cased with the invariant culture before being matched, so matching
        /// does not depend on the current thread culture. Must not be null.
        /// </param>
        /// <param name="context">Plug-in context supplying configuration, services, logger and metrics for the sink.</param>
        /// <returns>The newly created sink.</returns>
        /// <exception cref="NotImplementedException">
        /// <paramref name="sinkType"/> is not one of the sink types handled by this factory.
        /// </exception>
        public IEventSink CreateInstance(string sinkType, IPlugInContext context)
        {
            IConfiguration config = context.Configuration;

            // Invariant lower-casing: culture-sensitive ToLower() can produce characters (e.g. the Turkish
            // dotless 'ı') that never match the ordinal case labels below.
            switch (sinkType.ToLowerInvariant())
            {
                // The original sink type names are extended to support failover through configuration.
                case CLOUD_WATCH_LOG:
                case CLOUD_WATCH_LOG_EMF:
                    var cwlOptions = new AWSBufferedSinkOptions();
                    ParseBufferedSinkOptions(config, cwlOptions);
                    if (IsFailoverEnabled(config))
                    {
                        return config["FailoverStrategy"] switch
                        {
                            PriorityFailover => CreateInstance(CloudWatchLogsWithPriorityFailover, context),
                            WeightedLoadBalanceFailover => CreateInstance(CloudWatchLogsWithWeightedLoadBalanceFailover, context),
                            RoundTripTimeBasedFailover => CreateInstance(CloudWatchLogsWithRoundTripTimeBasedFailover, context),
                            _ => CreateInstance(CloudWatchLogsWithLoadBalanceFailover, context),
                        };
                    }

                    // Override some options based on CloudWatch Logs quota.
                    cwlOptions.MaxBatchSize = 1000;
                    cwlOptions.MaxBatchBytes = 1024 * 1000;
                    cwlOptions.QueueSizeItems = 1000;

                    // Only this sink needs the app-data file provider, so it is resolved here rather than
                    // for every sink type.
                    var appDataFileProvider = context.Services.GetService<IAppDataFileProvider>();

                    return new AsyncCloudWatchLogsSink(config[ConfigConstants.ID], context.SessionName,
                        config["LogGroup"], config["LogStream"],
                        AWSUtilities.CreateAWSClient<AmazonCloudWatchLogsClient>(context),
                        appDataFileProvider,
                        context.Logger, context.Metrics, context.BookmarkManager, context.NetworkStatus, cwlOptions);

                case CLOUD_WATCH:
                    if (IsFailoverEnabled(config))
                    {
                        return config["FailoverStrategy"] switch
                        {
                            PriorityFailover => CreateInstance(CloudWatchWithPriorityFailover, context),
                            WeightedLoadBalanceFailover => CreateInstance(CloudWatchWithWeightedLoadBalanceFailover, context),
                            RoundTripTimeBasedFailover => CreateInstance(CloudWatchWithRoundTripTimeBasedFailover, context),
                            _ => CreateInstance(CloudWatchWithLoadBalanceFailover, context),
                        };
                    }
                    return new CloudWatchSink(60, context, AWSUtilities.CreateAWSClient<AmazonCloudWatchClient>(context));

                case KINESIS_FIREHOSE:
                    if (IsFailoverEnabled(config))
                    {
                        return config["FailoverStrategy"] switch
                        {
                            PriorityFailover => CreateInstance(KinesisFirehoseWithPriorityFailover, context),
                            WeightedLoadBalanceFailover => CreateInstance(KinesisFirehoseWithWeightedLoadBalanceFailover, context),
                            RoundTripTimeBasedFailover => CreateInstance(KinesisFirehoseWithRoundTripTimeBasedFailover, context),
                            _ => CreateInstance(KinesisFirehoseWithLoadBalanceFailover, context),
                        };
                    }
                    return new KinesisFirehoseSink(context, AWSUtilities.CreateAWSClient<AmazonKinesisFirehoseClient>(context));

                case KINESIS_STREAM:
                    if (IsFailoverEnabled(config))
                    {
                        return config["FailoverStrategy"] switch
                        {
                            PriorityFailover => CreateInstance(KinesisStreamWithPriorityFailover, context),
                            WeightedLoadBalanceFailover => CreateInstance(KinesisStreamWithWeightedLoadBalanceFailover, context),
                            RoundTripTimeBasedFailover => CreateInstance(KinesisStreamWithRoundTripTimeBasedFailover, context),
                            _ => CreateInstance(KinesisStreamWithLoadBalanceFailover, context),
                        };
                    }
                    return new KinesisStreamSink(context, AWSUtilities.CreateAWSClient<AmazonKinesisClient>(context));

                case TELEMETRICS:
                    // If RedirectToSinkId is specified, we use TelemetricsSinkConnector. Otherwise, TelemetricsClient.
                    var redirectToSinkId = config[ConfigConstants.REDIRECT_TO_SINK_ID];
                    ITelemetricsClient telemetricsClient = null;
                    if (string.IsNullOrWhiteSpace(redirectToSinkId))
                    {
                        var cognitoIdentity = new AmazonCognitoIdentityClient(new AnonymousAWSCredentials(), RegionEndpoint.USWest2);
                        telemetricsClient = new TelemetricsClient(cognitoIdentity, context.ParameterStore);
                    }
                    else
                    {
                        telemetricsClient = new TelemetricsSinkConnector(context);
                        // Make telemetricsClient available to the caller through the context data.
                        context.ContextData[ConfigConstants.TELEMETRY_CONNECTOR] = telemetricsClient;
                    }
                    return new TelemetricsSink($"_{TELEMETRICS}", TELEMETRICS_DEFAULT_INTERVAL * 1000, telemetricsClient, context.Logger);

                case FILE_SYSTEM:
                    return new FileSystemEventSink(context);

                case S3:
                    return new S3Sink(context, AWSUtilities.CreateAWSClient<AmazonS3Client>(context));

                // The failover sink types below are also exposed directly.
                // DIRECT - Priority region failover
                case CloudWatchLogsWithPriorityFailover:
                case CloudWatchLogsEMFWithPriorityFailover:
                    {
                        var failoverSinkRegionStrategy = new PriorityRegionFailover<AmazonCloudWatchLogsClient>(
                            context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                            AWSUtilities.CreateAWSClient<AmazonCloudWatchLogsClient>);
                        return new CloudWatchLogsSink(
                            context, new FailoverSink<AmazonCloudWatchLogsClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
                    }
                case CloudWatchWithPriorityFailover:
                    {
                        var failoverSinkRegionStrategy = new PriorityRegionFailover<AmazonCloudWatchClient>(
                            context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                            AWSUtilities.CreateAWSClient<AmazonCloudWatchClient>);
                        return new CloudWatchSink
                            (60, context, new FailoverSink<AmazonCloudWatchClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
                    }
                case KinesisFirehoseWithPriorityFailover:
                    {
                        var failoverSinkRegionStrategy = new PriorityRegionFailover<AmazonKinesisFirehoseClient>(
                            context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                            AWSUtilities.CreateAWSClient<AmazonKinesisFirehoseClient>);
                        return new KinesisFirehoseSink(
                            context, new FailoverSink<AmazonKinesisFirehoseClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
                    }
                case KinesisStreamWithPriorityFailover:
                    {
                        var failoverSinkRegionStrategy = new PriorityRegionFailover<AmazonKinesisClient>(
                            context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                            AWSUtilities.CreateAWSClient<AmazonKinesisClient>);
                        return new KinesisStreamSink(
                            context, new FailoverSink<AmazonKinesisClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
                    }

                // DIRECT - Load-balance region failover
                case CloudWatchLogsWithLoadBalanceFailover:
                case CloudWatchLogsEMFWithLoadBalanceFailover:
                    {
                        var failoverSinkRegionStrategy = new LoadBalanceRegionFailover<AmazonCloudWatchLogsClient>(
                            context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                            AWSUtilities.CreateAWSClient<AmazonCloudWatchLogsClient>);
                        return new CloudWatchLogsSink(
                            context, new FailoverSink<AmazonCloudWatchLogsClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
                    }
                case CloudWatchWithLoadBalanceFailover:
                    {
                        var failoverSinkRegionStrategy = new LoadBalanceRegionFailover<AmazonCloudWatchClient>(
                            context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                            AWSUtilities.CreateAWSClient<AmazonCloudWatchClient>);
                        return new CloudWatchSink
                            (60, context, new FailoverSink<AmazonCloudWatchClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
                    }
                case KinesisFirehoseWithLoadBalanceFailover:
                    {
                        var failoverSinkRegionStrategy = new LoadBalanceRegionFailover<AmazonKinesisFirehoseClient>(
                            context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                            AWSUtilities.CreateAWSClient<AmazonKinesisFirehoseClient>);
                        return new KinesisFirehoseSink(
                            context, new FailoverSink<AmazonKinesisFirehoseClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
                    }
                case KinesisStreamWithLoadBalanceFailover:
                    {
                        var failoverSinkRegionStrategy = new LoadBalanceRegionFailover<AmazonKinesisClient>(
                            context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                            AWSUtilities.CreateAWSClient<AmazonKinesisClient>);
                        return new KinesisStreamSink(
                            context, new FailoverSink<AmazonKinesisClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
                    }

                // DIRECT - Weighted load-balance region failover
                case CloudWatchLogsWithWeightedLoadBalanceFailover:
                case CloudWatchLogsEMFWithWeightedLoadBalanceFailover:
                    {
                        var failoverSinkRegionStrategy = new WeightedLoadBalanceRegionFailover<AmazonCloudWatchLogsClient>(
                            context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                            AWSUtilities.CreateAWSClient<AmazonCloudWatchLogsClient>);
                        return new CloudWatchLogsSink(
                            context, new FailoverSink<AmazonCloudWatchLogsClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
                    }
                case CloudWatchWithWeightedLoadBalanceFailover:
                    {
                        var failoverSinkRegionStrategy = new WeightedLoadBalanceRegionFailover<AmazonCloudWatchClient>(
                            context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                            AWSUtilities.CreateAWSClient<AmazonCloudWatchClient>);
                        return new CloudWatchSink
                            (60, context, new FailoverSink<AmazonCloudWatchClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
                    }
                case KinesisFirehoseWithWeightedLoadBalanceFailover:
                    {
                        var failoverSinkRegionStrategy = new WeightedLoadBalanceRegionFailover<AmazonKinesisFirehoseClient>(
                            context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                            AWSUtilities.CreateAWSClient<AmazonKinesisFirehoseClient>);
                        return new KinesisFirehoseSink(
                            context, new FailoverSink<AmazonKinesisFirehoseClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
                    }
                case KinesisStreamWithWeightedLoadBalanceFailover:
                    {
                        var failoverSinkRegionStrategy = new WeightedLoadBalanceRegionFailover<AmazonKinesisClient>(
                            context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                            AWSUtilities.CreateAWSClient<AmazonKinesisClient>);
                        return new KinesisStreamSink(
                            context, new FailoverSink<AmazonKinesisClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
                    }

                // DIRECT - Round trip time based region failover
                // (these strategies additionally receive the sink's CheckServiceReachable method)
                case CloudWatchLogsWithRoundTripTimeBasedFailover:
                case CloudWatchLogsEMFWithRoundTripTimeBasedFailover:
                    {
                        var failoverSinkRegionStrategy = new RoundTripTimeBasedRegionFailover<AmazonCloudWatchLogsClient>(
                            context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                            AWSUtilities.CreateAWSClient<AmazonCloudWatchLogsClient>, CloudWatchLogsSink.CheckServiceReachable);
                        return new CloudWatchLogsSink(
                            context, new FailoverSink<AmazonCloudWatchLogsClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
                    }
                case CloudWatchWithRoundTripTimeBasedFailover:
                    {
                        var failoverSinkRegionStrategy = new RoundTripTimeBasedRegionFailover<AmazonCloudWatchClient>(
                            context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                            AWSUtilities.CreateAWSClient<AmazonCloudWatchClient>, CloudWatchSink.CheckServiceReachable);
                        return new CloudWatchSink
                            (60, context, new FailoverSink<AmazonCloudWatchClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
                    }
                case KinesisFirehoseWithRoundTripTimeBasedFailover:
                    {
                        var failoverSinkRegionStrategy = new RoundTripTimeBasedRegionFailover<AmazonKinesisFirehoseClient>(
                            context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                            AWSUtilities.CreateAWSClient<AmazonKinesisFirehoseClient>, KinesisFirehoseSink.CheckServiceReachable);
                        return new KinesisFirehoseSink(
                            context, new FailoverSink<AmazonKinesisFirehoseClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
                    }
                case KinesisStreamWithRoundTripTimeBasedFailover:
                    {
                        var failoverSinkRegionStrategy = new RoundTripTimeBasedRegionFailover<AmazonKinesisClient>(
                            context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                            AWSUtilities.CreateAWSClient<AmazonKinesisClient>, KinesisStreamSink.CheckServiceReachable);
                        return new KinesisStreamSink(
                            context, new FailoverSink<AmazonKinesisClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
                    }

                default:
                    throw new NotImplementedException($"Sink type {sinkType} is not implemented by AWSEventSinkFactory.");
            }
        }

        /// <summary>
        /// Returns <c>true</c> only when the "FailoverEnabled" configuration value parses as a boolean and is <c>true</c>.
        /// A missing or unparsable value is treated as disabled.
        /// </summary>
        private static bool IsFailoverEnabled(IConfiguration config)
            => bool.TryParse(config["FailoverEnabled"], out var enabled) && enabled;