in Output.cpp [341:414]
bool Output::handle_events(bool checkOpen) {
_queue->Rollback(_cursor_handle);
if (_ack_mode) {
_ack_queue->Init(_queue, _cursor_handle);
_ack_reader->Init(_event_writer, _writer, _ack_queue);
_ack_reader->Start();
}
while(!IsStopping() && (!checkOpen || _writer->IsOpen())) {
std::pair<std::shared_ptr<QueueItem>,bool> get_ret;
do {
get_ret = _queue->Get(_cursor_handle, 100, !_ack_mode);
} while((!get_ret.first && !get_ret.second) && (!checkOpen || _writer->IsOpen()));
if (get_ret.first && (!checkOpen || _writer->IsOpen()) && !IsStopping()) {
Event event(get_ret.first->Data(), get_ret.first->Size());
bool filtered = _event_filter && _event_filter->IsEventFiltered(event);
if (!filtered) {
if (_ack_mode) {
// Avoid racing with receiver, add ack before sending event
if (!_ack_queue->Add(EventId(event.Seconds(), event.Milliseconds(), event.Serial()),
get_ret.first->Priority(), get_ret.first->Sequence(),
_ack_timeout)) {
if (!_ack_queue->IsClosed()) {
Logger::Error("Output(%s): Timeout waiting for Acks", _name.c_str());
}
break;
}
}
auto ret = _event_writer->WriteEvent(event, _writer.get());
if (ret == IEventWriter::NOOP) {
if (_ack_mode) {
// The event was not sent, so remove it's ack
_ack_queue->Remove(EventId(event.Seconds(), event.Milliseconds(), event.Serial()));
// And update the auto cursor
_ack_queue->SetAutoCursor(get_ret.first->Priority(), get_ret.first->Sequence());
}
} else if (ret != IWriter::OK) {
break;
}
if (!_ack_mode) {
_queue->Commit(_cursor_handle, get_ret.first->Priority(), get_ret.first->Sequence());
}
} else {
if (_ack_mode) {
_ack_queue->SetAutoCursor(get_ret.first->Priority(), get_ret.first->Sequence());
} else {
_queue->Commit(_cursor_handle, get_ret.first->Priority(), get_ret.first->Sequence());
}
}
}
}
if (_ack_mode) {
// Wait a short time for final acks to arrive
_ack_queue->Wait(100);
}
// writer must be closed before calling _ack_reader->Stop(), or the stop may hang until the connection is closed remotely.
_writer->Close();
if (_ack_mode) {
_ack_reader->Stop();
}
if (!IsStopping()) {
Logger::Info("Output(%s): Connection lost", _name.c_str());
}
return !IsStopping();
}