public EventBridgeWriter()

in src/main/java/software/amazon/event/kafkaconnector/EventBridgeWriter.java [70:152]


  public EventBridgeWriter(EventBridgeSinkConfig config) {
    this.config = config;

    var ebEndpointUri =
        StringUtils.trim(this.config.endpointURI).isBlank()
            ? null
            : URI.create(this.config.endpointURI);

    var retryPolicy =
        RetryPolicy.forRetryMode(RetryMode.STANDARD).toBuilder()
            .numRetries(this.config.maxRetries)
            .build();

    var name = PropertiesUtil.getConnectorName();
    var version = PropertiesUtil.getConnectorVersion();
    var userAgentPrefix = String.format("%s/%s", name, version);

    var clientConfig =
        ClientOverrideConfiguration.builder()
            .retryPolicy(retryPolicy)
            .putAdvancedOption(USER_AGENT_PREFIX, userAgentPrefix)
            .build();

    var credentialsProvider =
        EventBridgeAwsCredentialsProviderFactory.getAwsCredentialsProvider(config);

    var client =
        EventBridgeAsyncClient.builder()
            .region(Region.of(this.config.region))
            .endpointOverride(ebEndpointUri)
            .httpClientBuilder(AwsCrtAsyncHttpClient.builder())
            .overrideConfiguration(clientConfig)
            .credentialsProvider(credentialsProvider)
            .build();

    this.ebClient = client;

    this.eventBridgeMapper = new DefaultEventBridgeMapper(config);
    this.batching = new DefaultEventBridgeBatching();

    if ((config.offloadingDefaultS3Bucket != null) && !config.offloadingDefaultS3Bucket.isEmpty()) {

      var s3EndpointUri =
          StringUtils.trim(this.config.offloadingDefaultS3EndpointURI).isBlank()
              ? null
              : URI.create(this.config.offloadingDefaultS3EndpointURI);

      var s3client =
          S3AsyncClient.builder()
              .credentialsProvider(credentialsProvider)
              .endpointOverride(s3EndpointUri)
              .forcePathStyle(s3EndpointUri != null)
              .httpClientBuilder(AwsCrtAsyncHttpClient.builder())
              .overrideConfiguration(clientConfig)
              .region(Region.of(this.config.region))
              .build();
      var bucketName = StringUtils.trim(config.offloadingDefaultS3Bucket);
      var jsonPathExp = StringUtils.trim(config.offloadingDefaultFieldRef);

      log.info(
          "S3 offloading is activated with bucket: {}, JSON path: {} and endpoint override: {}",
          bucketName,
          jsonPathExp,
          s3EndpointUri == null ? "-" : s3EndpointUri);
      offloading = new S3EventBridgeEventDetailValueOffloading(s3client, bucketName, jsonPathExp);
    } else {
      log.info("S3 offloading is deactivated");
      offloading = new NoOpEventBridgeEventDetailValueOffloading();
    }

    log.trace(
        "EventBridgeWriter client config: {}",
        ReflectionToStringBuilder.toString(
            client.serviceClientConfiguration(), ToStringStyle.DEFAULT_STYLE, true));

    // fail fast if credentials cannot be resolved
    log.info("Resolving iam credentials");
    try {
      credentialsProvider.resolveCredentials();
    } catch (Exception e) {
      throw new ConnectException(e);
    }
  }