FlowError SingleSwiftProcessedDocProducerV2::produce()

in aios/apps/facility/build_service/build_service/workflow/SingleSwiftProcessedDocProducerV2.cpp [291:509]


/**
 * Read one message from swift and turn it into a processed-document batch.
 *
 * Outcomes, as implemented below:
 *  - ERROR_CLIENT_EXCEED_TIME_STAMP_LIMIT -> FE_EOF. If an alter-table schema id was recorded, this also
 *    sets _needAlterTable.
 *  - ERROR_SEALED_TOPIC_READ_FINISH -> FE_SEALED.
 *  - Any other read error except ERROR_CLIENT_NO_MORE_MESSAGE -> FE_RETRY.
 *  - ERROR_CLIENT_NO_MORE_MESSAGE -> FE_WAIT. Once the _ckpDocReportInterval (seconds) has elapsed, it
 *    instead emits a checkpoint (skip) document with FE_OK, or returns FE_SKIP / FE_FATAL.
 *  - A message -> FE_OK with a processed document in docVec. FE_SKIP when the locator keeper filters it
 *    or the document carries a newer schema. FE_FATAL when the locator cannot be updated. Failures while
 *    creating the document are mapped through _exceptionHandler.
 *
 * @param docVec output; reset to the created (or checkpoint) document when one is built.
 * @return flow status, as listed above.
 */
FlowError SingleSwiftProcessedDocProducerV2::produce(document::ProcessedDocumentVecPtr& docVec)
{
    // Builds the locator DocInfo for a message. In multi-topic mode, the source index is the position of
    // the first (mask, result) pair matching the message's uint8 mask payload. If nothing matches, it
    // equals the number of pairs.
    auto makeDocInfo = [this](const Message& msg) {
        uint8_t sourceIdx = 0;
        if (_swiftParam.isMultiTopic) {
            for (const auto& [mask, result] : _swiftParam.maskFilterPairs) {
                if ((static_cast<uint8_t>(msg.uint8maskpayload()) & mask) == result) {
                    break;
                }
                sourceIdx++;
            }
        }
        return indexlibv2::framework::Locator::DocInfo(
            /* hashId */ msg.uint16payload(), /* timestamp */ msg.timestamp(),
            /* concurrentIdx*/ msg.offsetinrawmsg(), /*sourceIdx*/ sourceIdx);
    };

    Message message;
    // Seed with the last published value. Nothing visible here guarantees read() assigns timestamp when
    // it fails, yet the value is published to _lastReadTs unconditionally below. Leaving it
    // uninitialized would publish (and log) an indeterminate value.
    int64_t timestamp = _lastReadTs.load(std::memory_order_relaxed);
    ErrorCode ec = ERROR_NONE;

    swift::protocol::ReaderProgress readerProgress;
    {
        ScopeLatencyReporter reporter(_readLatencyMetric.get());
        ec = _swiftParam.reader->read(timestamp, message);
        if (ec == ERROR_NONE || ec == ERROR_CLIENT_NO_MORE_MESSAGE) {
            if (_swiftParam.reader->getReaderProgress(readerProgress) != ERROR_NONE) {
                // The message has already been consumed from the reader. Returning FE_RETRY (or similar)
                // would lose it, so treat this as fatal.
                return FE_FATAL;
            }
        }
    }
    _lastReadTs.store(timestamp, std::memory_order_relaxed);

    static const std::string emptyDocSource("");
    if (ec == ERROR_CLIENT_EXCEED_TIME_STAMP_LIMIT) {
        BS_PREFIX_LOG(INFO,
                      "swift read exceed the limited timestamp, "
                      "last message msgId[%ld] uint16Payload[%u] uint8Payload[%u]",
                      _lastMessageId, _lastMessageUint16Payload, _lastMessageUint8Payload);
        reportFreshnessMetrics(0, /* no more msg */ true, emptyDocSource, _swiftParam.isMultiTopic);
        BEEPER_REPORT(WORKER_STATUS_COLLECTOR_NAME, "swift read exceed limit timestamp, reach eof.");
        if (_alterTableSchemaId != indexlib::INVALID_SCHEMAID) {
            _needAlterTable = true;
        }
        return FE_EOF;
    } else if (ec == ERROR_SEALED_TOPIC_READ_FINISH) {
        BS_PREFIX_LOG(INFO,
                      "swift topic sealed, "
                      "last message msgId[%ld] uint16Payload[%u] uint8Payload[%u]",
                      _lastMessageId, _lastMessageUint16Payload, _lastMessageUint8Payload);
        reportFreshnessMetrics(0, /* no more msg */ true, emptyDocSource, _swiftParam.isMultiTopic);
        BEEPER_REPORT(WORKER_STATUS_COLLECTOR_NAME, "swift topic sealed.");
        return FE_SEALED;
    } else if (ec != ERROR_NONE) {
        if (ec != ERROR_CLIENT_NO_MORE_MESSAGE) {
            string errorMsg = "read from swift fail, errorCode[" + swift::protocol::ErrorCode_Name(ec) + "]";
            BS_PREFIX_LOG(WARN, "%s", errorMsg.c_str());
            BEEPER_INTERVAL_REPORT(10, WORKER_ERROR_COLLECTOR_NAME, errorMsg);
            return FE_RETRY;
        }

        // Track how long the reader has been idle, and warn about long idle periods.
        int64_t noMoreMsgBeginTs = _noMoreMsgBeginTs.load(std::memory_order_relaxed);
        int64_t currentTs = TimeUtility::currentTime();
        if (noMoreMsgBeginTs == -1) {
            _noMoreMsgBeginTs.store(currentTs, std::memory_order_relaxed);
        } else {
            int64_t gapTsInSeconds = (currentTs - noMoreMsgBeginTs) / 1000000;
            if (gapTsInSeconds > 60) {
                // A function-local static is shared by every producer instance in the process, so it must
                // be race-free. Only the caller that wins the compare-exchange logs in this window.
                static std::atomic<int64_t> logTs {0};
                int64_t lastLogTs = logTs.load(std::memory_order_relaxed);
                if (currentTs - lastLogTs > 300000000 && // 300s = 5min
                    logTs.compare_exchange_strong(lastLogTs, currentTs, std::memory_order_relaxed)) {
                    BS_LOG(INFO,
                           "[%s] read from swift return SWIFT_CLIENT_NO_MORE_MSG"
                           " last over [%ld] seconds, lastReadTs [%ld]",
                           LOG_PREFIX, gapTsInSeconds, timestamp);
                    BEEPER_FORMAT_REPORT_WITHOUT_TAGS(WORKER_ERROR_COLLECTOR_NAME,
                                                      "[%s] read from swift return SWIFT_CLIENT_NO_MORE_MSG"
                                                      " last over [%ld] seconds, lastReadTs [%ld]",
                                                      LOG_PREFIX, gapTsInSeconds, timestamp);
                }
            }
        }
        reportFreshnessMetrics(0, /* no more msg */ true, emptyDocSource, _swiftParam.isMultiTopic);
        int64_t currentTime = autil::TimeUtility::currentTimeInSeconds();
        // No new message: once every _ckpDocReportInterval seconds, emit a checkpoint (skip) document
        // carrying the current reader progress. Otherwise, ask the caller to wait.
        if (_ckpDocReportInterval >= 0 && currentTime - _ckpDocReportTime >= _ckpDocReportInterval) {
            BS_PREFIX_LOG(DEBUG, "Create CHECKPOINT_DOC, locator: src[%lu], offset[%ld].", _sourceSignature, timestamp);
            _ckpDocReportTime = currentTime;
            // NOTE(review): read() returned no message here, so DocInfo is built from whatever `message`
            // holds (presumably a default-constructed Message). Confirm this is intended.
            auto docInfo = makeDocInfo(message);
            if (_locatorKeeper->needSkip(docInfo) || _alterTableSchemaId != indexlib::INVALID_SCHEMAID) {
                BS_INTERVAL_LOG2(10, WARN, "filter message payload [%u] timestamp [%ld] progress [%s]",
                                 message.uint16payload(), message.timestamp(),
                                 readerProgress.ShortDebugString().c_str());
                return FE_SKIP;
            }
            if (!_locatorKeeper->update(readerProgress, docInfo)) {
                return FE_FATAL;
            }
            docVec.reset(createSkipProcessedDocument(_locatorKeeper->getLocator().GetMultiProgress()));
            return FE_OK;
        }
        return FE_WAIT;
    }
    _noMoreMsgBeginTs.store(-1, std::memory_order_relaxed);
    _lastValidReadTs.store(timestamp, std::memory_order_relaxed);

    if (unlikely(-1 == _lastMessageId)) {
        BS_PREFIX_LOG(INFO, "read swift from msgId[%ld] uint16Payload[%u] uint8Payload[%u]", message.msgid(),
                      message.uint16payload(), message.uint8maskpayload());
        string msg = "read swift from msgId[" + StringUtil::toString(message.msgid()) + "] uint16Payload[" +
                     StringUtil::toString(message.uint16payload()) + "] uint8Payload[" +
                     StringUtil::toString(message.uint8maskpayload()) + "]";
        BEEPER_REPORT(WORKER_STATUS_COLLECTOR_NAME, msg);
    }
    _lastMessageUint16Payload = message.uint16payload();
    _lastMessageUint8Payload = message.uint8maskpayload();
    _lastMessageId = message.msgid();
    bool handleProcessedDocSuccess = false;
    try {
        auto docInfo = makeDocInfo(message);
        if (_locatorKeeper->needSkip(docInfo)) {
            BS_INTERVAL_LOG2(10, WARN, "filter message payload [%u] timestamp [%ld] progress [%s]",
                             message.uint16payload(), message.timestamp(), readerProgress.ShortDebugString().c_str());
            return FE_SKIP;
        }
        if (_swiftParam.disableSwiftMaskFilter && _swiftParam.maskFilterPairs.size() == 1) {
            // disableSwiftMaskFilter=true: the producer, not swift, filters out messages whose mask does
            // not match. A filtered message still advances the locator via a checkpoint document.
            auto mask = _swiftParam.maskFilterPairs[0].first;
            auto result = _swiftParam.maskFilterPairs[0].second;
            if ((_lastMessageUint8Payload & mask) != result) {
                BS_PREFIX_LOG(
                    DEBUG,
                    "msg filtered by mask [%u], will Create CHECKPOINT_DOC, locator: src[%lu], msg timestamp[%ld].",
                    _lastMessageUint8Payload, _sourceSignature, message.timestamp());
                if (!_locatorKeeper->update(readerProgress, docInfo)) {
                    return FE_FATAL;
                }
                docVec.reset(createSkipProcessedDocument(_locatorKeeper->getLocator().GetMultiProgress()));
                return FE_OK;
            }
        }
        docVec.reset(createProcessedDocument(message.data(), docInfo, timestamp,
                                             _locatorKeeper->getLocator().GetMultiProgress()));
        if (!docVec) {
            BS_PREFIX_LOG(ERROR, "create processed document failed");
            return _exceptionHandler.transferProcessResult(handleProcessedDocSuccess);
        }

        const std::shared_ptr<indexlibv2::document::IDocument>& document = (*docVec)[0]->getDocument();
        auto schemaId = document->GetSchemaId();
        if (schemaId > _schema->GetSchemaId()) {
            // The document was written with a newer schema. Record the smallest such schema id as the
            // alter-table target. Unless a stop timestamp is already set, suspend reading at this
            // message. Then skip it.
            auto prevSchemaId = _alterTableSchemaId.load(std::memory_order_relaxed);
            auto nextSchemaId =
                (prevSchemaId == indexlib::INVALID_SCHEMAID) ? schemaId : std::min(prevSchemaId, schemaId);
            _alterTableSchemaId.store(nextSchemaId);
            if (prevSchemaId != nextSchemaId) {
                BS_LOG(INFO, "next alter table schema change [%d]", nextSchemaId);
            }
            if (_stopTimestamp == std::numeric_limits<int64_t>::max()) {
                suspendReadAtTimestamp(message.timestamp(), common::ETA_STOP);
            }
            _locatorKeeper->updateSchemaChangeDocInfo(docInfo);
            return FE_SKIP;
        }
        if (!_locatorKeeper->update(readerProgress, docInfo)) {
            return FE_FATAL;
        }
        (*docVec)[0]->setLocator(_locatorKeeper->getLocator());
        std::string pk;
        const auto& normalDoc = std::dynamic_pointer_cast<indexlibv2::document::NormalDocument>(document);
        if (normalDoc) {
            pk = normalDoc->GetPrimaryKey();
        }
        if (_lastMessageUint8Payload == ProcessedDocument::SWIFT_FILTER_BIT_FASTQUEUE ||
            _lastMessageUint8Payload ==
                (ProcessedDocument::SWIFT_FILTER_BIT_FASTQUEUE | ProcessedDocument::SWIFT_FILTER_BIT_REALTIME)) {
            BS_PREFIX_LOG(INFO, "pk[%s] payload[%d] read from fast queue, locator[%s]", pk.c_str(),
                          _lastMessageUint8Payload, document->GetLocatorV2().DebugString().c_str());
        }
        PkTracer::fromSwiftTrace(pk, document->GetLocatorV2().DebugString(), message.msgid());
        IE_DOC_TRACE(document, "read processed doc from swift");
        handleProcessedDocSuccess = true;
    } catch (const autil::legacy::ExceptionBase& e) {
        stringstream ss;
        ss << "swift message msgId[" << message.msgid() << "] timestamp[" << message.timestamp() << "] uint16Payload["
           << message.uint16payload() << "] uint8MaskPayload[" << message.uint8maskpayload() << "]";
        string errorMsg = ss.str();
        BS_PREFIX_LOG(ERROR, "%s", errorMsg.c_str());
        errorMsg = "create processed document failed [" + string(e.what()) + "]";
        BS_PREFIX_LOG(ERROR, "%s", errorMsg.c_str());
        BEEPER_INTERVAL_REPORT(10, WORKER_ERROR_COLLECTOR_NAME, errorMsg);
    } catch (...) {
        stringstream ss;
        ss << "swift message msgId[" << message.msgid() << "] timestamp[" << message.timestamp() << "] uint16Payload["
           << message.uint16payload() << "] uint8MaskPayload[" << message.uint8maskpayload() << "]";
        string errorMsg = ss.str();
        BS_PREFIX_LOG(ERROR, "%s", errorMsg.c_str());
        errorMsg = "create processed document failed";
        BS_PREFIX_LOG(ERROR, "%s", errorMsg.c_str());
        BEEPER_INTERVAL_REPORT(10, WORKER_ERROR_COLLECTOR_NAME, errorMsg);
    }

    if (_linkReporter) {
        _linkReporter->collectSwiftMessageMeta(message);
        _linkReporter->collectSwiftMessageOffset(timestamp);
    }
    return _exceptionHandler.transferProcessResult(handleProcessedDocSuccess);
}