in pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java [74:283]
/**
 * Builds the {@link FunctionDetails} describing a sink from its user-facing {@link SinkConfig}
 * and the details extracted from the sink package.
 *
 * <p>Defaults applied when the corresponding config value is not set:
 * <ul>
 *   <li>runtime is always {@code JAVA} and component type is always {@code SINK};</li>
 *   <li>parallelism: {@code 1};</li>
 *   <li>function class name: {@code IdentityFunction};</li>
 *   <li>processing guarantees: {@code ATLEAST_ONCE};</li>
 *   <li>auto-ack: {@code true};</li>
 *   <li>cleanup subscription: {@code true};</li>
 *   <li>subscription position: {@code LATEST} unless the config asks for {@code Earliest};</li>
 *   <li>resources: whatever {@code Resources.mergeWithDefault} returns for the configured value.</li>
 * </ul>
 *
 * <p>The source spec class name is intentionally never set here (see the inline comment below).
 *
 * @param sinkConfig  the sink configuration to convert; must not be {@code null}
 * @param sinkDetails the extracted sink class name, type argument and optional transform
 *                    function class name; must not be {@code null}
 * @return the value returned by {@code FunctionConfigUtils.validateFunctionDetails} for the
 *         assembled details
 * @throws IOException declared by the signature; NOTE(review): the visible body does not show
 *                     which callee raises it - confirm against the helpers before removing
 */
public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetails sinkDetails) throws IOException {
    FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();

    boolean isBuiltin =
            !org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive()
                    .startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);

    // ---- identity and general settings ----
    if (sinkConfig.getTenant() != null) {
        functionDetailsBuilder.setTenant(sinkConfig.getTenant());
    }
    if (sinkConfig.getNamespace() != null) {
        functionDetailsBuilder.setNamespace(sinkConfig.getNamespace());
    }
    if (sinkConfig.getName() != null) {
        functionDetailsBuilder.setName(sinkConfig.getName());
    }
    if (sinkConfig.getLogTopic() != null) {
        functionDetailsBuilder.setLogTopic(sinkConfig.getLogTopic());
    }
    functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
    if (sinkConfig.getParallelism() != null) {
        functionDetailsBuilder.setParallelism(sinkConfig.getParallelism());
    } else {
        functionDetailsBuilder.setParallelism(1);
    }
    if (sinkDetails.getFunctionClassName() != null) {
        functionDetailsBuilder.setClassName(sinkDetails.getFunctionClassName());
    } else {
        functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
    }
    if (sinkConfig.getTransformFunctionConfig() != null) {
        functionDetailsBuilder.setUserConfig(sinkConfig.getTransformFunctionConfig());
    }
    if (sinkConfig.getProcessingGuarantees() != null) {
        functionDetailsBuilder.setProcessingGuarantees(
                convertProcessingGuarantee(sinkConfig.getProcessingGuarantees()));
    } else {
        functionDetailsBuilder.setProcessingGuarantees(Function.ProcessingGuarantees.ATLEAST_ONCE);
    }

    // ---- source spec ----
    // source spec classname should be empty so that the default pulsar source will be used
    Function.SourceSpec.Builder sourceSpecBuilder = Function.SourceSpec.newBuilder();

    // The sections below all write into the same topic-keyed input-spec map; their relative
    // order is kept exactly as before so that precedence between them does not change.
    if (sinkConfig.getInputs() != null) {
        sinkConfig.getInputs().forEach(topicName ->
                sourceSpecBuilder.putInputSpecs(topicName,
                        Function.ConsumerSpec.newBuilder()
                                .setIsRegexPattern(false)
                                .build()));
    }
    if (!StringUtils.isEmpty(sinkConfig.getTopicsPattern())) {
        sourceSpecBuilder.putInputSpecs(sinkConfig.getTopicsPattern(),
                Function.ConsumerSpec.newBuilder()
                        .setIsRegexPattern(true)
                        .build());
    }
    if (sinkConfig.getTopicToSerdeClassName() != null) {
        sinkConfig.getTopicToSerdeClassName().forEach((topicName, serde) -> {
            sourceSpecBuilder.putInputSpecs(topicName,
                    Function.ConsumerSpec.newBuilder()
                            .setSerdeClassName(serde == null ? "" : serde)
                            .setIsRegexPattern(false)
                            .build());
        });
    }
    if (sinkConfig.getTopicToSchemaType() != null) {
        sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> {
            sourceSpecBuilder.putInputSpecs(topicName,
                    Function.ConsumerSpec.newBuilder()
                            .setSchemaType(schemaType == null ? "" : schemaType)
                            .setIsRegexPattern(false)
                            .build());
        });
    }
    if (sinkConfig.getInputSpecs() != null) {
        sinkConfig.getInputSpecs().forEach((topic, spec) -> {
            Function.ConsumerSpec.Builder consumerSpecBuilder = Function.ConsumerSpec.newBuilder()
                    .setIsRegexPattern(spec.isRegexPattern());
            // A schema type takes precedence over a serde class name when both are given.
            if (StringUtils.isNotBlank(spec.getSchemaType())) {
                consumerSpecBuilder.setSchemaType(spec.getSchemaType());
            } else if (StringUtils.isNotBlank(spec.getSerdeClassName())) {
                consumerSpecBuilder.setSerdeClassName(spec.getSerdeClassName());
            }
            if (spec.getReceiverQueueSize() != null) {
                consumerSpecBuilder.setReceiverQueueSize(Function.ConsumerSpec.ReceiverQueueSize.newBuilder()
                        .setValue(spec.getReceiverQueueSize()).build());
            }
            if (spec.getCryptoConfig() != null) {
                consumerSpecBuilder.setCryptoSpec(CryptoUtils.convert(spec.getCryptoConfig()));
            }
            if (spec.getMessagePayloadProcessorConfig() != null) {
                consumerSpecBuilder.setMessagePayloadProcessorSpec(
                        MessagePayloadProcessorUtils.convert(spec.getMessagePayloadProcessorConfig()));
            }
            // Guarded like the other optional fields: the generated putAll rejects a null map,
            // which would otherwise abort the whole conversion for a spec without properties.
            if (spec.getConsumerProperties() != null) {
                consumerSpecBuilder.putAllConsumerProperties(spec.getConsumerProperties());
            }
            consumerSpecBuilder.setPoolMessages(spec.isPoolMessages());
            sourceSpecBuilder.putInputSpecs(topic, consumerSpecBuilder.build());
        });
    }

    if (sinkDetails.getTypeArg() != null) {
        sourceSpecBuilder.setTypeClassName(sinkDetails.getTypeArg());
    }
    if (isNotBlank(sinkConfig.getSourceSubscriptionName())) {
        sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
    }

    // Set subscription type: retain-ordering or effectively-once selects FAILOVER,
    // otherwise retain-key-ordering selects KEY_SHARED, otherwise SHARED.
    Function.SubscriptionType subType;
    if (Boolean.TRUE.equals(sinkConfig.getRetainOrdering())
            || FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE.equals(sinkConfig.getProcessingGuarantees())) {
        subType = Function.SubscriptionType.FAILOVER;
    } else if (Boolean.TRUE.equals(sinkConfig.getRetainKeyOrdering())) {
        subType = Function.SubscriptionType.KEY_SHARED;
    } else {
        subType = Function.SubscriptionType.SHARED;
    }
    sourceSpecBuilder.setSubscriptionType(subType);

    if (sinkConfig.getAutoAck() != null) {
        functionDetailsBuilder.setAutoAck(sinkConfig.getAutoAck());
    } else {
        functionDetailsBuilder.setAutoAck(true);
    }
    if (sinkConfig.getTimeoutMs() != null) {
        sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs());
    }
    if (sinkConfig.getCleanupSubscription() != null) {
        sourceSpecBuilder.setCleanupSubscription(sinkConfig.getCleanupSubscription());
    } else {
        sourceSpecBuilder.setCleanupSubscription(true);
    }
    // Non-positive delays are ignored and the builder keeps its own default.
    if (sinkConfig.getNegativeAckRedeliveryDelayMs() != null && sinkConfig.getNegativeAckRedeliveryDelayMs() > 0) {
        sourceSpecBuilder.setNegativeAckRedeliveryDelayMs(sinkConfig.getNegativeAckRedeliveryDelayMs());
    }
    if (sinkConfig.getSourceSubscriptionPosition() == SubscriptionInitialPosition.Earliest) {
        sourceSpecBuilder.setSubscriptionPosition(Function.SubscriptionPosition.EARLIEST);
    } else {
        sourceSpecBuilder.setSubscriptionPosition(Function.SubscriptionPosition.LATEST);
    }
    functionDetailsBuilder.setSource(sourceSpecBuilder);

    // ---- ordering and retry settings ----
    if (sinkConfig.getRetainKeyOrdering() != null) {
        functionDetailsBuilder.setRetainKeyOrdering(sinkConfig.getRetainKeyOrdering());
    }
    if (sinkConfig.getRetainOrdering() != null) {
        functionDetailsBuilder.setRetainOrdering(sinkConfig.getRetainOrdering());
    }
    // Retry details (including the dead letter topic) are only recorded for a positive retry count.
    if (sinkConfig.getMaxMessageRetries() != null && sinkConfig.getMaxMessageRetries() > 0) {
        Function.RetryDetails.Builder retryDetails = Function.RetryDetails.newBuilder();
        retryDetails.setMaxMessageRetries(sinkConfig.getMaxMessageRetries());
        if (StringUtils.isNotBlank(sinkConfig.getDeadLetterTopic())) {
            retryDetails.setDeadLetterTopic(sinkConfig.getDeadLetterTopic());
        }
        functionDetailsBuilder.setRetryDetails(retryDetails);
    }

    // ---- sink spec ----
    Function.SinkSpec.Builder sinkSpecBuilder = Function.SinkSpec.newBuilder();
    if (sinkDetails.getSinkClassName() != null) {
        sinkSpecBuilder.setClassName(sinkDetails.getSinkClassName());
    }
    if (isBuiltin) {
        // Strip the scheme so only the builtin connector name is stored.
        String builtin = sinkConfig.getArchive().replaceFirst("^builtin://", "");
        sinkSpecBuilder.setBuiltin(builtin);
    }
    if (!isEmpty(sinkConfig.getTransformFunction())
            && sinkConfig.getTransformFunction().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
        functionDetailsBuilder.setBuiltin(sinkConfig.getTransformFunction().replaceFirst("^builtin://", ""));
    }

    // Configs and secrets are stored as JSON strings; one Gson instance serves both.
    Gson gson = new Gson();
    if (sinkConfig.getConfigs() != null) {
        sinkSpecBuilder.setConfigs(gson.toJson(sinkConfig.getConfigs()));
    }
    if (sinkConfig.getSecrets() != null && !sinkConfig.getSecrets().isEmpty()) {
        functionDetailsBuilder.setSecretsMap(gson.toJson(sinkConfig.getSecrets()));
    }
    if (sinkDetails.getTypeArg() != null) {
        sinkSpecBuilder.setTypeClassName(sinkDetails.getTypeArg());
    }
    functionDetailsBuilder.setSink(sinkSpecBuilder);

    // ---- resources and runtime options ----
    // use default resources if resources not set
    Resources resources = Resources.mergeWithDefault(sinkConfig.getResources());
    Function.Resources.Builder resourcesBuilder = Function.Resources.newBuilder();
    resourcesBuilder.setCpu(resources.getCpu());
    resourcesBuilder.setRam(resources.getRam());
    resourcesBuilder.setDisk(resources.getDisk());
    functionDetailsBuilder.setResources(resourcesBuilder);

    if (isNotBlank(sinkConfig.getRuntimeFlags())) {
        functionDetailsBuilder.setRuntimeFlags(sinkConfig.getRuntimeFlags());
    }
    functionDetailsBuilder.setComponentType(FunctionDetails.ComponentType.SINK);
    if (!StringUtils.isEmpty(sinkConfig.getCustomRuntimeOptions())) {
        functionDetailsBuilder.setCustomRuntimeOptions(sinkConfig.getCustomRuntimeOptions());
    }

    return FunctionConfigUtils.validateFunctionDetails(functionDetailsBuilder.build());
}