in aios/apps/facility/build_service/build_service/workflow/SingleSwiftProcessedDocProducer.cpp [275:432]
FlowError SingleSwiftProcessedDocProducer::produce(document::ProcessedDocumentVecPtr& docVec)
{
Message message;
int64_t timestamp;
ErrorCode ec = ERROR_NONE;
swift::protocol::ReaderProgress readerProgress;
{
ScopeLatencyReporter reporter(_readLatencyMetric.get());
ec = _swiftParam.reader->read(timestamp, message);
if (ec == ERROR_NONE || ec == ERROR_CLIENT_NO_MORE_MESSAGE) {
if (_swiftParam.reader->getReaderProgress(readerProgress) != ERROR_NONE) {
// message already readed, if return FE_RETRY or similiar, message will lost
return FE_FATAL;
}
}
}
_lastReadTs.store(timestamp, std::memory_order_relaxed);
static const std::string emptyDocSource("");
if (ec == ERROR_CLIENT_EXCEED_TIME_STAMP_LIMIT) {
BS_PREFIX_LOG(INFO,
"swift read exceed the limited timestamp, "
"last message msgId[%ld] uint16Payload[%u] uint8Payload[%u]",
_lastMessageId, _lastMessageUint16Payload, _lastMessageUint8Payload);
reportFreshnessMetrics(0, /* no more msg */ true, emptyDocSource, /* not report fast queue delay */ false);
BEEPER_REPORT(WORKER_STATUS_COLLECTOR_NAME, "swift read exceed limit timestamp, reach eof.");
return FE_EOF;
} else if (ec == ERROR_SEALED_TOPIC_READ_FINISH) {
BS_PREFIX_LOG(INFO,
"swift topic sealed, "
"last message msgId[%ld] uint16Payload[%u] uint8Payload[%u]",
_lastMessageId, _lastMessageUint16Payload, _lastMessageUint8Payload);
reportFreshnessMetrics(0, /* no more msg */ true, emptyDocSource, /* not report fast queue delay */ false);
BEEPER_REPORT(WORKER_STATUS_COLLECTOR_NAME, "swift topic sealed.");
return FE_SEALED;
} else if (ec != ERROR_NONE) {
if (ec != ERROR_CLIENT_NO_MORE_MESSAGE) {
string errorMsg = "read from swift fail, errorCode[" + swift::protocol::ErrorCode_Name(ec) + "]";
BS_PREFIX_LOG(WARN, "%s", errorMsg.c_str());
BEEPER_INTERVAL_REPORT(10, WORKER_ERROR_COLLECTOR_NAME, errorMsg);
return FE_RETRY;
}
int64_t noMoreMsgBeginTs = _noMoreMsgBeginTs.load(std::memory_order_relaxed);
int64_t currentTs = TimeUtility::currentTime();
if (noMoreMsgBeginTs == -1) {
_noMoreMsgBeginTs.store(currentTs, std::memory_order_relaxed);
} else {
int64_t gapTsInSeconds = (currentTs - noMoreMsgBeginTs) / 1000000;
if (gapTsInSeconds > 60) {
static int64_t logTs;
if (currentTs - logTs > 300000000) { // 300s = 5min
logTs = currentTs;
BS_LOG(INFO,
"[%s] read from swift return SWIFT_CLIENT_NO_MORE_MSG"
" last over [%ld] seconds, lastReadTs [%ld]",
LOG_PREFIX, gapTsInSeconds, timestamp);
BEEPER_FORMAT_REPORT_WITHOUT_TAGS(WORKER_ERROR_COLLECTOR_NAME,
"[%s] read from swift return SWIFT_CLIENT_NO_MORE_MSG"
" last over [%ld] seconds, lastReadTs [%ld]",
LOG_PREFIX, gapTsInSeconds, timestamp);
}
}
}
reportFreshnessMetrics(0, /* no more msg */ true, emptyDocSource, /* not report fast queue delay */ false);
int64_t currentTime = autil::TimeUtility::currentTimeInSeconds();
if (_ckpDocReportInterval >= 0 && currentTime - _ckpDocReportTime >= _ckpDocReportInterval) {
BS_PREFIX_LOG(DEBUG, "Create CHECKPOINT_DOC, locator: src[%lu], offset[%ld].", _sourceSignature, timestamp);
_ckpDocReportTime = currentTime;
auto [success, progress] =
util::LocatorUtil::convertSwiftProgress(readerProgress, _swiftParam.isMultiTopic);
if (!success) {
AUTIL_LOG(ERROR, "convert swift progress failed [%s]", readerProgress.ShortDebugString().c_str());
return FE_FATAL;
}
docVec.reset(createSkipProcessedDocument(progress));
return FE_OK;
// no message more than 60s, report processor checkpoint use current time
}
return FE_WAIT;
}
_noMoreMsgBeginTs.store(-1, std::memory_order_relaxed);
_lastValidReadTs.store(timestamp, std::memory_order_relaxed);
if (unlikely(-1 == _lastMessageId)) {
BS_PREFIX_LOG(INFO, "read swift from msgId[%ld] uint16Payload[%u] uint8Payload[%u]", message.msgid(),
message.uint16payload(), message.uint8maskpayload());
string msg = "read swift from msgId[" + StringUtil::toString(message.msgid()) + "] uint16Payload[" +
StringUtil::toString(message.uint16payload()) + "] uint8Payload[" +
StringUtil::toString(message.uint8maskpayload()) + "]";
BEEPER_REPORT(WORKER_STATUS_COLLECTOR_NAME, msg);
}
_lastMessageUint16Payload = message.uint16payload();
_lastMessageUint8Payload = message.uint8maskpayload();
_lastMessageId = message.msgid();
bool handleProcessedDocSuccess = false;
try {
auto [success, progress] = util::LocatorUtil::convertSwiftProgress(readerProgress, _swiftParam.isMultiTopic);
if (!success) {
AUTIL_LOG(ERROR, "convert swift progress failed [%s]", readerProgress.ShortDebugString().c_str());
return FE_FATAL;
}
if (_swiftParam.disableSwiftMaskFilter && _swiftParam.maskFilterPairs.size() == 1) {
// disableSwiftMaskFilter=true will filter invalid msg by bs producer
auto mask = _swiftParam.maskFilterPairs[0].first;
auto result = _swiftParam.maskFilterPairs[0].second;
if ((_lastMessageUint8Payload & mask) != result) {
BS_PREFIX_LOG(
DEBUG,
"msg filtered by mask [%u], will Create CHECKPOINT_DOC, locator: src[%lu], msg timestamp[%ld].",
_lastMessageUint8Payload, _sourceSignature, message.timestamp());
docVec.reset(createSkipProcessedDocument(progress));
handleProcessedDocSuccess = true;
return FE_OK;
}
}
docVec.reset(createProcessedDocument(message.data(), message.timestamp(), timestamp, _lastMessageUint16Payload,
progress));
const DocumentPtr& document = std::dynamic_pointer_cast<Document>((*docVec)[0]->getDocument());
if (_hashIdRewriter) {
document->AddTag(DOCUMENT_HASHID_TAG_KEY, StringUtil::toString(message.uint16payload()));
_hashIdRewriter->Rewrite(document);
}
if (_lastMessageUint8Payload == ProcessedDocument::SWIFT_FILTER_BIT_FASTQUEUE ||
_lastMessageUint8Payload ==
(ProcessedDocument::SWIFT_FILTER_BIT_FASTQUEUE | ProcessedDocument::SWIFT_FILTER_BIT_REALTIME)) {
BS_PREFIX_LOG(DEBUG, "pk[%s] payload[%d] read from fast queue", document->GetPrimaryKey().c_str(),
_lastMessageUint8Payload);
}
PkTracer::fromSwiftTrace(document->GetPrimaryKey(), document->GetLocator().ToString(), message.msgid());
IE_INDEX_DOC_TRACE(document, "read processed doc from swift");
handleProcessedDocSuccess = true;
} catch (const autil::legacy::ExceptionBase& e) {
stringstream ss;
ss << "swift message msgId[" << message.msgid() << "] timestamp[" << message.timestamp() << "] uint16Payload["
<< message.uint16payload() << "] uint8MaskPayload[" << message.uint8maskpayload() << "]";
string errorMsg = ss.str();
BS_PREFIX_LOG(ERROR, "%s", errorMsg.c_str());
errorMsg = "create processed document failed [" + string(e.what()) + "]";
BS_PREFIX_LOG(ERROR, "%s", errorMsg.c_str());
BEEPER_INTERVAL_REPORT(10, WORKER_ERROR_COLLECTOR_NAME, errorMsg);
} catch (...) {
stringstream ss;
ss << "swift message msgId[" << message.msgid() << "] timestamp[" << message.timestamp() << "] uint16Payload["
<< message.uint16payload() << "] uint8MaskPayload[" << message.uint8maskpayload() << "]";
string errorMsg = ss.str();
BS_PREFIX_LOG(ERROR, "%s", errorMsg.c_str());
errorMsg = "create processed document failed";
BS_PREFIX_LOG(ERROR, "%s", errorMsg.c_str());
BEEPER_INTERVAL_REPORT(10, WORKER_ERROR_COLLECTOR_NAME, errorMsg);
}
if (_linkReporter) {
_linkReporter->collectSwiftMessageMeta(message);
_linkReporter->collectSwiftMessageOffset(timestamp);
}
return _exceptionHandler.transferProcessResult(handleProcessedDocSuccess);
}