in v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/transforms/WriteToElasticsearch.java [114:245]
// Builds an ElasticsearchIO writer from the transform options and applies it to the input.
//
// The connection targets the URL derived from options().getConnectionUrl() and the configured
// index. Credentials, transport settings, per-document extractor functions and write behavior
// are each applied by a dedicated private helper below, in that order.
//
// jsonStrings: the JSON documents to write.
// Returns the PDone produced by applying the "WriteDocuments" step.
// Throws IllegalArgumentException when API-key authentication is needed but the configured
// apiKeySource is missing or not one of PLAINTEXT, KMS, SECRET_MANAGER.
public PDone expand(PCollection<String> jsonStrings) {
  ConnectionInformation connectionInformation =
      new ConnectionInformation(options().getConnectionUrl());

  ElasticsearchIO.ConnectionConfiguration config =
      ElasticsearchIO.ConnectionConfiguration.create(
          new String[] {connectionInformation.getElasticsearchURL().toString()},
          options().getIndex(),
          DOCUMENT_TYPE,
          userAgent());
  config = applyAuthentication(config);
  config = applyTransportSettings(config);

  ElasticsearchIO.Write elasticsearchWriter =
      ElasticsearchIO.write()
          .withConnectionConfiguration(config)
          .withMaxBatchSize(options().getBatchSize())
          .withMaxBatchSizeBytes(options().getBatchSizeBytes());
  elasticsearchWriter = applyDocumentFunctions(elasticsearchWriter);
  elasticsearchWriter = applyWriteBehavior(elasticsearchWriter);

  return jsonStrings.apply("WriteDocuments", elasticsearchWriter);
}

/**
 * Adds credentials to the connection configuration.
 *
 * <p>Username/password are used when both are non-blank. Otherwise an API key is resolved
 * according to {@code apiKeySource}: {@code PLAINTEXT} uses the configured key as is, {@code KMS}
 * passes the key and the KMS encryption key through {@code KMSUtils.maybeDecrypt}, and {@code
 * SECRET_MANAGER} fetches the key via {@code SecretManagerUtils.getSecret}.
 *
 * @param config the configuration to extend
 * @return the configuration with credentials applied
 * @throws IllegalArgumentException if an API key is needed and {@code apiKeySource} is null or
 *     not one of the supported values
 */
private ElasticsearchIO.ConnectionConfiguration applyAuthentication(
    ElasticsearchIO.ConnectionConfiguration config) {
  // If username and password are not blank, use them instead of ApiKey
  if (StringUtils.isNotBlank(options().getElasticsearchUsername())
      && StringUtils.isNotBlank(options().getElasticsearchPassword())) {
    return config
        .withUsername(options().getElasticsearchUsername())
        .withPassword(options().getElasticsearchPassword());
  }

  String apiKeySource = options().getApiKeySource();
  // Switching on a null String throws a bare NullPointerException; report the real problem.
  if (apiKeySource == null) {
    throw new IllegalArgumentException(
        "apiKeySource must be set when username and password are not both provided."
            + " Supported values: PLAINTEXT, KMS, SECRET_MANAGER.");
  }

  switch (apiKeySource) {
    case "PLAINTEXT":
      return config.withApiKey(options().getApiKey());
    case "KMS":
      return config.withApiKey(
          KMSUtils.maybeDecrypt(options().getApiKey(), options().getApiKeyKMSEncryptionKey())
              .get());
    case "SECRET_MANAGER":
      return config.withApiKey(SecretManagerUtils.getSecret(options().getApiKeySecretId()));
    default:
      // Fail fast rather than silently building a connection without any credentials.
      throw new IllegalArgumentException(
          "Unsupported apiKeySource '"
              + apiKeySource
              + "'. Supported values: PLAINTEXT, KMS, SECRET_MANAGER.");
  }
}

/**
 * Applies the optional certificate and socket-timeout settings; options left null are skipped.
 *
 * @param config the configuration to extend
 * @return the configuration with every non-null transport option applied
 */
private ElasticsearchIO.ConnectionConfiguration applyTransportSettings(
    ElasticsearchIO.ConnectionConfiguration config) {
  if (options().getTrustSelfSignedCerts() != null) {
    config = config.withTrustSelfSignedCerts(options().getTrustSelfSignedCerts());
  }
  if (options().getDisableCertificateValidation() != null) {
    config =
        config.withDisableCertificateValidation(options().getDisableCertificateValidation());
  }
  if (options().getSocketTimeout() != null) {
    config = config.withSocketTimeout(options().getSocketTimeout());
  }
  return config;
}

/**
 * Registers the per-document id, index, type and is-delete functions that are configured.
 *
 * <p>For id and index, a property-name option takes precedence over the JavaScript function; a
 * JavaScript function is only used when both its path and its function name are set.
 *
 * @param writer the writer to extend
 * @return the writer with every configured function applied
 */
private ElasticsearchIO.Write applyDocumentFunctions(ElasticsearchIO.Write writer) {
  if (options().getPropertyAsId() != null) {
    StringFieldValueExtractFn idFn =
        StringFieldValueExtractFn.newBuilder()
            .setPropertyName(options().getPropertyAsId())
            .build();
    writer = writer.withIdFn(idFn);
  } else if (options().getJavaScriptIdFnGcsPath() != null
      && options().getJavaScriptIdFnName() != null) {
    StringValueExtractorFn idFn =
        StringValueExtractorFn.newBuilder()
            .setFileSystemPath(options().getJavaScriptIdFnGcsPath())
            .setFunctionName(options().getJavaScriptIdFnName())
            .build();
    writer = writer.withIdFn(idFn);
  }

  if (options().getPropertyAsIndex() != null) {
    StringFieldValueExtractFn indexFn =
        StringFieldValueExtractFn.newBuilder()
            .setPropertyName(options().getPropertyAsIndex())
            .build();
    writer = writer.withIndexFn(indexFn);
  } else if (options().getJavaScriptIndexFnGcsPath() != null
      && options().getJavaScriptIndexFnName() != null) {
    StringValueExtractorFn indexFn =
        StringValueExtractorFn.newBuilder()
            .setFileSystemPath(options().getJavaScriptIndexFnGcsPath())
            .setFunctionName(options().getJavaScriptIndexFnName())
            .build();
    writer = writer.withIndexFn(indexFn);
  }

  if (options().getJavaScriptTypeFnGcsPath() != null
      && options().getJavaScriptTypeFnName() != null) {
    StringValueExtractorFn typeFn =
        StringValueExtractorFn.newBuilder()
            .setFileSystemPath(options().getJavaScriptTypeFnGcsPath())
            .setFunctionName(options().getJavaScriptTypeFnName())
            .build();
    writer = writer.withTypeFn(typeFn);
  }

  if (options().getJavaScriptIsDeleteFnGcsPath() != null
      && options().getJavaScriptIsDeleteFnName() != null) {
    BooleanValueExtractorFn isDeleteFn =
        BooleanValueExtractorFn.newBuilder()
            .setFileSystemPath(options().getJavaScriptIsDeleteFnGcsPath())
            .setFunctionName(options().getJavaScriptIsDeleteFnName())
            .build();
    writer = writer.withIsDeleteFn(isDeleteFn);
  }
  return writer;
}

/**
 * Applies the optional partial-update, bulk-insert-method and retry settings.
 *
 * <p>A retry configuration is only installed when {@code maxRetryAttempts} is set; the retry
 * duration is converted with {@code getDuration}.
 *
 * @param writer the writer to extend
 * @return the writer with every configured setting applied
 */
private ElasticsearchIO.Write applyWriteBehavior(ElasticsearchIO.Write writer) {
  if (options().getUsePartialUpdate() != null) {
    writer = writer.withUsePartialUpdate(Boolean.TRUE.equals(options().getUsePartialUpdate()));
  }
  if (options().getBulkInsertMethod() != null) {
    writer = writer.withBulkInsertMethod(options().getBulkInsertMethod());
  }
  if (Optional.ofNullable(options().getMaxRetryAttempts()).isPresent()) {
    writer =
        writer.withRetryConfiguration(
            ElasticsearchIO.RetryConfiguration.create(
                options().getMaxRetryAttempts(), getDuration(options().getMaxRetryDuration())));
  }
  return writer;
}