bool Output::handle_events(bool checkOpen)

in Output.cpp [341:414]


bool Output::handle_events(bool checkOpen) {
    _queue->Rollback(_cursor_handle);

    if (_ack_mode) {
        _ack_queue->Init(_queue, _cursor_handle);
        _ack_reader->Init(_event_writer, _writer, _ack_queue);
        _ack_reader->Start();
    }

    while(!IsStopping() && (!checkOpen || _writer->IsOpen())) {
        std::pair<std::shared_ptr<QueueItem>,bool> get_ret;
        do {
            get_ret = _queue->Get(_cursor_handle, 100, !_ack_mode);
        } while((!get_ret.first && !get_ret.second) && (!checkOpen || _writer->IsOpen()));

        if (get_ret.first && (!checkOpen || _writer->IsOpen()) && !IsStopping()) {
            Event event(get_ret.first->Data(), get_ret.first->Size());
            bool filtered = _event_filter && _event_filter->IsEventFiltered(event);
            if (!filtered) {
                if (_ack_mode) {
                    // Avoid racing with receiver, add ack before sending event
                    if (!_ack_queue->Add(EventId(event.Seconds(), event.Milliseconds(), event.Serial()),
                                         get_ret.first->Priority(), get_ret.first->Sequence(),
                                         _ack_timeout)) {
                        if (!_ack_queue->IsClosed()) {
                            Logger::Error("Output(%s): Timeout waiting for Acks", _name.c_str());
                        }
                        break;
                    }
                }

                auto ret = _event_writer->WriteEvent(event, _writer.get());
                if (ret == IEventWriter::NOOP) {
                    if (_ack_mode) {
                        // The event was not sent, so remove it's ack
                        _ack_queue->Remove(EventId(event.Seconds(), event.Milliseconds(), event.Serial()));
                        // And update the auto cursor
                        _ack_queue->SetAutoCursor(get_ret.first->Priority(), get_ret.first->Sequence());
                    }
                } else if (ret != IWriter::OK) {
                    break;
                }

                if (!_ack_mode) {
                    _queue->Commit(_cursor_handle, get_ret.first->Priority(), get_ret.first->Sequence());
                }
            } else {
                if (_ack_mode) {
                    _ack_queue->SetAutoCursor(get_ret.first->Priority(), get_ret.first->Sequence());
                } else {
                    _queue->Commit(_cursor_handle, get_ret.first->Priority(), get_ret.first->Sequence());
                }
            }
        }
    }

    if (_ack_mode) {
        // Wait a short time for final acks to arrive
        _ack_queue->Wait(100);
    }

    // writer must be closed before calling _ack_reader->Stop(), or the stop may hang until the connection is closed remotely.
    _writer->Close();

    if (_ack_mode) {
        _ack_reader->Stop();
    }

    if (!IsStopping()) {
        Logger::Info("Output(%s): Connection lost", _name.c_str());
    }

    return !IsStopping();
}