FlowError SingleSwiftProcessedDocProducer::produce()

in aios/apps/facility/build_service/build_service/workflow/SingleSwiftProcessedDocProducer.cpp [275:432]


// Reads one message from the swift reader and turns it into a processed-document batch.
//
// docVec: output parameter. It is reset to a freshly created vector when a document
//         (normal, skip or checkpoint) is produced, and cleared when the created
//         processed document does not hold a Document.
//
// Returns:
//   FE_FATAL  - the reader progress could not be fetched or converted after a read;
//   FE_EOF    - the reader returned ERROR_CLIENT_EXCEED_TIME_STAMP_LIMIT;
//   FE_SEALED - the reader returned ERROR_SEALED_TOPIC_READ_FINISH;
//   FE_RETRY  - any other read error except ERROR_CLIENT_NO_MORE_MESSAGE;
//   FE_WAIT   - no more message and no checkpoint document is due;
//   FE_OK     - a checkpoint/skip document was produced;
//   otherwise the value _exceptionHandler.transferProcessResult() derives from whether the
//   processed document was built successfully.
FlowError SingleSwiftProcessedDocProducer::produce(document::ProcessedDocumentVecPtr& docVec)
{
    Message message;
    // Seed with the last published read timestamp: if read() fails without writing its
    // out-parameter, the store below then keeps _lastReadTs unchanged instead of
    // publishing an indeterminate value.
    int64_t timestamp = _lastReadTs.load(std::memory_order_relaxed);
    ErrorCode ec = ERROR_NONE;

    swift::protocol::ReaderProgress readerProgress;
    {
        ScopeLatencyReporter reporter(_readLatencyMetric.get());
        ec = _swiftParam.reader->read(timestamp, message);
        if (ec == ERROR_NONE || ec == ERROR_CLIENT_NO_MORE_MESSAGE) {
            if (_swiftParam.reader->getReaderProgress(readerProgress) != ERROR_NONE) {
                // The message has already been read; returning FE_RETRY or similar here
                // would lose it.
                return FE_FATAL;
            }
        }
    }
    _lastReadTs.store(timestamp, std::memory_order_relaxed);

    static const std::string emptyDocSource("");
    if (ec == ERROR_CLIENT_EXCEED_TIME_STAMP_LIMIT) {
        BS_PREFIX_LOG(INFO,
                      "swift read exceed the limited timestamp, "
                      "last message msgId[%ld] uint16Payload[%u] uint8Payload[%u]",
                      _lastMessageId, _lastMessageUint16Payload, _lastMessageUint8Payload);
        reportFreshnessMetrics(0, /* no more msg */ true, emptyDocSource, /* not report fast queue delay */ false);
        BEEPER_REPORT(WORKER_STATUS_COLLECTOR_NAME, "swift read exceed limit timestamp, reach eof.");
        return FE_EOF;
    } else if (ec == ERROR_SEALED_TOPIC_READ_FINISH) {
        BS_PREFIX_LOG(INFO,
                      "swift topic sealed, "
                      "last message msgId[%ld] uint16Payload[%u] uint8Payload[%u]",
                      _lastMessageId, _lastMessageUint16Payload, _lastMessageUint8Payload);
        reportFreshnessMetrics(0, /* no more msg */ true, emptyDocSource, /* not report fast queue delay */ false);
        BEEPER_REPORT(WORKER_STATUS_COLLECTOR_NAME, "swift topic sealed.");
        return FE_SEALED;
    } else if (ec != ERROR_NONE) {
        if (ec != ERROR_CLIENT_NO_MORE_MESSAGE) {
            string errorMsg = "read from swift fail, errorCode[" + swift::protocol::ErrorCode_Name(ec) + "]";
            BS_PREFIX_LOG(WARN, "%s", errorMsg.c_str());
            BEEPER_INTERVAL_REPORT(10, WORKER_ERROR_COLLECTOR_NAME, errorMsg);
            return FE_RETRY;
        }

        // ERROR_CLIENT_NO_MORE_MESSAGE: track how long the reader has been idle.
        int64_t noMoreMsgBeginTs = _noMoreMsgBeginTs.load(std::memory_order_relaxed);
        int64_t currentTs = TimeUtility::currentTime();
        if (noMoreMsgBeginTs == -1) {
            // First idle read since the last successful one: remember when idling began.
            _noMoreMsgBeginTs.store(currentTs, std::memory_order_relaxed);
        } else {
            // currentTs is divided by 1000000 to get seconds, i.e. it is treated as microseconds.
            int64_t gapTsInSeconds = (currentTs - noMoreMsgBeginTs) / 1000000;
            if (gapTsInSeconds > 60) {
                // NOTE(review): function-local static, so it is shared by every caller of this
                // function and is not atomic — confirm produce() is never run concurrently.
                static int64_t logTs;
                if (currentTs - logTs > 300000000) { // 300s = 5min
                    logTs = currentTs;
                    BS_LOG(INFO,
                           "[%s] read from swift return SWIFT_CLIENT_NO_MORE_MSG"
                           " last over [%ld] seconds, lastReadTs [%ld]",
                           LOG_PREFIX, gapTsInSeconds, timestamp);
                    BEEPER_FORMAT_REPORT_WITHOUT_TAGS(WORKER_ERROR_COLLECTOR_NAME,
                                                      "[%s] read from swift return SWIFT_CLIENT_NO_MORE_MSG"
                                                      " last over [%ld] seconds, lastReadTs [%ld]",
                                                      LOG_PREFIX, gapTsInSeconds, timestamp);
                }
            }
        }
        reportFreshnessMetrics(0, /* no more msg */ true, emptyDocSource, /* not report fast queue delay */ false);
        int64_t currentTime = autil::TimeUtility::currentTimeInSeconds();
        // While idle, emit a checkpoint (skip) document carrying the current reader progress
        // once _ckpDocReportInterval seconds have passed since the previous one; a negative
        // interval disables this.
        if (_ckpDocReportInterval >= 0 && currentTime - _ckpDocReportTime >= _ckpDocReportInterval) {
            BS_PREFIX_LOG(DEBUG, "Create CHECKPOINT_DOC, locator: src[%lu], offset[%ld].", _sourceSignature, timestamp);
            _ckpDocReportTime = currentTime;
            auto [success, progress] =
                util::LocatorUtil::convertSwiftProgress(readerProgress, _swiftParam.isMultiTopic);
            if (!success) {
                AUTIL_LOG(ERROR, "convert swift progress failed [%s]", readerProgress.ShortDebugString().c_str());
                return FE_FATAL;
            }
            docVec.reset(createSkipProcessedDocument(progress));
            return FE_OK;
        }
        return FE_WAIT;
    }

    // A message was read successfully: leave the idle state and publish the read timestamp.
    _noMoreMsgBeginTs.store(-1, std::memory_order_relaxed);
    _lastValidReadTs.store(timestamp, std::memory_order_relaxed);

    if (unlikely(-1 == _lastMessageId)) {
        // First message seen by this producer: record where reading started.
        BS_PREFIX_LOG(INFO, "read swift from msgId[%ld] uint16Payload[%u] uint8Payload[%u]", message.msgid(),
                      message.uint16payload(), message.uint8maskpayload());
        string msg = "read swift from msgId[" + StringUtil::toString(message.msgid()) + "] uint16Payload[" +
                     StringUtil::toString(message.uint16payload()) + "] uint8Payload[" +
                     StringUtil::toString(message.uint8maskpayload()) + "]";
        BEEPER_REPORT(WORKER_STATUS_COLLECTOR_NAME, msg);
    }
    _lastMessageUint16Payload = message.uint16payload();
    _lastMessageUint8Payload = message.uint8maskpayload();
    _lastMessageId = message.msgid();

    // Builds the message description logged by both exception handlers below.
    auto describeMessage = [&message]() {
        stringstream ss;
        ss << "swift message msgId[" << message.msgid() << "] timestamp[" << message.timestamp() << "] uint16Payload["
           << message.uint16payload() << "] uint8MaskPayload[" << message.uint8maskpayload() << "]";
        return ss.str();
    };

    bool handleProcessedDocSuccess = false;
    try {
        auto [success, progress] = util::LocatorUtil::convertSwiftProgress(readerProgress, _swiftParam.isMultiTopic);
        if (!success) {
            AUTIL_LOG(ERROR, "convert swift progress failed [%s]", readerProgress.ShortDebugString().c_str());
            return FE_FATAL;
        }
        if (_swiftParam.disableSwiftMaskFilter && _swiftParam.maskFilterPairs.size() == 1) {
            // disableSwiftMaskFilter=true will filter invalid msg by bs producer
            auto mask = _swiftParam.maskFilterPairs[0].first;
            auto result = _swiftParam.maskFilterPairs[0].second;
            if ((_lastMessageUint8Payload & mask) != result) {
                BS_PREFIX_LOG(
                    DEBUG,
                    "msg filtered by mask [%u], will Create CHECKPOINT_DOC, locator: src[%lu], msg timestamp[%ld].",
                    _lastMessageUint8Payload, _sourceSignature, message.timestamp());
                docVec.reset(createSkipProcessedDocument(progress));
                // NOTE(review): this early return bypasses the _linkReporter collection at the
                // end of the function — confirm that is intended for filtered messages.
                return FE_OK;
            }
        }
        docVec.reset(createProcessedDocument(message.data(), message.timestamp(), timestamp, _lastMessageUint16Payload,
                                             progress));
        const DocumentPtr document = std::dynamic_pointer_cast<Document>((*docVec)[0]->getDocument());
        if (!document) {
            // The cast yields null when the processed document does not hold a Document;
            // report it as a failed message instead of dereferencing a null pointer below.
            BS_PREFIX_LOG(ERROR, "processed document of swift message msgId[%ld] does not hold a Document",
                          message.msgid());
            docVec.reset();
        } else {
            if (_hashIdRewriter) {
                document->AddTag(DOCUMENT_HASHID_TAG_KEY, StringUtil::toString(message.uint16payload()));
                _hashIdRewriter->Rewrite(document);
            }
            if (_lastMessageUint8Payload == ProcessedDocument::SWIFT_FILTER_BIT_FASTQUEUE ||
                _lastMessageUint8Payload ==
                    (ProcessedDocument::SWIFT_FILTER_BIT_FASTQUEUE | ProcessedDocument::SWIFT_FILTER_BIT_REALTIME)) {
                BS_PREFIX_LOG(DEBUG, "pk[%s] payload[%d] read from fast queue", document->GetPrimaryKey().c_str(),
                              _lastMessageUint8Payload);
            }
            PkTracer::fromSwiftTrace(document->GetPrimaryKey(), document->GetLocator().ToString(), message.msgid());
            IE_INDEX_DOC_TRACE(document, "read processed doc from swift");
            handleProcessedDocSuccess = true;
        }
    } catch (const autil::legacy::ExceptionBase& e) {
        string errorMsg = describeMessage();
        BS_PREFIX_LOG(ERROR, "%s", errorMsg.c_str());
        errorMsg = "create processed document failed [" + string(e.what()) + "]";
        BS_PREFIX_LOG(ERROR, "%s", errorMsg.c_str());
        BEEPER_INTERVAL_REPORT(10, WORKER_ERROR_COLLECTOR_NAME, errorMsg);
    } catch (...) {
        string errorMsg = describeMessage();
        BS_PREFIX_LOG(ERROR, "%s", errorMsg.c_str());
        errorMsg = "create processed document failed";
        BS_PREFIX_LOG(ERROR, "%s", errorMsg.c_str());
        BEEPER_INTERVAL_REPORT(10, WORKER_ERROR_COLLECTOR_NAME, errorMsg);
    }
    // Message meta and offset are collected whether or not the document was built.
    if (_linkReporter) {
        _linkReporter->collectSwiftMessageMeta(message);
        _linkReporter->collectSwiftMessageOffset(timestamp);
    }
    return _exceptionHandler.transferProcessResult(handleProcessedDocSuccess);
}