in qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java [136:197]
public void stop(AsyncResult request) {
Receiver receiver = getEndpoint();
if (receiver.getRemoteCredit() <= 0) {
if (receiver.getQueued() == 0) {
// We have no remote credit and all the deliveries have been processed.
request.onSuccess();
} else {
// There are still deliveries to process, wait for them to be.
if (getDrainTimeout() > 0) {
// If the remote doesn't respond we will close the consumer and break any
// blocked receive or stop calls that are waiting, unless the consumer is
// a participant in a transaction in which case we will just fail the request
// and leave the consumer open since the TX needs it to remain active.
final ScheduledFuture<?> future = getSession().schedule(() -> {
LOG.trace("Consumer {} stop timed out awaiting message processing", getConsumerId());
ProviderException cause = new ProviderOperationTimedOutException("Consumer stop timed out awaiting message processing");
if (session.isTransacted() && session.getTransactionContext().isInTransaction(getConsumerId())) {
stopRequest.onFailure(cause);
stopRequest = null;
} else {
closeResource(session.getProvider(), cause, false);
session.getProvider().pumpToProtonTransport();
}
}, getDrainTimeout());
stopRequest = new ScheduledRequest(future, request);
} else {
stopRequest = request;
}
LOG.trace("Consumer {} stop awaiting queued delivery processing", getConsumerId());
}
} else {
// TODO: We don't actually want the additional messages that could be sent while
// draining. We could explicitly reduce credit first, or possibly use 'echo' instead
// of drain if it was supported. We would first need to understand what happens
// if we reduce credit below the number of messages already in-flight before
// the peer sees the update.
stopRequest = request;
receiver.drain(0);
if (getDrainTimeout() > 0) {
// If the remote doesn't respond we will close the consumer and break any
// blocked receive or stop calls that are waiting, unless the consumer is
// a participant in a transaction in which case we will just fail the request
// and leave the consumer open since the TX needs it to remain active.
final ScheduledFuture<?> future = getSession().schedule(() -> {
LOG.trace("Consumer {} drain request timed out", getConsumerId());
ProviderException cause = new ProviderOperationTimedOutException("Remote did not respond to a drain request in time");
if (session.isTransacted() && session.getTransactionContext().isInTransaction(getConsumerId())) {
stopRequest.onFailure(cause);
stopRequest = null;
} else {
closeResource(session.getProvider(), cause, false);
session.getProvider().pumpToProtonTransport();
}
}, getDrainTimeout());
stopRequest = new ScheduledRequest(future, stopRequest);
}
}
}