public void stop()

in qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java [136:197]


    /**
     * Stops message flow to this consumer and signals the given request once the stop
     * has taken effect.
     * <p>
     * Depending on the state of the underlying receiver link one of three things happens:
     * <ul>
     * <li>no remote credit and no queued deliveries: the request succeeds immediately;</li>
     * <li>no remote credit but deliveries still queued: the request is stored in
     *     {@code stopRequest} and left pending (presumably completed by the delivery
     *     processing path once the queue empties; that code is outside this method);</li>
     * <li>remote credit outstanding: the request is stored in {@code stopRequest} and the
     *     link is drained with {@code receiver.drain(0)}.</li>
     * </ul>
     * In both pending cases, when {@code getDrainTimeout()} is greater than zero, a timeout
     * task is scheduled and the pending request is wrapped in a {@code ScheduledRequest}
     * together with that task's future.
     *
     * @param request
     *      the asynchronous result that is signalled when the stop completes or fails.
     */
    public void stop(AsyncResult request) {
        Receiver receiver = getEndpoint();
        if (receiver.getRemoteCredit() <= 0) {
            if (receiver.getQueued() == 0) {
                // We have no remote credit and all the deliveries have been processed.
                request.onSuccess();
            } else {
                // There are still deliveries to process, wait for them to be.
                if (getDrainTimeout() > 0) {
                    final ScheduledFuture<?> future = scheduleStopTimeout(
                        "Consumer {} stop timed out awaiting message processing",
                        "Consumer stop timed out awaiting message processing");

                    stopRequest = new ScheduledRequest(future, request);
                } else {
                    stopRequest = request;
                }

                LOG.trace("Consumer {} stop awaiting queued delivery processing", getConsumerId());
            }
        } else {
            // TODO: We don't actually want the additional messages that could be sent while
            // draining. We could explicitly reduce credit first, or possibly use 'echo' instead
            // of drain if it was supported. We would first need to understand what happens
            // if we reduce credit below the number of messages already in-flight before
            // the peer sees the update.
            stopRequest = request;
            receiver.drain(0);

            if (getDrainTimeout() > 0) {
                final ScheduledFuture<?> future = scheduleStopTimeout(
                    "Consumer {} drain request timed out",
                    "Remote did not respond to a drain request in time");

                stopRequest = new ScheduledRequest(future, stopRequest);
            }
        }
    }

    /**
     * Schedules the task that runs if a pending stop has not completed within
     * {@code getDrainTimeout()}.
     * <p>
     * When the task fires and the consumer is a participant in an active transaction,
     * only the pending {@code stopRequest} is failed and cleared; the consumer is left
     * open since the TX needs it to remain active. Otherwise the consumer is closed with
     * the timeout error, which is intended to break any blocked receive or stop calls
     * that are waiting, and the provider is asked to pump the Proton transport.
     *
     * @param traceFormat
     *      the trace log format; its single placeholder is filled with the consumer id.
     * @param failureMessage
     *      the message of the {@code ProviderOperationTimedOutException} used as the cause.
     *
     * @return the future of the scheduled timeout task.
     */
    private ScheduledFuture<?> scheduleStopTimeout(final String traceFormat, final String failureMessage) {
        return getSession().schedule(() -> {
            LOG.trace(traceFormat, getConsumerId());
            ProviderException cause = new ProviderOperationTimedOutException(failureMessage);
            if (session.isTransacted() && session.getTransactionContext().isInTransaction(getConsumerId())) {
                // Fail only the waiting stop call; the transaction still needs this consumer.
                stopRequest.onFailure(cause);
                stopRequest = null;
            } else {
                closeResource(session.getProvider(), cause, false);
                session.getProvider().pumpToProtonTransport();
            }
        }, getDrainTimeout());
    }