public void start()

in modules/camel-ext/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java [74:141]


    /**
     * Starts the streamer: validates the configuration, makes sure a started {@code CamelContext} is available,
     * resolves the endpoint from {@code endpointUri}, creates a consumer on it (with this streamer as the processor)
     * and starts the Camel services.
     * <p>
     * If any step after the context has been started fails, the context is stopped and the failure is rethrown
     * as an {@link IgniteException} that carries the original exception as its cause. Failures that happen while
     * rolling back are attached to the thrown exception as suppressed exceptions.
     *
     * @throws IgniteException If the Camel context is not in a runnable state, or if the endpoint, the consumer
     *      or the Camel services could not be created or started.
     */
    public void start() throws IgniteException {
        // Ensure that the endpoint URI is provided.
        A.notNullOrEmpty(endpointUri, "endpoint URI must be provided");

        // Check that at least one tuple extractor is provided (the check only rejects the case when both are missing).
        A.ensure(!(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null),
            "tuple extractor missing");

        // If a custom CamelContext is not provided, initialize one.
        if (camelCtx == null)
            camelCtx = new DefaultCamelContext();

        // If the camel context is not started then simply start it up.
        if (!camelCtx.isStarted())
            camelCtx.start();

        if (!camelCtx.isRunAllowed())
            throw new IgniteException("Failed to start Camel streamer (CamelContext not in a runnable state).");

        log = getIgnite().log();

        // Instantiate the Camel endpoint.
        try {
            endpoint = CamelContextHelper.getMandatoryEndpoint(camelCtx, endpointUri);
        }
        catch (Exception e) {
            U.error(log, e);

            // Keep the original failure as the cause so that callers get the full stack trace.
            IgniteException err =
                new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']', e);

            // A failure to stop the context must not mask the original error.
            try {
                camelCtx.stop();
            }
            catch (Exception stopErr) {
                err.addSuppressed(stopErr);
            }

            throw err;
        }

        // Create the Camel consumer.
        try {
            consumer = endpoint.createConsumer(this);
        }
        catch (Exception e) {
            U.error(log, e);

            // Keep the original failure as the cause so that callers get the full stack trace.
            IgniteException err =
                new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']', e);

            // A failure to stop the context must not mask the original error.
            try {
                camelCtx.stop();
            }
            catch (Exception stopErr) {
                err.addSuppressed(stopErr);
            }

            throw err;
        }

        // Start the Camel services.
        try {
            ServiceHelper.startService(camelCtx, endpoint, consumer);
        }
        catch (Exception e) {
            U.error(log, e);

            // Roll back: stop the context and shut down everything that may have been started.
            // The context stop is part of the guarded rollback so that its failure is reported together
            // with the original start failure instead of replacing it.
            try {
                camelCtx.stop();

                ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer);

                consumer = null;
                endpoint = null;
            }
            catch (Exception e1) {
                IgniteException err = new IgniteException(
                    "Failed to start Camel streamer; failed to stop the context, endpoint or " +
                    "consumer during rollback of failed initialization [errMsg=" + e.getMessage() + ", stopErrMsg=" +
                    e1.getMessage() + ']', e);

                err.addSuppressed(e1);

                throw err;
            }

            throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']', e);
        }

        U.log(log, "Started Camel streamer consuming from endpoint URI: " + endpointUri);
    }