in modules/camel-ext/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java [74:141]
public void start() throws IgniteException {
    // Startup sequence: validate the configuration, make sure a CamelContext exists and is started, resolve the
    // endpoint from the configured URI, create a consumer on it (passing this streamer to createConsumer), and
    // finally start the Camel services. Any failure after the context is up stops the context and is rethrown
    // as an IgniteException that carries the original exception as its cause.

    // Ensure that the endpoint URI is provided.
    A.notNullOrEmpty(endpointUri, "endpoint URI must be provided");

    // Check that at least one tuple extractor (single or multiple) is provided.
    A.ensure(getSingleTupleExtractor() != null || getMultipleTupleExtractor() != null,
        "tuple extractor missing");

    // If a custom CamelContext is not provided, initialize one.
    if (camelCtx == null)
        camelCtx = new DefaultCamelContext();

    // If the Camel context is not started yet, start it.
    if (!camelCtx.isStarted())
        camelCtx.start();

    if (!camelCtx.isRunAllowed())
        throw new IgniteException("Failed to start Camel streamer (CamelContext not in a runnable state).");

    log = getIgnite().log();

    // Instantiate the Camel endpoint.
    try {
        endpoint = CamelContextHelper.getMandatoryEndpoint(camelCtx, endpointUri);
    }
    catch (Exception e) {
        U.error(log, e);

        camelCtx.stop();

        // Keep the original exception as the cause so the caller sees the full stack trace, not only the message.
        throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']', e);
    }

    // Create the Camel consumer.
    try {
        consumer = endpoint.createConsumer(this);
    }
    catch (Exception e) {
        U.error(log, e);

        camelCtx.stop();

        throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']', e);
    }

    // Start the Camel services.
    try {
        ServiceHelper.startService(camelCtx, endpoint, consumer);
    }
    catch (Exception e) {
        U.error(log, e);

        // Roll back the partially started services. The context stop is performed inside the guarded block so
        // that a failure while stopping is reported together with the start failure instead of masking it.
        try {
            camelCtx.stop();

            ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer);

            consumer = null;
            endpoint = null;
        }
        catch (Exception e1) {
            IgniteException rollbackErr = new IgniteException(
                "Failed to start Camel streamer; failed to stop the context, endpoint or " +
                "consumer during rollback of failed initialization [errMsg=" + e.getMessage() + ", stopErrMsg=" +
                e1.getMessage() + ']', e);

            // The start failure is the primary cause; the rollback failure travels along as suppressed.
            rollbackErr.addSuppressed(e1);

            throw rollbackErr;
        }

        throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']', e);
    }

    U.log(log, "Started Camel streamer consuming from endpoint URI: " + endpointUri);
}