public void start()

in modules/jms11-ext/src/main/java/org/apache/ignite/stream/jms11/JmsStreamer.java [166:268]


    public void start() throws IgniteException {
        if (!stopped)
            throw new IgniteException("Attempted to start an already started JMS Streamer");

        try {
            A.notNull(getStreamer(), "streamer");
            A.notNull(getIgnite(), "ignite");

            log = getIgnite().log();

            A.notNull(transformer, "message transformer");
            A.notNull(connectionFactory, "connection factory");
            A.ensure(threads > 0, "threads > 0");

            // handle batched && transacted parameter interaction
            if (batched && !transacted) {
                log.warning("Starting a Batched JMS Streamer without transacted flag = true. Setting it automatically.");
                transacted = true;
            }

            // handle batch completion criteria
            if (batched) {
                A.ensure(batchClosureMillis > 0 || batchClosureSize > 0, "at least one of batch closure size or " +
                    "batch closure frequency must be specified when using batch consumption");
            }

            // check the parameters needed for durable subscriptions, if enabled
            if (durableSubscription) {
                A.notNullOrEmpty(clientId, "client id is compulsory when using durable subscriptions");
                A.notNullOrEmpty(durableSubscriptionName, "durable subscription name is compulsory when using " +
                    "durable subscriptions");
            }

            // validate the destination; if we have an explicit destination, make sure it's of type Queue or Topic;
            // else make sure that the destinationName and the destinationType are valid
            if (destination == null) {
                A.notNull(destinationType, "destination type");
                A.ensure(destinationType.isAssignableFrom(Queue.class) || destinationType.isAssignableFrom(Topic.class),
                    "this streamer can only handle Queues or Topics.");
                A.notNullOrEmpty(destinationName, "destination or destination name");
            }
            else if (destination instanceof Queue) {
                destinationType = Queue.class;
            }
            else if (destination instanceof Topic) {
                destinationType = Topic.class;
            }
            else {
                throw new IllegalArgumentException("Invalid destination object. Can only handle Queues or Topics.");
            }

            // create a new connection and the client iD if relevant.
            connection = connectionFactory.createConnection();
            if (clientId != null && clientId.trim().length() > 0) {
                connection.setClientID(clientId.trim());
            }

            connection.setExceptionListener(new IgniteJmsExceptionListener());

            // build the JMS objects
            if (destinationType == Queue.class) {
                initializeJmsObjectsForQueue();
            }
            else {
                initializeJmsObjectsForTopic();
            }

            stopped = false;

            // start the JMS connection
            connection.start();

            // set up the scheduler service for committing batches
            if (batched && batchClosureMillis > 0) {
                scheduler = Executors.newScheduledThreadPool(1);
                scheduler.schedule(new Runnable() {
                    @Override public void run() {
                        for (Session session : sessions) {
                            try {
                                session.commit();
                                if (log.isDebugEnabled()) {
                                    log.debug("Committing session from time-based batch completion [session=" +
                                        session + "]");
                                }
                            }
                            catch (JMSException ignored) {
                                log.warning("Error while committing session: from batch time-based completion " +
                                    "[session=" + session + "]");
                            }
                        }
                        for (IgniteJmsMessageListener ml : listeners) {
                            ml.resetBatchCounter();
                        }
                    }
                }, batchClosureMillis, TimeUnit.MILLISECONDS);
            }

        }
        catch (Throwable t) {
            throw new IgniteException("Exception while initializing JmsStreamer", t);
        }

    }