in activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp [398:545]
void FailoverTransport::oneway(const Pointer<Command> command) {
Pointer<Exception> error;
try {
synchronized(&this->impl->reconnectMutex) {
if (command != NULL && this->impl->connectedTransport == NULL) {
if (command->isShutdownInfo()) {
// Skipping send of ShutdownInfo command when not connected.
return;
}
if (command->isRemoveInfo() || command->isMessageAck()) {
// Simulate response to RemoveInfo command or Ack as they will be stale.
stateTracker.track(command);
if (command->isResponseRequired()) {
Pointer<Response> response(new Response());
response->setCorrelationId(command->getCommandId());
this->impl->myTransportListener->onCommand(response);
}
return;
} else if (command->isMessagePull()) {
// Simulate response to MessagePull if timed as we can't honor that now.
Pointer<MessagePull> pullRequest = command.dynamicCast<MessagePull>();
if (pullRequest->getTimeout() != 0) {
Pointer<MessageDispatch> dispatch(new MessageDispatch());
dispatch->setConsumerId(pullRequest->getConsumerId());
dispatch->setDestination(pullRequest->getDestination());
this->impl->myTransportListener->onCommand(dispatch);
}
return;
}
}
// Keep trying until the message is sent.
for (int i = 0; !this->impl->closed; i++) {
try {
// Wait for transport to be connected.
Pointer<Transport> transport = this->impl->connectedTransport;
long long start = System::currentTimeMillis();
bool timedout = false;
while (transport == NULL && !this->impl->closed &&
this->impl->connectionFailure == NULL && this->impl->willReconnect()) {
long long end = System::currentTimeMillis();
if (command->isMessage() && this->impl->timeout > 0 && (end - start > this->impl->timeout)) {
timedout = true;
break;
}
this->impl->reconnectMutex.wait(100);
transport = this->impl->connectedTransport;
}
if (transport == NULL) {
// Previous loop may have exited due to us being disposed.
if (this->impl->closed) {
error.reset(new IOException(__FILE__, __LINE__, "Transport disposed."));
} else if (this->impl->connectionFailure != NULL) {
error = this->impl->connectionFailure;
} else if (timedout == true) {
error.reset(new IOException(__FILE__, __LINE__,
"Failover timeout of %d ms reached.", this->impl->timeout));
} else if (!this->impl->willReconnect()) {
error.reset(new IOException(__FILE__, __LINE__,
"Maximum reconnection attempts exceeded"));
} else {
error.reset(new IOException(__FILE__, __LINE__, "Unexpected failure."));
}
break;
}
// If it was a request and it was not being tracked by the state
// tracker, then hold it in the requestMap so that we can replay
// it later.
Pointer<Tracked> tracked;
try {
tracked = stateTracker.track(command);
synchronized(&this->impl->requestMap) {
if (tracked != NULL && tracked->isWaitingForResponse()) {
this->impl->requestMap.put(command->getCommandId(), tracked);
} else if (tracked == NULL && command->isResponseRequired()) {
this->impl->requestMap.put(command->getCommandId(), command);
}
}
} catch (Exception& ex) {
ex.setMark(__FILE__, __LINE__);
error.reset(ex.clone());
break;
}
// Send the message.
try {
transport->oneway(command);
stateTracker.trackBack(command);
if (command->isShutdownInfo()) {
this->impl->shutdown = true;
}
} catch (IOException& e) {
e.setMark(__FILE__, __LINE__);
// If the command was not tracked.. we will retry in this method
if (tracked == NULL && this->impl->canReconnect()) {
// since we will retry in this method.. take it out of the
// request map so that it is not sent 2 times on recovery
if (command->isResponseRequired()) {
this->impl->requestMap.remove(command->getCommandId());
}
// re-throw the exception so it will handled by the outer catch
throw;
} else {
// Trigger the reconnect since we can't count on inactivity or
// other socket events to trip the failover condition.
handleTransportFailure(e);
}
}
return;
} catch (IOException& e) {
e.setMark(__FILE__, __LINE__);
handleTransportFailure(e);
}
}
}
} catch (InterruptedException& ex) {
Thread::currentThread()->interrupt();
throw InterruptedIOException(__FILE__, __LINE__, "FailoverTransport oneway() interrupted");
}
AMQ_CATCHALL_NOTHROW()
if (!this->impl->closed) {
if (error != NULL) {
throw IOException(*error);
}
}
}