void FailoverTransport::oneway()

in activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp [398:545]


void FailoverTransport::oneway(const Pointer<Command> command) {

    Pointer<Exception> error;

    try {

        synchronized(&this->impl->reconnectMutex) {

            if (command != NULL && this->impl->connectedTransport == NULL) {

                if (command->isShutdownInfo()) {
                    // Skipping send of ShutdownInfo command when not connected.
                    return;
                }

                if (command->isRemoveInfo() || command->isMessageAck()) {
                    // Simulate response to RemoveInfo command or Ack as they will be stale.
                    stateTracker.track(command);

                    if (command->isResponseRequired()) {
                        Pointer<Response> response(new Response());
                        response->setCorrelationId(command->getCommandId());
                        this->impl->myTransportListener->onCommand(response);
                    }

                    return;
                } else if (command->isMessagePull()) {
                    // Simulate response to MessagePull if timed as we can't honor that now.
                    Pointer<MessagePull> pullRequest = command.dynamicCast<MessagePull>();
                    if (pullRequest->getTimeout() != 0) {
                        Pointer<MessageDispatch> dispatch(new MessageDispatch());
                        dispatch->setConsumerId(pullRequest->getConsumerId());
                        dispatch->setDestination(pullRequest->getDestination());
                        this->impl->myTransportListener->onCommand(dispatch);
                    }

                    return;
                }
            }

            // Keep trying until the message is sent.
            for (int i = 0; !this->impl->closed; i++) {
                try {

                    // Wait for transport to be connected.
                    Pointer<Transport> transport = this->impl->connectedTransport;
                    long long start = System::currentTimeMillis();
                    bool timedout = false;

                    while (transport == NULL && !this->impl->closed &&
                           this->impl->connectionFailure == NULL && this->impl->willReconnect()) {

                        long long end = System::currentTimeMillis();
                        if (command->isMessage() && this->impl->timeout > 0 && (end - start > this->impl->timeout)) {
                            timedout = true;
                            break;
                        }

                        this->impl->reconnectMutex.wait(100);
                        transport = this->impl->connectedTransport;
                    }

                    if (transport == NULL) {
                        // Previous loop may have exited due to us being disposed.
                        if (this->impl->closed) {
                            error.reset(new IOException(__FILE__, __LINE__, "Transport disposed."));
                        } else if (this->impl->connectionFailure != NULL) {
                            error = this->impl->connectionFailure;
                        } else if (timedout == true) {
                            error.reset(new IOException(__FILE__, __LINE__,
                                "Failover timeout of %d ms reached.", this->impl->timeout));
                        } else if (!this->impl->willReconnect()) {
                            error.reset(new IOException(__FILE__, __LINE__,
                                "Maximum reconnection attempts exceeded"));
                        } else {
                            error.reset(new IOException(__FILE__, __LINE__, "Unexpected failure."));
                        }

                        break;
                    }

                    // If it was a request and it was not being tracked by the state
                    // tracker, then hold it in the requestMap so that we can replay
                    // it later.
                    Pointer<Tracked> tracked;
                    try {
                        tracked = stateTracker.track(command);
                        synchronized(&this->impl->requestMap) {
                            if (tracked != NULL && tracked->isWaitingForResponse()) {
                                this->impl->requestMap.put(command->getCommandId(), tracked);
                            } else if (tracked == NULL && command->isResponseRequired()) {
                                this->impl->requestMap.put(command->getCommandId(), command);
                            }
                        }
                    } catch (Exception& ex) {
                        ex.setMark(__FILE__, __LINE__);
                        error.reset(ex.clone());
                        break;
                    }

                    // Send the message.
                    try {
                        transport->oneway(command);
                        stateTracker.trackBack(command);
                        if (command->isShutdownInfo()) {
                            this->impl->shutdown = true;
                        }
                    } catch (IOException& e) {

                        e.setMark(__FILE__, __LINE__);

                        // If the command was not tracked.. we will retry in this method
                        if (tracked == NULL && this->impl->canReconnect()) {

                            // since we will retry in this method.. take it out of the
                            // request map so that it is not sent 2 times on recovery
                            if (command->isResponseRequired()) {
                                this->impl->requestMap.remove(command->getCommandId());
                            }

                            // re-throw the exception so it will handled by the outer catch
                            throw;
                        } else {
                            // Trigger the reconnect since we can't count on inactivity or
                            // other socket events to trip the failover condition.
                            handleTransportFailure(e);
                        }
                    }

                    return;
                } catch (IOException& e) {
                    e.setMark(__FILE__, __LINE__);
                    handleTransportFailure(e);
                }
            }
        }
    } catch (InterruptedException& ex) {
        Thread::currentThread()->interrupt();
        throw InterruptedIOException(__FILE__, __LINE__, "FailoverTransport oneway() interrupted");
    }
    AMQ_CATCHALL_NOTHROW()

    if (!this->impl->closed) {
        if (error != NULL) {
            throw IOException(*error);
        }
    }
}