in v2/kafka-to-kafka/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToKafka.java [60:175]
/**
 * Builds and runs the Kafka-to-Kafka pipeline.
 *
 * <p>Validates the authentication options for the source and destination clusters, resolves the
 * bootstrap servers and topic for each side, then reads byte-array records from the source topic
 * and writes them to the destination topic.
 *
 * @param options pipeline options describing both Kafka endpoints and their authentication
 * @return the result of {@code pipeline.run()}
 * @throws IllegalArgumentException if an option required by the selected authentication method is
 *     missing or blank, or if either bootstrap-server-and-topic option is {@code null}
 * @throws UnsupportedOperationException if an authentication method is not one of the handled
 *     values
 */
public static PipelineResult run(KafkaToKafkaOptions options) {
  // Validation order is kept as: read auth, write auth, source endpoint, destination endpoint.
  validateReadAuthentication(options);
  validateWriteAuthentication(options);

  if (options.getReadBootstrapServerAndTopic() == null) {
    throw new IllegalArgumentException(
        "Please provide a valid bootstrap server which matches `[,:a-zA-Z0-9._-]+` and a topic which matches `[,a-zA-Z0-9._-]+`");
  }
  // Index 0 holds the bootstrap servers, index 1 holds the topic.
  List<String> sourceBootstrapServerAndTopicList =
      KafkaTopicUtils.getBootstrapServerAndTopic(
          options.getReadBootstrapServerAndTopic(), options.getProject());
  String sourceBootstrapServers = sourceBootstrapServerAndTopicList.get(0);
  String sourceTopic = sourceBootstrapServerAndTopicList.get(1);

  if (options.getWriteBootstrapServerAndTopic() == null) {
    throw new IllegalArgumentException(
        "Please provide a valid bootstrap server which matches `[,:a-zA-Z0-9._-]+` and a topic which matches `[,a-zA-Z0-9._-]+`");
  }
  List<String> destinationBootstrapServerAndTopicList =
      KafkaTopicUtils.getBootstrapServerAndTopic(
          options.getWriteBootstrapServerAndTopic(), options.getProject());
  String destinationBootstrapServers = destinationBootstrapServerAndTopicList.get(0);
  String destinationTopic = destinationBootstrapServerAndTopicList.get(1);

  Pipeline pipeline = Pipeline.create(options);
  pipeline
      .apply(
          "Read from Kafka",
          KafkaTransform.readBytesFromKafka(
                  sourceBootstrapServers,
                  Collections.singletonList(sourceTopic),
                  KafkaConfig.fromReadOptions(options),
                  options.getEnableCommitOffsets())
              .withoutMetadata())
      .apply(
          "Write to Kafka",
          KafkaIO.<byte[], byte[]>write()
              .withBootstrapServers(destinationBootstrapServers)
              .withTopic(destinationTopic)
              .withKeySerializer(ByteArraySerializer.class)
              .withValueSerializer(ByteArraySerializer.class)
              .withProducerConfigUpdates(KafkaConfig.fromWriteOptions(options))
              .withProducerFactoryFn(new FileAwareProducerFactoryFn()));
  return pipeline.run();
}

/** Checks that the options required by the source cluster's authentication mode are set. */
private static void validateReadAuthentication(KafkaToKafkaOptions options) {
  // Constant-first equals: an unset mode reaches the "not supported" branch instead of an NPE.
  if (KafkaAuthenticationMethod.SASL_PLAIN.equals(options.getKafkaReadAuthenticationMode())) {
    checkOptionNotBlank(
        options.getKafkaReadUsernameSecretId(),
        "KafkaReadUsernameSecretId required to access username for source Kafka");
    checkOptionNotBlank(
        options.getKafkaReadPasswordSecretId(),
        "KafkaReadPasswordSecretId required to access password for source kafka");
  } else if (KafkaAuthenticationMethod.TLS.equals(options.getKafkaReadAuthenticationMode())) {
    checkOptionNotBlank(
        options.getKafkaReadTruststoreLocation(),
        "KafkaReadTruststoreLocation for trust store certificate required for ssl authentication");
    checkOptionNotBlank(
        options.getKafkaReadTruststorePasswordSecretId(),
        "KafkaReadTruststorePassword for trust store password required for accessing truststore");
    checkOptionNotBlank(
        options.getKafkaReadKeystoreLocation(),
        "KafkaReadKeystoreLocation for key store location required for ssl authentication");
    checkOptionNotBlank(
        options.getKafkaReadKeystorePasswordSecretId(),
        "KafkaReadKeystorePassword for key store password required to access key store");
    checkOptionNotBlank(
        options.getKafkaReadKeyPasswordSecretId(),
        "KafkaReadKeyPasswordSecretId version for key password required for SSL authentication");
  } else if (KafkaAuthenticationMethod.NONE.equals(options.getKafkaReadAuthenticationMode())
      || KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS.equals(
          options.getKafkaReadAuthenticationMode())) {
    // No additional validation is required for these auth mechanisms since they don't depend on
    // any specific pipeline options.
  } else {
    throw new UnsupportedOperationException(
        "Authentication method not supported: " + options.getKafkaReadAuthenticationMode());
  }
}

/** Checks that the options required by the destination cluster's authentication method are set. */
private static void validateWriteAuthentication(KafkaToKafkaOptions options) {
  if (KafkaAuthenticationMethod.SASL_PLAIN.equals(options.getKafkaWriteAuthenticationMethod())) {
    // Message previously said "source Kafka", copied from the read-side check.
    checkOptionNotBlank(
        options.getKafkaWriteUsernameSecretId(),
        "KafkaWriteUsernameSecretId required to access username for destination Kafka");
    checkOptionNotBlank(
        options.getKafkaWritePasswordSecretId(),
        "KafkaWritePasswordSecretId required to access password for destination Kafka");
  } else if (KafkaAuthenticationMethod.TLS.equals(options.getKafkaWriteAuthenticationMethod())) {
    checkOptionNotBlank(
        options.getKafkaWriteTruststoreLocation(),
        "KafkaWriteTruststoreLocation for trust store certificate required for ssl authentication");
    checkOptionNotBlank(
        options.getKafkaWriteTruststorePasswordSecretId(),
        "KafkaWriteTruststorePasswordSecretId for trust store password required for accessing truststore");
    checkOptionNotBlank(
        options.getKafkaWriteKeystoreLocation(),
        "KafkaWriteKeystoreLocation for key store location required for ssl authentication");
    checkOptionNotBlank(
        options.getKafkaWriteKeystorePasswordSecretId(),
        "KafkaWriteKeystorePasswordSecretId for key store password required to access key store");
    // Message previously said "source key password" although this is a write-side option.
    checkOptionNotBlank(
        options.getKafkaWriteKeyPasswordSecretId(),
        "KafkaWriteKeyPasswordSecretId for destination key password secret id version required for SSL authentication");
  } else if (KafkaAuthenticationMethod.NONE.equals(options.getKafkaWriteAuthenticationMethod())
      || KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS.equals(
          options.getKafkaWriteAuthenticationMethod())) {
    // No additional validation is required for these auth mechanisms since they don't depend on
    // any specific pipeline options.
  } else {
    throw new UnsupportedOperationException(
        "Authentication method not supported: " + options.getKafkaWriteAuthenticationMethod());
  }
}

/**
 * Fails with {@link IllegalArgumentException} carrying {@code message} when {@code value} is
 * {@code null} or contains only whitespace.
 */
private static void checkOptionNotBlank(String value, String message) {
  // Null is treated like blank so an unset option reports the descriptive message, not an NPE.
  checkArgument(value != null && !value.trim().isEmpty(), message);
}