public PDone expand()

in v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/transforms/WriteToElasticsearch.java [114:245]


  /**
   * Configures an {@code ElasticsearchIO.Write} from {@code options()} and applies it to the
   * input collection under the step name {@code "WriteDocuments"}.
   *
   * <p>The writer is assembled in this order: connection configuration (URL, index, document type,
   * user agent, credentials, optional TLS and socket-timeout settings), batch limits, optional
   * id/index/type/is-delete extractor functions, and finally the optional partial-update,
   * bulk-insert-method and retry settings. Optional settings are only applied when the
   * corresponding option is non-null.
   *
   * @param jsonStrings the collection the configured writer is applied to
   * @return the result of applying the configured writer to {@code jsonStrings}
   * @throws IllegalArgumentException if username and password are not both set and the API key
   *     source is null or is not one of {@code PLAINTEXT}, {@code KMS} or {@code SECRET_MANAGER}
   */
  public PDone expand(PCollection<String> jsonStrings) {
    ElasticsearchIO.ConnectionConfiguration config = buildConnectionConfiguration();

    ElasticsearchIO.Write elasticsearchWriter =
        ElasticsearchIO.write()
            .withConnectionConfiguration(config)
            .withMaxBatchSize(options().getBatchSize())
            .withMaxBatchSizeBytes(options().getBatchSizeBytes());

    elasticsearchWriter = applyDocumentFunctions(elasticsearchWriter);

    if (options().getUsePartialUpdate() != null) {
      elasticsearchWriter =
          elasticsearchWriter.withUsePartialUpdate(
              Boolean.TRUE.equals(options().getUsePartialUpdate()));
    }

    if (options().getBulkInsertMethod() != null) {
      elasticsearchWriter =
          elasticsearchWriter.withBulkInsertMethod(options().getBulkInsertMethod());
    }

    // A retry configuration is only installed when a maximum attempt count was supplied.
    if (Optional.ofNullable(options().getMaxRetryAttempts()).isPresent()) {
      elasticsearchWriter =
          elasticsearchWriter.withRetryConfiguration(
              ElasticsearchIO.RetryConfiguration.create(
                  options().getMaxRetryAttempts(), getDuration(options().getMaxRetryDuration())));
    }

    return jsonStrings.apply("WriteDocuments", elasticsearchWriter);
  }

  /** Builds the connection configuration: endpoint, credentials, then optional TLS/timeout. */
  private ElasticsearchIO.ConnectionConfiguration buildConnectionConfiguration() {
    ConnectionInformation connectionInformation =
        new ConnectionInformation(options().getConnectionUrl());

    ElasticsearchIO.ConnectionConfiguration config =
        ElasticsearchIO.ConnectionConfiguration.create(
            new String[] {connectionInformation.getElasticsearchURL().toString()},
            options().getIndex(),
            DOCUMENT_TYPE,
            userAgent());

    config = applyCredentials(config);

    if (options().getTrustSelfSignedCerts() != null) {
      config = config.withTrustSelfSignedCerts(options().getTrustSelfSignedCerts());
    }

    if (options().getDisableCertificateValidation() != null) {
      config = config.withDisableCertificateValidation(options().getDisableCertificateValidation());
    }

    if (options().getSocketTimeout() != null) {
      config = config.withSocketTimeout(options().getSocketTimeout());
    }

    return config;
  }

  /** Adds username/password when both are non-blank, otherwise an API key from the chosen source. */
  private ElasticsearchIO.ConnectionConfiguration applyCredentials(
      ElasticsearchIO.ConnectionConfiguration config) {
    // If username and password are not blank, use them instead of ApiKey
    if (StringUtils.isNotBlank(options().getElasticsearchUsername())
        && StringUtils.isNotBlank(options().getElasticsearchPassword())) {
      return config
          .withUsername(options().getElasticsearchUsername())
          .withPassword(options().getElasticsearchPassword());
    }

    String apiKeySource = options().getApiKeySource();
    if (apiKeySource == null) {
      // Switching on a null String would throw a NullPointerException with no explanation.
      throw new IllegalArgumentException(
          "apiKeySource must be set when username and password are not both provided");
    }

    switch (apiKeySource) {
      case "PLAINTEXT":
        return config.withApiKey(options().getApiKey());
      case "KMS":
        return config.withApiKey(
            KMSUtils.maybeDecrypt(options().getApiKey(), options().getApiKeyKMSEncryptionKey())
                .get());
      case "SECRET_MANAGER":
        return config.withApiKey(SecretManagerUtils.getSecret(options().getApiKeySecretId()));
      default:
        // Fail fast instead of silently returning a configuration that carries no credentials.
        throw new IllegalArgumentException(
            "Unsupported apiKeySource: "
                + apiKeySource
                + ". Expected one of PLAINTEXT, KMS, SECRET_MANAGER");
    }
  }

  /** Attaches the optional id, index, type and is-delete extractor functions to the writer. */
  private ElasticsearchIO.Write applyDocumentFunctions(ElasticsearchIO.Write writer) {
    // For id and index, a plain property name wins over a JavaScript function; the JavaScript
    // variant is only used when both its path and its function name are set.
    if (options().getPropertyAsId() != null) {
      StringFieldValueExtractFn idFn =
          StringFieldValueExtractFn.newBuilder()
              .setPropertyName(options().getPropertyAsId())
              .build();
      writer = writer.withIdFn(idFn);
    } else if (options().getJavaScriptIdFnGcsPath() != null
        && options().getJavaScriptIdFnName() != null) {
      StringValueExtractorFn idFn =
          StringValueExtractorFn.newBuilder()
              .setFileSystemPath(options().getJavaScriptIdFnGcsPath())
              .setFunctionName(options().getJavaScriptIdFnName())
              .build();
      writer = writer.withIdFn(idFn);
    }

    if (options().getPropertyAsIndex() != null) {
      StringFieldValueExtractFn indexFn =
          StringFieldValueExtractFn.newBuilder()
              .setPropertyName(options().getPropertyAsIndex())
              .build();
      writer = writer.withIndexFn(indexFn);
    } else if (options().getJavaScriptIndexFnGcsPath() != null
        && options().getJavaScriptIndexFnName() != null) {
      StringValueExtractorFn indexFn =
          StringValueExtractorFn.newBuilder()
              .setFileSystemPath(options().getJavaScriptIndexFnGcsPath())
              .setFunctionName(options().getJavaScriptIndexFnName())
              .build();
      writer = writer.withIndexFn(indexFn);
    }

    if (options().getJavaScriptTypeFnGcsPath() != null
        && options().getJavaScriptTypeFnName() != null) {
      StringValueExtractorFn typeFn =
          StringValueExtractorFn.newBuilder()
              .setFileSystemPath(options().getJavaScriptTypeFnGcsPath())
              .setFunctionName(options().getJavaScriptTypeFnName())
              .build();
      writer = writer.withTypeFn(typeFn);
    }

    if (options().getJavaScriptIsDeleteFnGcsPath() != null
        && options().getJavaScriptIsDeleteFnName() != null) {
      BooleanValueExtractorFn isDeleteFn =
          BooleanValueExtractorFn.newBuilder()
              .setFileSystemPath(options().getJavaScriptIsDeleteFnGcsPath())
              .setFunctionName(options().getJavaScriptIsDeleteFnName())
              .build();
      writer = writer.withIsDeleteFn(isDeleteFn);
    }

    return writer;
  }