in Amazon.KinesisTap.AWS/AWSEventSinkFactory.cs [104:344]
/// <summary>
/// Creates the event sink that corresponds to <paramref name="sinkType"/>.
/// </summary>
/// <remarks>
/// <paramref name="sinkType"/> is lower-cased with the invariant culture before it is compared with the
/// sink type constants of this factory, so the match does not depend on the current thread culture.
/// For the CloudWatch Logs, CloudWatch, Kinesis Firehose and Kinesis Stream sink types, a "FailoverEnabled"
/// configuration value that parses to <c>true</c> redirects creation to one of the region-failover sink
/// types handled further down in the same switch. The variant is chosen from the "FailoverStrategy"
/// configuration value; any other (or missing) value selects the load-balance variant.
/// </remarks>
/// <param name="sinkType">Name of the sink type to create.</param>
/// <param name="context">Plug-in context that supplies the configuration, services, logger and metrics handed to the sink.</param>
/// <returns>The newly created sink.</returns>
/// <exception cref="ArgumentNullException"><paramref name="sinkType"/> or <paramref name="context"/> is <c>null</c>.</exception>
/// <exception cref="NotImplementedException"><paramref name="sinkType"/> does not match any sink type handled here.</exception>
public IEventSink CreateInstance(string sinkType, IPlugInContext context)
{
    if (sinkType is null)
    {
        throw new ArgumentNullException(nameof(sinkType));
    }

    if (context is null)
    {
        throw new ArgumentNullException(nameof(context));
    }

    var appDataFileProvider = context.Services.GetService<IAppDataFileProvider>();
    IConfiguration config = context.Configuration;

    // True only when "FailoverEnabled" is present and parses to true; a missing or malformed value means "off".
    bool IsFailoverEnabled() => bool.TryParse(config["FailoverEnabled"], out var enabled) && enabled;

    // Maps the configured "FailoverStrategy" to one of the supplied failover sink types.
    // Anything that is not a known strategy (including a missing value) falls back to load-balance.
    string SelectFailoverSinkType(string priority, string weightedLoadBalance, string roundTripTimeBased, string loadBalance)
        => config["FailoverStrategy"] switch
        {
            PriorityFailover => priority,
            WeightedLoadBalanceFailover => weightedLoadBalance,
            RoundTripTimeBasedFailover => roundTripTimeBased,
            _ => loadBalance,
        };

    // Invariant lower-casing: culture-sensitive ToLower() maps 'I' to a dotless 'ı' under tr/az cultures,
    // which would make valid sink type names fall through to the default case.
    switch (sinkType.ToLowerInvariant())
    {
        // Extending old sinks to support the failover functionality.
        case CLOUD_WATCH_LOG:
        case CLOUD_WATCH_LOG_EMF:
        {
            var cwlOptions = new AWSBufferedSinkOptions();
            ParseBufferedSinkOptions(config, cwlOptions);

            // Failover
            if (IsFailoverEnabled())
            {
                return CreateInstance(
                    SelectFailoverSinkType(
                        CloudWatchLogsWithPriorityFailover,
                        CloudWatchLogsWithWeightedLoadBalanceFailover,
                        CloudWatchLogsWithRoundTripTimeBasedFailover,
                        CloudWatchLogsWithLoadBalanceFailover),
                    context);
            }

            // Override some options based on CloudWatchLogs quota.
            cwlOptions.MaxBatchSize = 1000;
            cwlOptions.MaxBatchBytes = 1024 * 1000;
            cwlOptions.QueueSizeItems = 1000;

            // Note: the non-failover path uses AsyncCloudWatchLogsSink, while the failover
            // sink types below are built on CloudWatchLogsSink.
            return new AsyncCloudWatchLogsSink(config[ConfigConstants.ID], context.SessionName,
                config["LogGroup"], config["LogStream"],
                AWSUtilities.CreateAWSClient<AmazonCloudWatchLogsClient>(context),
                appDataFileProvider,
                context.Logger, context.Metrics, context.BookmarkManager, context.NetworkStatus, cwlOptions);
        }
        case CLOUD_WATCH:
            // Failover
            if (IsFailoverEnabled())
            {
                return CreateInstance(
                    SelectFailoverSinkType(
                        CloudWatchWithPriorityFailover,
                        CloudWatchWithWeightedLoadBalanceFailover,
                        CloudWatchWithRoundTripTimeBasedFailover,
                        CloudWatchWithLoadBalanceFailover),
                    context);
            }
            return new CloudWatchSink(60, context, AWSUtilities.CreateAWSClient<AmazonCloudWatchClient>(context));
        case KINESIS_FIREHOSE:
            // Failover
            if (IsFailoverEnabled())
            {
                return CreateInstance(
                    SelectFailoverSinkType(
                        KinesisFirehoseWithPriorityFailover,
                        KinesisFirehoseWithWeightedLoadBalanceFailover,
                        KinesisFirehoseWithRoundTripTimeBasedFailover,
                        KinesisFirehoseWithLoadBalanceFailover),
                    context);
            }
            return new KinesisFirehoseSink(context, AWSUtilities.CreateAWSClient<AmazonKinesisFirehoseClient>(context));
        case KINESIS_STREAM:
            // Failover
            if (IsFailoverEnabled())
            {
                return CreateInstance(
                    SelectFailoverSinkType(
                        KinesisStreamWithPriorityFailover,
                        KinesisStreamWithWeightedLoadBalanceFailover,
                        KinesisStreamWithRoundTripTimeBasedFailover,
                        KinesisStreamWithLoadBalanceFailover),
                    context);
            }
            return new KinesisStreamSink(context, AWSUtilities.CreateAWSClient<AmazonKinesisClient>(context));
        case TELEMETRICS:
        {
            // If RedirectToSinkId is specified, we use TelemetricsSinkConnector. Otherwise, TelemetricsClient.
            var redirectToSinkId = config[ConfigConstants.REDIRECT_TO_SINK_ID];
            ITelemetricsClient telemetricsClient;
            if (string.IsNullOrWhiteSpace(redirectToSinkId))
            {
                var cognitoIdentity = new AmazonCognitoIdentityClient(new AnonymousAWSCredentials(), RegionEndpoint.USWest2);
                telemetricsClient = new TelemetricsClient(cognitoIdentity, context.ParameterStore);
            }
            else
            {
                telemetricsClient = new TelemetricsSinkConnector(context);
                // Make telemetricsClient available to the caller.
                context.ContextData[ConfigConstants.TELEMETRY_CONNECTOR] = telemetricsClient;
            }
            return new TelemetricsSink($"_{TELEMETRICS}", TELEMETRICS_DEFAULT_INTERVAL * 1000, telemetricsClient, context.Logger);
        }
        case FILE_SYSTEM:
            return new FileSystemEventSink(context);
        case S3:
            return new S3Sink(context, AWSUtilities.CreateAWSClient<AmazonS3Client>(context));

        // The failover sink types are exposed directly as well.
        // DIRECT - Priority region failover
        case CloudWatchLogsWithPriorityFailover:
        case CloudWatchLogsEMFWithPriorityFailover:
        {
            var failoverSinkRegionStrategy = new PriorityRegionFailover<AmazonCloudWatchLogsClient>(
                context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                AWSUtilities.CreateAWSClient<AmazonCloudWatchLogsClient>);
            return new CloudWatchLogsSink(
                context, new FailoverSink<AmazonCloudWatchLogsClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
        }
        case CloudWatchWithPriorityFailover:
        {
            var failoverSinkRegionStrategy = new PriorityRegionFailover<AmazonCloudWatchClient>(
                context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                AWSUtilities.CreateAWSClient<AmazonCloudWatchClient>);
            return new CloudWatchSink(
                60, context, new FailoverSink<AmazonCloudWatchClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
        }
        case KinesisFirehoseWithPriorityFailover:
        {
            var failoverSinkRegionStrategy = new PriorityRegionFailover<AmazonKinesisFirehoseClient>(
                context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                AWSUtilities.CreateAWSClient<AmazonKinesisFirehoseClient>);
            return new KinesisFirehoseSink(
                context, new FailoverSink<AmazonKinesisFirehoseClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
        }
        case KinesisStreamWithPriorityFailover:
        {
            var failoverSinkRegionStrategy = new PriorityRegionFailover<AmazonKinesisClient>(
                context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                AWSUtilities.CreateAWSClient<AmazonKinesisClient>);
            return new KinesisStreamSink(
                context, new FailoverSink<AmazonKinesisClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
        }

        // DIRECT - Load-balance region failover
        case CloudWatchLogsWithLoadBalanceFailover:
        case CloudWatchLogsEMFWithLoadBalanceFailover:
        {
            var failoverSinkRegionStrategy = new LoadBalanceRegionFailover<AmazonCloudWatchLogsClient>(
                context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                AWSUtilities.CreateAWSClient<AmazonCloudWatchLogsClient>);
            return new CloudWatchLogsSink(
                context, new FailoverSink<AmazonCloudWatchLogsClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
        }
        case CloudWatchWithLoadBalanceFailover:
        {
            var failoverSinkRegionStrategy = new LoadBalanceRegionFailover<AmazonCloudWatchClient>(
                context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                AWSUtilities.CreateAWSClient<AmazonCloudWatchClient>);
            return new CloudWatchSink(
                60, context, new FailoverSink<AmazonCloudWatchClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
        }
        case KinesisFirehoseWithLoadBalanceFailover:
        {
            var failoverSinkRegionStrategy = new LoadBalanceRegionFailover<AmazonKinesisFirehoseClient>(
                context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                AWSUtilities.CreateAWSClient<AmazonKinesisFirehoseClient>);
            return new KinesisFirehoseSink(
                context, new FailoverSink<AmazonKinesisFirehoseClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
        }
        case KinesisStreamWithLoadBalanceFailover:
        {
            var failoverSinkRegionStrategy = new LoadBalanceRegionFailover<AmazonKinesisClient>(
                context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                AWSUtilities.CreateAWSClient<AmazonKinesisClient>);
            return new KinesisStreamSink(
                context, new FailoverSink<AmazonKinesisClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
        }

        // DIRECT - Weighted load-balance region failover
        case CloudWatchLogsWithWeightedLoadBalanceFailover:
        case CloudWatchLogsEMFWithWeightedLoadBalanceFailover:
        {
            var failoverSinkRegionStrategy = new WeightedLoadBalanceRegionFailover<AmazonCloudWatchLogsClient>(
                context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                AWSUtilities.CreateAWSClient<AmazonCloudWatchLogsClient>);
            return new CloudWatchLogsSink(
                context, new FailoverSink<AmazonCloudWatchLogsClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
        }
        case CloudWatchWithWeightedLoadBalanceFailover:
        {
            var failoverSinkRegionStrategy = new WeightedLoadBalanceRegionFailover<AmazonCloudWatchClient>(
                context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                AWSUtilities.CreateAWSClient<AmazonCloudWatchClient>);
            return new CloudWatchSink(
                60, context, new FailoverSink<AmazonCloudWatchClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
        }
        case KinesisFirehoseWithWeightedLoadBalanceFailover:
        {
            var failoverSinkRegionStrategy = new WeightedLoadBalanceRegionFailover<AmazonKinesisFirehoseClient>(
                context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                AWSUtilities.CreateAWSClient<AmazonKinesisFirehoseClient>);
            return new KinesisFirehoseSink(
                context, new FailoverSink<AmazonKinesisFirehoseClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
        }
        case KinesisStreamWithWeightedLoadBalanceFailover:
        {
            var failoverSinkRegionStrategy = new WeightedLoadBalanceRegionFailover<AmazonKinesisClient>(
                context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                AWSUtilities.CreateAWSClient<AmazonKinesisClient>);
            return new KinesisStreamSink(
                context, new FailoverSink<AmazonKinesisClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
        }

        // DIRECT - Round trip time based region failover
        // These strategies additionally receive the sink's CheckServiceReachable method.
        case CloudWatchLogsWithRoundTripTimeBasedFailover:
        case CloudWatchLogsEMFWithRoundTripTimeBasedFailover:
        {
            var failoverSinkRegionStrategy = new RoundTripTimeBasedRegionFailover<AmazonCloudWatchLogsClient>(
                context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                AWSUtilities.CreateAWSClient<AmazonCloudWatchLogsClient>, CloudWatchLogsSink.CheckServiceReachable);
            return new CloudWatchLogsSink(
                context, new FailoverSink<AmazonCloudWatchLogsClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
        }
        case CloudWatchWithRoundTripTimeBasedFailover:
        {
            var failoverSinkRegionStrategy = new RoundTripTimeBasedRegionFailover<AmazonCloudWatchClient>(
                context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                AWSUtilities.CreateAWSClient<AmazonCloudWatchClient>, CloudWatchSink.CheckServiceReachable);
            return new CloudWatchSink(
                60, context, new FailoverSink<AmazonCloudWatchClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
        }
        case KinesisFirehoseWithRoundTripTimeBasedFailover:
        {
            var failoverSinkRegionStrategy = new RoundTripTimeBasedRegionFailover<AmazonKinesisFirehoseClient>(
                context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                AWSUtilities.CreateAWSClient<AmazonKinesisFirehoseClient>, KinesisFirehoseSink.CheckServiceReachable);
            return new KinesisFirehoseSink(
                context, new FailoverSink<AmazonKinesisFirehoseClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
        }
        case KinesisStreamWithRoundTripTimeBasedFailover:
        {
            var failoverSinkRegionStrategy = new RoundTripTimeBasedRegionFailover<AmazonKinesisClient>(
                context, ConfigConstants.DEFAULT_MIN_WAIT_BEFORE_REGION_FAILBACK_FIRST_RETRY_IN_MINUTES * 60 * 1000,
                AWSUtilities.CreateAWSClient<AmazonKinesisClient>, KinesisStreamSink.CheckServiceReachable);
            return new KinesisStreamSink(
                context, new FailoverSink<AmazonKinesisClient>(context, failoverSinkRegionStrategy), failoverSinkRegionStrategy);
        }
        default:
            throw new NotImplementedException($"Sink type {sinkType} is not implemented by AWSEventSinkFactory.");
    }
}