in src/main/java/software/amazon/event/kafkaconnector/EventBridgeWriter.java [70:152]
/**
 * Creates a writer from the given sink configuration.
 *
 * <p>The constructor resolves the AWS credentials provider and verifies that credentials can be
 * resolved, then builds the EventBridge async client (region, optional endpoint override, CRT
 * HTTP client, retry policy limited to {@code config.maxRetries} and a
 * {@code <connector name>/<connector version>} user agent prefix), the event mapper and the
 * batching strategy. When {@code config.offloadingDefaultS3Bucket} is non-empty an S3 async
 * client is built as well and S3 offloading is activated; otherwise a no-op offloading is used.
 *
 * @param config sink connector configuration to read region, endpoints, retry and offloading
 *     settings from
 * @throws ConnectException if the credentials provider fails to resolve credentials
 */
public EventBridgeWriter(EventBridgeSinkConfig config) {
  this.config = config;

  var credentialsProvider =
      EventBridgeAwsCredentialsProviderFactory.getAwsCredentialsProvider(config);

  // fail fast if credentials cannot be resolved; this runs before any SDK client is built so
  // that a failure does not leave already-created clients behind with nobody able to close them
  log.info("Resolving iam credentials");
  try {
    credentialsProvider.resolveCredentials();
  } catch (Exception e) {
    throw new ConnectException(e);
  }

  var ebEndpointUri = parseEndpointOverride(this.config.endpointURI);

  var retryPolicy =
      RetryPolicy.forRetryMode(RetryMode.STANDARD).toBuilder()
          .numRetries(this.config.maxRetries)
          .build();

  var name = PropertiesUtil.getConnectorName();
  var version = PropertiesUtil.getConnectorVersion();
  var userAgentPrefix = String.format("%s/%s", name, version);

  // shared by the EventBridge client and (if enabled) the S3 client
  var clientConfig =
      ClientOverrideConfiguration.builder()
          .retryPolicy(retryPolicy)
          .putAdvancedOption(USER_AGENT_PREFIX, userAgentPrefix)
          .build();

  var client =
      EventBridgeAsyncClient.builder()
          .region(Region.of(this.config.region))
          .endpointOverride(ebEndpointUri)
          .httpClientBuilder(AwsCrtAsyncHttpClient.builder())
          .overrideConfiguration(clientConfig)
          .credentialsProvider(credentialsProvider)
          .build();

  this.ebClient = client;
  this.eventBridgeMapper = new DefaultEventBridgeMapper(config);
  this.batching = new DefaultEventBridgeBatching();

  if ((config.offloadingDefaultS3Bucket != null) && !config.offloadingDefaultS3Bucket.isEmpty()) {
    var s3EndpointUri = parseEndpointOverride(this.config.offloadingDefaultS3EndpointURI);

    var s3client =
        S3AsyncClient.builder()
            .credentialsProvider(credentialsProvider)
            .endpointOverride(s3EndpointUri)
            // path-style addressing is only forced when a custom S3 endpoint is configured
            .forcePathStyle(s3EndpointUri != null)
            .httpClientBuilder(AwsCrtAsyncHttpClient.builder())
            .overrideConfiguration(clientConfig)
            .region(Region.of(this.config.region))
            .build();

    var bucketName = StringUtils.trim(config.offloadingDefaultS3Bucket);
    var jsonPathExp = StringUtils.trim(config.offloadingDefaultFieldRef);

    log.info(
        "S3 offloading is activated with bucket: {}, JSON path: {} and endpoint override: {}",
        bucketName,
        jsonPathExp,
        s3EndpointUri == null ? "-" : s3EndpointUri);

    offloading = new S3EventBridgeEventDetailValueOffloading(s3client, bucketName, jsonPathExp);
  } else {
    log.info("S3 offloading is deactivated");
    offloading = new NoOpEventBridgeEventDetailValueOffloading();
  }

  log.trace(
      "EventBridgeWriter client config: {}",
      ReflectionToStringBuilder.toString(
          client.serviceClientConfiguration(), ToStringStyle.DEFAULT_STYLE, true));
}

/**
 * Converts a configured endpoint string into an endpoint override URI.
 *
 * <p>The value is trimmed once and the trimmed text is what gets parsed, so surrounding
 * whitespace neither defeats the blank check nor reaches {@link URI#create(String)}.
 *
 * @param rawUri configured endpoint value; may be {@code null} or blank
 * @return the parsed URI, or {@code null} when no endpoint is configured
 * @throws IllegalArgumentException if the trimmed value is not a valid URI
 */
private static URI parseEndpointOverride(String rawUri) {
  var trimmed = StringUtils.trim(rawUri);
  if (trimmed == null || trimmed.isBlank()) {
    return null;
  }
  return URI.create(trimmed);
}