in qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java [141:196]
protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
this.connection = connection;
this.acknowledgementMode = acknowledgementMode;
if (acknowledgementMode == SESSION_TRANSACTED) {
setTransactionContext(new JmsLocalTransactionContext(this));
} else {
setTransactionContext(new JmsNoTxTransactionContext());
}
sessionInfo = new JmsSessionInfo(sessionId);
sessionInfo.setAcknowledgementMode(acknowledgementMode);
sessionInfo.setSendAcksAsync(connection.isForceAsyncAcks());
sessionInfo.setMessageIDPolicy(connection.getMessageIDPolicy().copy());
sessionInfo.setPrefetchPolicy(connection.getPrefetchPolicy().copy());
sessionInfo.setPresettlePolicy(connection.getPresettlePolicy().copy());
sessionInfo.setRedeliveryPolicy(connection.getRedeliveryPolicy().copy());
sessionInfo.setDeserializationPolicy(connection.getDeserializationPolicy());
connection.createResource(sessionInfo, new ProviderSynchronization() {
@Override
public void onPendingSuccess() {
connection.addSession(sessionInfo, JmsSession.this);
}
@Override
public void onPendingFailure(ProviderException cause) {
}
});
// We always keep an open TX if transacted so start now.
try {
getTransactionContext().begin();
} catch (Exception e) {
// failed, close the AMQP session before we throw
try {
connection.destroyResource(sessionInfo, new ProviderSynchronization() {
@Override
public void onPendingSuccess() {
connection.removeSession(sessionInfo);
}
@Override
public void onPendingFailure(ProviderException cause) {
connection.removeSession(sessionInfo);
}
});
} catch (Exception ex) {
// Ignore, throw original error
}
throw e;
}
}