in modules/jms11-ext/src/main/java/org/apache/ignite/stream/jms11/JmsStreamer.java [166:268]
public void start() throws IgniteException {
if (!stopped)
throw new IgniteException("Attempted to start an already started JMS Streamer");
try {
A.notNull(getStreamer(), "streamer");
A.notNull(getIgnite(), "ignite");
log = getIgnite().log();
A.notNull(transformer, "message transformer");
A.notNull(connectionFactory, "connection factory");
A.ensure(threads > 0, "threads > 0");
// handle batched && transacted parameter interaction
if (batched && !transacted) {
log.warning("Starting a Batched JMS Streamer without transacted flag = true. Setting it automatically.");
transacted = true;
}
// handle batch completion criteria
if (batched) {
A.ensure(batchClosureMillis > 0 || batchClosureSize > 0, "at least one of batch closure size or " +
"batch closure frequency must be specified when using batch consumption");
}
// check the parameters needed for durable subscriptions, if enabled
if (durableSubscription) {
A.notNullOrEmpty(clientId, "client id is compulsory when using durable subscriptions");
A.notNullOrEmpty(durableSubscriptionName, "durable subscription name is compulsory when using " +
"durable subscriptions");
}
// validate the destination; if we have an explicit destination, make sure it's of type Queue or Topic;
// else make sure that the destinationName and the destinationType are valid
if (destination == null) {
A.notNull(destinationType, "destination type");
A.ensure(destinationType.isAssignableFrom(Queue.class) || destinationType.isAssignableFrom(Topic.class),
"this streamer can only handle Queues or Topics.");
A.notNullOrEmpty(destinationName, "destination or destination name");
}
else if (destination instanceof Queue) {
destinationType = Queue.class;
}
else if (destination instanceof Topic) {
destinationType = Topic.class;
}
else {
throw new IllegalArgumentException("Invalid destination object. Can only handle Queues or Topics.");
}
// create a new connection and the client iD if relevant.
connection = connectionFactory.createConnection();
if (clientId != null && clientId.trim().length() > 0) {
connection.setClientID(clientId.trim());
}
connection.setExceptionListener(new IgniteJmsExceptionListener());
// build the JMS objects
if (destinationType == Queue.class) {
initializeJmsObjectsForQueue();
}
else {
initializeJmsObjectsForTopic();
}
stopped = false;
// start the JMS connection
connection.start();
// set up the scheduler service for committing batches
if (batched && batchClosureMillis > 0) {
scheduler = Executors.newScheduledThreadPool(1);
scheduler.schedule(new Runnable() {
@Override public void run() {
for (Session ses : sessions) {
try {
ses.commit();
if (log.isDebugEnabled()) {
log.debug("Committing session from time-based batch completion [ses=" +
ses + "]");
}
}
catch (JMSException ignored) {
log.warning("Error while committing session: from batch time-based completion " +
"[ses=" + ses + "]");
}
}
for (IgniteJmsMessageListener ml : listeners) {
ml.resetBatchCounter();
}
}
}, batchClosureMillis, TimeUnit.MILLISECONDS);
}
}
catch (Throwable t) {
throw new IgniteException("Exception while initializing JmsStreamer", t);
}
}