in aios/apps/facility/build_service/build_service/workflow/SingleSwiftProcessedDocProducerV2.cpp [291:509]
// Reads one message from the swift reader and converts it into docVec.
//
// Return values produced by this function:
//  - FE_FATAL  : reader progress could not be fetched after a read that returned ERROR_NONE or
//                ERROR_CLIENT_NO_MORE_MESSAGE (the message is already consumed, so retrying
//                would lose it), or the locator keeper rejected an update.
//  - FE_EOF    : read() returned ERROR_CLIENT_EXCEED_TIME_STAMP_LIMIT; _needAlterTable is set
//                when an alter-table schema id is pending.
//  - FE_SEALED : read() returned ERROR_SEALED_TOPIC_READ_FINISH.
//  - FE_RETRY  : any other swift read error.
//  - FE_WAIT   : no new message and a checkpoint doc is not due yet.
//  - FE_SKIP   : the locator keeper says the message must be skipped, an idle checkpoint is
//                suppressed because an alter-table schema id is pending, or the document carries
//                a schema id newer than the current schema.
//  - FE_OK     : a skip (checkpoint) document was put into docVec.
//  - otherwise : whatever _exceptionHandler.transferProcessResult() returns for the flag telling
//                whether a processed document was built successfully.
//
// @param docVec  output; reset to the produced document vector on the FE_OK / success paths.
FlowError SingleSwiftProcessedDocProducerV2::produce(document::ProcessedDocumentVecPtr& docVec)
{
    // Source (topic) index of a message. In multi-topic mode this is the position of the first
    // (mask, result) pair matched by the message's uint8 mask payload, or maskFilterPairs.size()
    // when none matches. It is always 0 for a single topic.
    auto calcSourceIdx = [this](const Message& msg) -> uint8_t {
        uint8_t sourceIdx = 0;
        if (_swiftParam.isMultiTopic) {
            for (const auto& [mask, result] : _swiftParam.maskFilterPairs) {
                if ((static_cast<uint8_t>(msg.uint8maskpayload()) & mask) == result) {
                    break;
                }
                ++sourceIdx;
            }
        }
        return sourceIdx;
    };

    Message message;
    // Seeded with the previous value instead of being left uninitialized. _lastReadTs is
    // stored unconditionally below, so this keeps it well-defined (and unchanged) in case
    // read() returns without writing the timestamp.
    int64_t timestamp = _lastReadTs.load(std::memory_order_relaxed);
    ErrorCode ec = ERROR_NONE;
    swift::protocol::ReaderProgress readerProgress;
    {
        ScopeLatencyReporter reporter(_readLatencyMetric.get());
        ec = _swiftParam.reader->read(timestamp, message);
        if (ec == ERROR_NONE || ec == ERROR_CLIENT_NO_MORE_MESSAGE) {
            if (_swiftParam.reader->getReaderProgress(readerProgress) != ERROR_NONE) {
                // The message has already been consumed by read(); returning FE_RETRY or
                // something similar here would lose it.
                return FE_FATAL;
            }
        }
    }
    _lastReadTs.store(timestamp, std::memory_order_relaxed);
    static const std::string emptyDocSource("");
    if (ec == ERROR_CLIENT_EXCEED_TIME_STAMP_LIMIT) {
        BS_PREFIX_LOG(INFO,
                      "swift read exceed the limited timestamp, "
                      "last message msgId[%ld] uint16Payload[%u] uint8Payload[%u]",
                      _lastMessageId, _lastMessageUint16Payload, _lastMessageUint8Payload);
        reportFreshnessMetrics(0, /* no more msg */ true, emptyDocSource, _swiftParam.isMultiTopic);
        BEEPER_REPORT(WORKER_STATUS_COLLECTOR_NAME, "swift read exceed limit timestamp, reach eof.");
        if (_alterTableSchemaId != indexlib::INVALID_SCHEMAID) {
            _needAlterTable = true;
        }
        return FE_EOF;
    } else if (ec == ERROR_SEALED_TOPIC_READ_FINISH) {
        BS_PREFIX_LOG(INFO,
                      "swift topic sealed, "
                      "last message msgId[%ld] uint16Payload[%u] uint8Payload[%u]",
                      _lastMessageId, _lastMessageUint16Payload, _lastMessageUint8Payload);
        reportFreshnessMetrics(0, /* no more msg */ true, emptyDocSource, _swiftParam.isMultiTopic);
        BEEPER_REPORT(WORKER_STATUS_COLLECTOR_NAME, "swift topic sealed.");
        return FE_SEALED;
    } else if (ec != ERROR_NONE) {
        if (ec != ERROR_CLIENT_NO_MORE_MESSAGE) {
            string errorMsg = "read from swift fail, errorCode[" + swift::protocol::ErrorCode_Name(ec) + "]";
            BS_PREFIX_LOG(WARN, "%s", errorMsg.c_str());
            BEEPER_INTERVAL_REPORT(10, WORKER_ERROR_COLLECTOR_NAME, errorMsg);
            return FE_RETRY;
        }
        // ERROR_CLIENT_NO_MORE_MESSAGE: remember when the idle period began, and complain
        // (throttled) once it exceeds 60 seconds.
        int64_t noMoreMsgBeginTs = _noMoreMsgBeginTs.load(std::memory_order_relaxed);
        int64_t currentTs = TimeUtility::currentTime();
        if (noMoreMsgBeginTs == -1) {
            _noMoreMsgBeginTs.store(currentTs, std::memory_order_relaxed);
        } else {
            int64_t gapTsInSeconds = (currentTs - noMoreMsgBeginTs) / 1000000;
            if (gapTsInSeconds > 60) {
                // Function-local static, so the 5-minute throttle is shared by every producer
                // instance in the process. The atomic compare-exchange avoids a data race between
                // producers on different threads and lets exactly one of them log per window.
                static std::atomic<int64_t> logTs {0};
                int64_t lastLogTs = logTs.load(std::memory_order_relaxed);
                if (currentTs - lastLogTs > 300000000 && // 300s = 5min
                    logTs.compare_exchange_strong(lastLogTs, currentTs, std::memory_order_relaxed)) {
                    BS_LOG(INFO,
                           "[%s] read from swift return SWIFT_CLIENT_NO_MORE_MSG"
                           " last over [%ld] seconds, lastReadTs [%ld]",
                           LOG_PREFIX, gapTsInSeconds, timestamp);
                    BEEPER_FORMAT_REPORT_WITHOUT_TAGS(WORKER_ERROR_COLLECTOR_NAME,
                                                      "[%s] read from swift return SWIFT_CLIENT_NO_MORE_MSG"
                                                      " last over [%ld] seconds, lastReadTs [%ld]",
                                                      LOG_PREFIX, gapTsInSeconds, timestamp);
                }
            }
        }
        reportFreshnessMetrics(0, /* no more msg */ true, emptyDocSource, _swiftParam.isMultiTopic);
        // While idle, emit a checkpoint (skip) document at most once per _ckpDocReportInterval
        // seconds so downstream sees the locator advance; a negative interval disables it.
        int64_t currentTime = autil::TimeUtility::currentTimeInSeconds();
        if (_ckpDocReportInterval >= 0 && currentTime - _ckpDocReportTime >= _ckpDocReportInterval) {
            BS_PREFIX_LOG(DEBUG, "Create CHECKPOINT_DOC, locator: src[%lu], offset[%ld].", _sourceSignature, timestamp);
            _ckpDocReportTime = currentTime;
            // NOTE(review): on ERROR_CLIENT_NO_MORE_MESSAGE `message` was presumably not filled
            // by read(), so docInfo is built from default field values; confirm this is intended.
            indexlibv2::framework::Locator::DocInfo docInfo(
                /* hashId */ message.uint16payload(), /* timestamp */ message.timestamp(),
                /* concurrentIdx*/ message.offsetinrawmsg(), /*sourceIdx*/ calcSourceIdx(message));
            if (_locatorKeeper->needSkip(docInfo) || _alterTableSchemaId != indexlib::INVALID_SCHEMAID) {
                BS_INTERVAL_LOG2(10, WARN, "filter message payload [%u] timestamp [%ld] progress [%s]",
                                 message.uint16payload(), message.timestamp(),
                                 readerProgress.ShortDebugString().c_str());
                return FE_SKIP;
            }
            if (!_locatorKeeper->update(readerProgress, docInfo)) {
                return FE_FATAL;
            }
            docVec.reset(createSkipProcessedDocument(_locatorKeeper->getLocator().GetMultiProgress()));
            return FE_OK;
        }
        return FE_WAIT;
    }

    // ERROR_NONE: a real message was read.
    _noMoreMsgBeginTs.store(-1, std::memory_order_relaxed);
    _lastValidReadTs.store(timestamp, std::memory_order_relaxed);
    if (unlikely(-1 == _lastMessageId)) {
        BS_PREFIX_LOG(INFO, "read swift from msgId[%ld] uint16Payload[%u] uint8Payload[%u]", message.msgid(),
                      message.uint16payload(), message.uint8maskpayload());
        string msg = "read swift from msgId[" + StringUtil::toString(message.msgid()) + "] uint16Payload[" +
                     StringUtil::toString(message.uint16payload()) + "] uint8Payload[" +
                     StringUtil::toString(message.uint8maskpayload()) + "]";
        BEEPER_REPORT(WORKER_STATUS_COLLECTOR_NAME, msg);
    }
    _lastMessageUint16Payload = message.uint16payload();
    _lastMessageUint8Payload = message.uint8maskpayload();
    _lastMessageId = message.msgid();
    bool handleProcessedDocSuccess = false;
    try {
        indexlibv2::framework::Locator::DocInfo docInfo(
            /* hashId */ message.uint16payload(), /* timestamp */ message.timestamp(),
            /* concurrentIdx*/ message.offsetinrawmsg(), /*sourceIdx*/ calcSourceIdx(message));
        if (_locatorKeeper->needSkip(docInfo)) {
            BS_INTERVAL_LOG2(10, WARN, "filter message payload [%u] timestamp [%ld] progress [%s]",
                             message.uint16payload(), message.timestamp(), readerProgress.ShortDebugString().c_str());
            return FE_SKIP;
        }
        if (_swiftParam.disableSwiftMaskFilter && _swiftParam.maskFilterPairs.size() == 1) {
            // disableSwiftMaskFilter=true: swift does not filter, so messages whose mask payload
            // does not match are filtered here and replaced by a checkpoint (skip) document.
            auto mask = _swiftParam.maskFilterPairs[0].first;
            auto result = _swiftParam.maskFilterPairs[0].second;
            if ((_lastMessageUint8Payload & mask) != result) {
                BS_PREFIX_LOG(
                    DEBUG,
                    "msg filtered by mask [%u], will Create CHECKPOINT_DOC, locator: src[%lu], msg timestamp[%ld].",
                    _lastMessageUint8Payload, _sourceSignature, message.timestamp());
                if (!_locatorKeeper->update(readerProgress, docInfo)) {
                    return FE_FATAL;
                }
                docVec.reset(createSkipProcessedDocument(_locatorKeeper->getLocator().GetMultiProgress()));
                return FE_OK;
            }
        }
        docVec.reset(createProcessedDocument(message.data(), docInfo, timestamp,
                                             _locatorKeeper->getLocator().GetMultiProgress()));
        // (*docVec)[0] is dereferenced below, so an empty vector is treated like a failed creation.
        if (!docVec || docVec->empty()) {
            BS_PREFIX_LOG(ERROR, "create processed document failed");
            return _exceptionHandler.transferProcessResult(handleProcessedDocSuccess);
        }
        const std::shared_ptr<indexlibv2::document::IDocument>& document = (*docVec)[0]->getDocument();
        auto schemaId = document->GetSchemaId();
        if (schemaId > _schema->GetSchemaId()) {
            // A document from a newer schema: record the smallest pending alter-table schema id,
            // suspend reading at this message (unless a stop timestamp is already set) and skip it.
            const auto prevAlterSchemaId = _alterTableSchemaId.load(std::memory_order_relaxed);
            const auto nextAlterSchemaId =
                (prevAlterSchemaId == indexlib::INVALID_SCHEMAID) ? schemaId : std::min(prevAlterSchemaId, schemaId);
            _alterTableSchemaId = nextAlterSchemaId;
            if (prevAlterSchemaId != nextAlterSchemaId) {
                BS_LOG(INFO, "next alter table schema change [%d]", nextAlterSchemaId);
            }
            if (_stopTimestamp == std::numeric_limits<int64_t>::max()) {
                suspendReadAtTimestamp(message.timestamp(), common::ETA_STOP);
            }
            _locatorKeeper->updateSchemaChangeDocInfo(docInfo);
            return FE_SKIP;
        }
        if (!_locatorKeeper->update(readerProgress, docInfo)) {
            return FE_FATAL;
        }
        (*docVec)[0]->setLocator(_locatorKeeper->getLocator());
        std::string pk;
        const auto& normalDoc = std::dynamic_pointer_cast<indexlibv2::document::NormalDocument>(document);
        if (normalDoc) {
            pk = normalDoc->GetPrimaryKey();
        }
        if (_lastMessageUint8Payload == ProcessedDocument::SWIFT_FILTER_BIT_FASTQUEUE ||
            _lastMessageUint8Payload ==
                (ProcessedDocument::SWIFT_FILTER_BIT_FASTQUEUE | ProcessedDocument::SWIFT_FILTER_BIT_REALTIME)) {
            BS_PREFIX_LOG(INFO, "pk[%s] payload[%d] read from fast queue, locator[%s]", pk.c_str(),
                          _lastMessageUint8Payload, document->GetLocatorV2().DebugString().c_str());
        }
        PkTracer::fromSwiftTrace(pk, document->GetLocatorV2().DebugString(), message.msgid());
        IE_DOC_TRACE(document, "read processed doc from swift");
        handleProcessedDocSuccess = true;
    } catch (const autil::legacy::ExceptionBase& e) {
        stringstream ss;
        ss << "swift message msgId[" << message.msgid() << "] timestamp[" << message.timestamp() << "] uint16Payload["
           << message.uint16payload() << "] uint8MaskPayload[" << message.uint8maskpayload() << "]";
        string errorMsg = ss.str();
        BS_PREFIX_LOG(ERROR, "%s", errorMsg.c_str());
        errorMsg = "create processed document failed [" + string(e.what()) + "]";
        BS_PREFIX_LOG(ERROR, "%s", errorMsg.c_str());
        BEEPER_INTERVAL_REPORT(10, WORKER_ERROR_COLLECTOR_NAME, errorMsg);
    } catch (...) {
        stringstream ss;
        ss << "swift message msgId[" << message.msgid() << "] timestamp[" << message.timestamp() << "] uint16Payload["
           << message.uint16payload() << "] uint8MaskPayload[" << message.uint8maskpayload() << "]";
        string errorMsg = ss.str();
        BS_PREFIX_LOG(ERROR, "%s", errorMsg.c_str());
        errorMsg = "create processed document failed";
        BS_PREFIX_LOG(ERROR, "%s", errorMsg.c_str());
        BEEPER_INTERVAL_REPORT(10, WORKER_ERROR_COLLECTOR_NAME, errorMsg);
    }
    if (_linkReporter) {
        _linkReporter->collectSwiftMessageMeta(message);
        _linkReporter->collectSwiftMessageOffset(timestamp);
    }
    return _exceptionHandler.transferProcessResult(handleProcessedDocSuccess);
}