in src/main/java/com/google/cloud/solutions/routes/MqttToCloudPubSubRoute.java [71:102]
public void configure() throws Exception {
  /*
   * Defines a single route that consumes messages from an MQTT topic and forwards
   * them to a Cloud Pub/Sub topic, recording the MQTT topic each message came from
   * as a Pub/Sub message attribute.
   */

  // Put the paho-mqtt5 routes under the supervising route controller.
  // NOTE(review): the back-off delay value is presumably in milliseconds; confirm
  // against the Camel SupervisingRouteController API.
  SupervisingRouteController routeSupervisor =
      getCamelContext().getRouteController().supervising();
  routeSupervisor.setBackOffDelay(200);
  routeSupervisor.setIncludeRoutes("paho-mqtt5:*");

  // The client id is passed as an endpoint (URI) parameter rather than as a
  // component-level parameter (and its corresponding property), so that every
  // client can connect with its own dynamically generated, unique client id.
  // Configuration options are described at
  // https://camel.apache.org/components/3.18.x/paho-mqtt5-component.html
  String sourceEndpointUri =
      "paho-mqtt5:"
          + mqttSourceTopic
          + "?"
          + "clientId="
          + mqttFromSourceTopicClientId;
  String destinationEndpointUri =
      "google-pubsub:"
          + cloudPubSubProjectId
          + ":"
          + cloudPubSubDestinationTopicName;

  LOG.infof("Apache Camel route start: %s", sourceEndpointUri);
  LOG.infof("Apache Camel route destination: %s", destinationEndpointUri);

  from(sourceEndpointUri)
      .id(mqttToCloudPubSubRouteId)
      .process(
          exchange -> {
            // Build the attribute map carrying the originating MQTT topic and
            // attach it to the message as the Pub/Sub attributes header.
            Map<String, String> pubSubAttributes = new HashMap<>();
            String originTopic =
                exchange.getIn().getHeader(PahoMqtt5Constants.MQTT_TOPIC, String.class);
            pubSubAttributes.put(SOURCE_MQTT_TOPIC_HEADER_NAME, originTopic);
            exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, pubSubAttributes);
          })
      .to(destinationEndpointUri);
}