in qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java [69:128]
public void begin(final JmsTransactionId txId, final AsyncResult request) throws ProviderException {
if (current != null) {
throw new ProviderIllegalStateException("Begin called while a TX is still Active.");
}
final AsyncResult declareCompletion = new AsyncResult() {
@Override
public void onSuccess() {
current = txId;
cachedAcceptedState = new TransactionalState();
cachedAcceptedState.setOutcome(Accepted.getInstance());
cachedAcceptedState.setTxnId(getAmqpTransactionId());
cachedTransactedState = new TransactionalState();
cachedTransactedState.setTxnId(getAmqpTransactionId());
request.onSuccess();
}
@Override
public void onFailure(ProviderException result) {
current = null;
cachedAcceptedState = null;
cachedTransactedState = null;
request.onFailure(result);
}
@Override
public boolean isComplete() {
return current != null;
}
};
if (coordinator == null || coordinator.isClosed()) {
AmqpTransactionCoordinatorBuilder builder =
new AmqpTransactionCoordinatorBuilder(this, session.getResourceInfo());
builder.buildResource(new AsyncResult() {
@Override
public void onSuccess() {
try {
coordinator.declare(txId, declareCompletion);
} catch (ProviderException e) {
request.onFailure(e);
}
}
@Override
public void onFailure(ProviderException result) {
request.onFailure(result);
}
@Override
public boolean isComplete() {
return request.isComplete();
}
});
} else {
coordinator.declare(txId, declareCompletion);
}
}