in streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java [60:87]
/**
 * Determines the transport protocol to use for this selector's source.
 *
 * <p>If the source is an {@code SpDataStream}, the protocol already present in its
 * event grounding is returned as-is. Otherwise the prioritized protocols are walked
 * in order, and the first entry whose protocol class name matches Kafka, JMS, MQTT,
 * NATS or Pulsar <em>and</em> for which {@code supportsProtocol} reports support
 * decides the result. When no prioritized entry qualifies, the Kafka protocol is
 * returned as the fallback.
 *
 * @return the transport protocol chosen according to the rules above
 */
public TransportProtocol getPreferredProtocol() {
  // Data streams carry their own grounding; nothing to negotiate.
  if (source instanceof SpDataStream) {
    SpDataStream stream = (SpDataStream) source;
    return stream.getEventGrounding().getTransportProtocol();
  }

  for (SpProtocol candidate : prioritizedProtocols) {
    String protocolClassName = candidate.getProtocolClass();

    // For each protocol: the class name must match first, only then is support checked.
    if (protocolClassName.equals(KafkaTransportProtocol.class.getCanonicalName())
        && supportsProtocol(KafkaTransportProtocol.class)) {
      return kafkaTopic();
    }
    if (protocolClassName.equals(JmsTransportProtocol.class.getCanonicalName())
        && supportsProtocol(JmsTransportProtocol.class)) {
      return jmsTopic();
    }
    if (protocolClassName.equals(MqttTransportProtocol.class.getCanonicalName())
        && supportsProtocol(MqttTransportProtocol.class)) {
      return mqttTopic();
    }
    if (protocolClassName.equals(NatsTransportProtocol.class.getCanonicalName())
        && supportsProtocol(NatsTransportProtocol.class)) {
      return natsTopic();
    }
    if (protocolClassName.equals(PulsarTransportProtocol.class.getCanonicalName())
        && supportsProtocol(PulsarTransportProtocol.class)) {
      SimpleTopicDefinition pulsarTopicDefinition = new SimpleTopicDefinition(outputTopic);
      return new PulsarTransportProtocol(messagingSettings.getPulsarUrl(), pulsarTopicDefinition);
    }
  }

  // No prioritized protocol qualified: fall back to Kafka.
  return kafkaTopic();
}