in pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java [93:374]
public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFunctionDetails extractedDetails)
throws IllegalArgumentException {
boolean isBuiltin = !StringUtils.isEmpty(functionConfig.getJar())
&& functionConfig.getJar().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
// Setup source
Function.SourceSpec.Builder sourceSpecBuilder = Function.SourceSpec.newBuilder();
if (functionConfig.getInputs() != null) {
functionConfig.getInputs().forEach((topicName ->
sourceSpecBuilder.putInputSpecs(topicName,
Function.ConsumerSpec.newBuilder()
.setIsRegexPattern(false)
.build())
));
}
if (functionConfig.getTopicsPattern() != null && !functionConfig.getTopicsPattern().isEmpty()) {
sourceSpecBuilder.putInputSpecs(functionConfig.getTopicsPattern(),
Function.ConsumerSpec.newBuilder()
.setIsRegexPattern(true)
.build());
}
if (functionConfig.getCustomSerdeInputs() != null) {
functionConfig.getCustomSerdeInputs().forEach((topicName, serdeClassName) -> {
sourceSpecBuilder.putInputSpecs(topicName,
Function.ConsumerSpec.newBuilder()
.setSerdeClassName(serdeClassName)
.setIsRegexPattern(false)
.build());
});
}
if (functionConfig.getCustomSchemaInputs() != null) {
functionConfig.getCustomSchemaInputs().forEach((topicName, conf) -> {
try {
ConsumerConfig consumerConfig = OBJECT_MAPPER.readValue(conf, ConsumerConfig.class);
sourceSpecBuilder.putInputSpecs(topicName,
Function.ConsumerSpec.newBuilder()
.setSchemaType(consumerConfig.getSchemaType())
.putAllSchemaProperties(consumerConfig.getSchemaProperties())
.putAllConsumerProperties(consumerConfig.getConsumerProperties())
.setIsRegexPattern(false)
.build());
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(
String.format("Incorrect custom schema inputs,Topic %s ", topicName));
}
});
}
if (functionConfig.getInputSpecs() != null) {
functionConfig.getInputSpecs().forEach((topicName, consumerConf) -> {
Function.ConsumerSpec.Builder bldr = Function.ConsumerSpec.newBuilder()
.setIsRegexPattern(consumerConf.isRegexPattern());
if (isNotBlank(consumerConf.getSchemaType())) {
bldr.setSchemaType(consumerConf.getSchemaType());
} else if (isNotBlank(consumerConf.getSerdeClassName())) {
bldr.setSerdeClassName(consumerConf.getSerdeClassName());
}
if (consumerConf.getReceiverQueueSize() != null) {
bldr.setReceiverQueueSize(Function.ConsumerSpec.ReceiverQueueSize.newBuilder()
.setValue(consumerConf.getReceiverQueueSize()).build());
}
if (consumerConf.getSchemaProperties() != null) {
bldr.putAllSchemaProperties(consumerConf.getSchemaProperties());
}
if (consumerConf.getCryptoConfig() != null) {
bldr.setCryptoSpec(CryptoUtils.convert(consumerConf.getCryptoConfig()));
}
if (consumerConf.getMessagePayloadProcessorConfig() != null) {
bldr.setMessagePayloadProcessorSpec(
MessagePayloadProcessorUtils.convert(consumerConf.getMessagePayloadProcessorConfig()));
}
bldr.putAllConsumerProperties(consumerConf.getConsumerProperties());
bldr.setPoolMessages(consumerConf.isPoolMessages());
sourceSpecBuilder.putInputSpecs(topicName, bldr.build());
});
}
// Set subscription type
Function.SubscriptionType subType;
if ((functionConfig.getRetainOrdering() != null && functionConfig.getRetainOrdering())
|| FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE
.equals(functionConfig.getProcessingGuarantees())) {
subType = Function.SubscriptionType.FAILOVER;
} else if (functionConfig.getRetainKeyOrdering() != null && functionConfig.getRetainKeyOrdering()) {
subType = Function.SubscriptionType.KEY_SHARED;
} else {
subType = Function.SubscriptionType.SHARED;
}
sourceSpecBuilder.setSubscriptionType(subType);
// Set subscription name
if (isNotBlank(functionConfig.getSubName())) {
sourceSpecBuilder.setSubscriptionName(functionConfig.getSubName());
}
// Set subscription position
if (functionConfig.getSubscriptionPosition() != null) {
Function.SubscriptionPosition subPosition;
if (SubscriptionInitialPosition.Earliest == functionConfig.getSubscriptionPosition()) {
subPosition = Function.SubscriptionPosition.EARLIEST;
} else {
subPosition = Function.SubscriptionPosition.LATEST;
}
sourceSpecBuilder.setSubscriptionPosition(subPosition);
}
if (functionConfig.getSkipToLatest() != null) {
sourceSpecBuilder.setSkipToLatest(functionConfig.getSkipToLatest());
} else {
sourceSpecBuilder.setSkipToLatest(false);
}
if (extractedDetails.getTypeArg0() != null) {
sourceSpecBuilder.setTypeClassName(extractedDetails.getTypeArg0());
} else if (StringUtils.isNotEmpty(functionConfig.getInputTypeClassName())) {
sourceSpecBuilder.setTypeClassName(functionConfig.getInputTypeClassName());
}
if (functionConfig.getTimeoutMs() != null) {
sourceSpecBuilder.setTimeoutMs(functionConfig.getTimeoutMs());
// We use negative acks for fast tracking failures
sourceSpecBuilder.setNegativeAckRedeliveryDelayMs(functionConfig.getTimeoutMs());
}
if (functionConfig.getCleanupSubscription() != null) {
sourceSpecBuilder.setCleanupSubscription(functionConfig.getCleanupSubscription());
} else {
sourceSpecBuilder.setCleanupSubscription(true);
}
functionDetailsBuilder.setSource(sourceSpecBuilder);
// Setup sink
Function.SinkSpec.Builder sinkSpecBuilder = Function.SinkSpec.newBuilder();
if (functionConfig.getOutput() != null) {
sinkSpecBuilder.setTopic(functionConfig.getOutput());
}
if (!StringUtils.isBlank(functionConfig.getOutputSerdeClassName())) {
sinkSpecBuilder.setSerDeClassName(functionConfig.getOutputSerdeClassName());
}
if (!StringUtils.isBlank(functionConfig.getOutputSchemaType())) {
sinkSpecBuilder.setSchemaType(functionConfig.getOutputSchemaType());
}
if (functionConfig.getForwardSourceMessageProperty() == Boolean.TRUE) {
sinkSpecBuilder.setForwardSourceMessageProperty(functionConfig.getForwardSourceMessageProperty());
}
if (functionConfig.getCustomSchemaOutputs() != null && functionConfig.getOutput() != null) {
String conf = functionConfig.getCustomSchemaOutputs().get(functionConfig.getOutput());
try {
if (StringUtils.isNotEmpty(conf)) {
ConsumerConfig consumerConfig = OBJECT_MAPPER.readValue(conf, ConsumerConfig.class);
sinkSpecBuilder.putAllSchemaProperties(consumerConfig.getSchemaProperties());
sinkSpecBuilder.putAllConsumerProperties(consumerConfig.getConsumerProperties());
}
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(
String.format("Incorrect custom schema outputs,Topic %s ", functionConfig.getOutput()));
}
}
if (extractedDetails.getTypeArg1() != null) {
sinkSpecBuilder.setTypeClassName(extractedDetails.getTypeArg1());
} else if (StringUtils.isNotEmpty(functionConfig.getOutputTypeClassName())) {
sinkSpecBuilder.setTypeClassName(functionConfig.getOutputTypeClassName());
}
if (functionConfig.getProducerConfig() != null) {
sinkSpecBuilder.setProducerSpec(convertProducerConfigToProducerSpec(functionConfig.getProducerConfig()));
}
if (functionConfig.getBatchBuilder() != null) {
Function.ProducerSpec.Builder builder = sinkSpecBuilder.getProducerSpec() != null
? sinkSpecBuilder.getProducerSpec().toBuilder()
: Function.ProducerSpec.newBuilder();
sinkSpecBuilder.setProducerSpec(builder.setBatchBuilder(functionConfig.getBatchBuilder()).build());
}
functionDetailsBuilder.setSink(sinkSpecBuilder);
if (functionConfig.getTenant() != null) {
functionDetailsBuilder.setTenant(functionConfig.getTenant());
}
if (functionConfig.getNamespace() != null) {
functionDetailsBuilder.setNamespace(functionConfig.getNamespace());
}
if (functionConfig.getName() != null) {
functionDetailsBuilder.setName(functionConfig.getName());
}
if (functionConfig.getLogTopic() != null) {
functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic());
}
if (functionConfig.getRuntime() != null) {
functionDetailsBuilder.setRuntime(FunctionCommon.convertRuntime(functionConfig.getRuntime()));
}
if (functionConfig.getProcessingGuarantees() != null) {
functionDetailsBuilder.setProcessingGuarantees(
FunctionCommon.convertProcessingGuarantee(functionConfig.getProcessingGuarantees()));
}
if (functionConfig.getRetainKeyOrdering() != null) {
functionDetailsBuilder.setRetainKeyOrdering(functionConfig.getRetainKeyOrdering());
}
if (functionConfig.getRetainOrdering() != null) {
functionDetailsBuilder.setRetainOrdering(functionConfig.getRetainOrdering());
}
if (functionConfig.getMaxMessageRetries() != null && functionConfig.getMaxMessageRetries() >= 0) {
Function.RetryDetails.Builder retryBuilder = Function.RetryDetails.newBuilder();
retryBuilder.setMaxMessageRetries(functionConfig.getMaxMessageRetries());
if (isNotEmpty(functionConfig.getDeadLetterTopic())) {
retryBuilder.setDeadLetterTopic(functionConfig.getDeadLetterTopic());
}
functionDetailsBuilder.setRetryDetails(retryBuilder);
}
Map<String, Object> configs = new HashMap<>();
if (functionConfig.getUserConfig() != null) {
configs.putAll(functionConfig.getUserConfig());
}
// windowing related
WindowConfig windowConfig = functionConfig.getWindowConfig();
if (windowConfig != null) {
// Windows Function not support MANUAL and EFFECTIVELY_ONCE.
if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE
|| functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.MANUAL) {
throw new IllegalArgumentException(
"Windows Function not support "
+ functionConfig.getProcessingGuarantees() + " delivery semantics.");
} else {
// Override functionConfig.getProcessingGuarantees to MANUAL, and set windowsFunction is guarantees
windowConfig.setProcessingGuarantees(WindowConfig.ProcessingGuarantees
.valueOf(functionDetailsBuilder.getProcessingGuarantees().name()));
functionDetailsBuilder.setProcessingGuarantees(Function.ProcessingGuarantees.MANUAL);
}
windowConfig.setActualWindowFunctionClassName(extractedDetails.getFunctionClassName());
configs.put(WindowConfig.WINDOW_CONFIG_KEY, windowConfig);
// set class name to window function executor
functionDetailsBuilder.setClassName("org.apache.pulsar.functions.windowing.WindowFunctionExecutor");
} else {
if (extractedDetails.getFunctionClassName() != null) {
functionDetailsBuilder.setClassName(extractedDetails.getFunctionClassName());
}
}
if (!configs.isEmpty()) {
functionDetailsBuilder.setUserConfig(new Gson().toJson(configs));
}
if (functionConfig.getSecrets() != null && !functionConfig.getSecrets().isEmpty()) {
functionDetailsBuilder.setSecretsMap(new Gson().toJson(functionConfig.getSecrets()));
}
if (functionConfig.getAutoAck() != null) {
functionDetailsBuilder.setAutoAck(functionConfig.getAutoAck());
} else {
functionDetailsBuilder.setAutoAck(true);
}
if (functionConfig.getParallelism() != null) {
functionDetailsBuilder.setParallelism(functionConfig.getParallelism());
} else {
functionDetailsBuilder.setParallelism(1);
}
// use default resources if resources not set
Resources resources = Resources.mergeWithDefault(functionConfig.getResources());
Function.Resources.Builder bldr = Function.Resources.newBuilder();
bldr.setCpu(resources.getCpu());
bldr.setRam(resources.getRam());
bldr.setDisk(resources.getDisk());
functionDetailsBuilder.setResources(bldr);
if (!StringUtils.isEmpty(functionConfig.getRuntimeFlags())) {
functionDetailsBuilder.setRuntimeFlags(functionConfig.getRuntimeFlags());
}
functionDetailsBuilder.setComponentType(FunctionDetails.ComponentType.FUNCTION);
if (!StringUtils.isEmpty(functionConfig.getCustomRuntimeOptions())) {
functionDetailsBuilder.setCustomRuntimeOptions(functionConfig.getCustomRuntimeOptions());
}
if (isBuiltin) {
String builtin = functionConfig.getJar().replaceFirst("^builtin://", "");
functionDetailsBuilder.setBuiltin(builtin);
}
return validateFunctionDetails(functionDetailsBuilder.build());
}