aios/apps/facility/swift/client/SwiftSinglePartitionReader.cpp (857 lines of code) (raw):
/*
* Copyright 2014-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "swift/client/SwiftSinglePartitionReader.h"
#include <algorithm>
#include <assert.h>
#include <cstdint>
#include <iostream>
#include <limits>
#include <memory>
#include <unistd.h>
#include "autil/CommonMacros.h"
#include "autil/StringUtil.h"
#include "autil/TimeUtility.h"
#include "swift/client/SwiftClient.h"
#include "swift/client/trace/SwiftErrorResponseCollector.h"
#include "swift/client/trace/SwiftFatalErrorCollector.h"
#include "swift/config/ClientAuthorizerInfo.h"
#include "swift/monitor/ClientMetricsReporter.h"
#include "swift/monitor/MetricsCommon.h"
#include "swift/network/SwiftRpcChannelManager.h"
#include "swift/protocol/BrokerRequestResponse.pb.h"
#include "swift/protocol/Common.pb.h"
#include "swift/protocol/ErrCode.pb.h"
#include "swift/protocol/FBMessageReader.h"
#include "swift/protocol/SwiftMessage.pb.h"
#include "swift/protocol/SwiftMessage_generated.h"
#include "swift/protocol/TraceMessage.pb.h"
#include "swift/util/Atomic.h"
#include "swift/util/IpUtil.h"
#include "swift/util/SwiftUuidGenerator.h"
#include "swift/version.h"
namespace swift {
namespace client {
class Notifier;
} // namespace client
} // namespace swift
using namespace std;
using namespace autil;
using namespace swift::protocol;
using namespace swift::network;
using namespace swift::util;
using namespace swift::monitor;
namespace swift {
namespace client {
AUTIL_LOG_SETUP(swift, SwiftSinglePartitionReader);
static const int64_t MAX_TIME_STAMP = numeric_limits<int64_t>::max();
#define HANDLE_SESSION_OR_PARTITION_CHANGE \
if (_lastTopicVersion == -1 && response->has_topicversion()) { \
_lastTopicVersion = response->topicversion(); \
} \
if (ec == ERROR_BROKER_SESSION_CHANGED || ec == ERROR_BROKER_TOPIC_PARTITION_NOT_FOUND) { \
if (_func) { \
int64_t topicVersion = 0; \
if (response->has_topicversion()) { \
topicVersion = response->topicversion(); \
} \
ErrorCode funcEc; \
if (_lastTopicVersion > 0 || topicVersion > 0) { \
if (topicVersion != _lastTopicVersion) { \
if (topicVersion != 0) { \
funcEc = _func(topicVersion); \
} else { \
funcEc = _func(_lastTopicVersion); \
} \
if (funcEc != ERROR_NONE) { \
return funcEc; \
} \
if (topicVersion != 0) { \
AUTIL_LOG(INFO, \
"[%s %d] topic version changed[%ld -> %ld]", \
_config.topicName.c_str(), \
_partitionId, \
_lastTopicVersion, \
topicVersion); \
_lastTopicVersion = topicVersion; \
} \
} \
} else { \
AUTIL_LOG(INFO, \
"[%s %d] topic version may changed, error[%s]", \
_config.topicName.c_str(), \
_partitionId, \
ErrorCode_Name(ec).c_str()); \
funcEc = _func(topicVersion); \
if (funcEc != ERROR_NONE) { \
return funcEc; \
} \
} \
} \
if (response->has_sessionid()) { \
AUTIL_LOG(INFO, \
"[%s %d] sessionid changed[%ld -> %ld]", \
_config.topicName.c_str(), \
_partitionId, \
_sessionId, \
response->sessionid()); \
_sessionId = response->sessionid(); \
} \
return ec; \
} else { \
if (response->has_topicversion()) { \
int64_t topicVersion = response->topicversion(); \
if (topicVersion > 0 && _lastTopicVersion != topicVersion) { \
ErrorCode funcEc = _func(topicVersion); \
AUTIL_LOG(INFO, \
"[%s %d] topic version changed[%ld -> %ld], ret[%s]", \
_config.topicName.c_str(), \
_partitionId, \
_lastTopicVersion, \
topicVersion, \
ErrorCode_Name(funcEc).c_str()); \
if (funcEc != ERROR_NONE) { \
return funcEc; \
} \
_lastTopicVersion = topicVersion; \
} \
} \
}
SwiftSinglePartitionReader::SwiftSinglePartitionReader(SwiftAdminAdapterPtr adminAdapter,
SwiftRpcChannelManagerPtr channelManager,
uint32_t partitionId,
const SwiftReaderConfig &config,
int64_t topicVersion,
Notifier *notifier,
monitor::ClientMetricsReporter *reporter)
: _adminAdapter(adminAdapter)
, _partitionId(partitionId)
, _config(config)
, _timestampLimit(MAX_TIME_STAMP)
, _lastMsgTimestamp(-1)
, _lastMsgOffsetInMerge(0)
, _lastCheckpointTimestamp(-1)
, _func(NULL)
, _sessionId(-1)
, _clientCommittedCheckpoint(-1)
, _lastTopicVersion(topicVersion)
, _sealedTopicReadFinish(false)
, _isTopicLongPollingEnabled(false)
, _metricsReporter(reporter) {
_transportAdapter = new SwiftTransportAdapter<TRT_GETMESSAGE>(
adminAdapter, channelManager, _config.topicName, partitionId, notifier, _config.retryGetMsgInterval);
string idStr = "read " + _config.topicName + " " + autil::StringUtil::toString(_partitionId);
_transportAdapter->setIdStr(idStr);
protocol::AuthenticationInfo authInfo;
*authInfo.mutable_accessauthinfo() = config.getAccessAuthInfo();
if (_adminAdapter) {
auto clientAuthInfo = _adminAdapter->getAuthenticationConf();
if (!clientAuthInfo.isEmpty()) {
authInfo.set_username(clientAuthInfo.username);
authInfo.set_passwd(clientAuthInfo.passwd);
}
}
_transportAdapter->setAuthInfo(authInfo);
_maxMsgIdTransportAdapter = new SwiftTransportAdapter<TRT_GETMAXMESSAGEID>(
adminAdapter, channelManager, _config.topicName, partitionId, NULL);
idStr = "read_maxid " + _config.topicName + " " + autil::StringUtil::toString(_partitionId);
_maxMsgIdTransportAdapter->setIdStr(idStr);
_maxMsgIdTransportAdapter->setAuthInfo(authInfo);
_msgIdTransportAdapter = new SwiftTransportAdapter<TRT_GETMINMESSAGEIDBYTIME>(
adminAdapter, channelManager, _config.topicName, _partitionId);
idStr = "read_id " + _config.topicName + " " + autil::StringUtil::toString(_partitionId);
_msgIdTransportAdapter->setIdStr(idStr);
_msgIdTransportAdapter->setAuthInfo(authInfo);
_clientVersionInfo.set_version(swift_client_version_str);
if (_config.messageFormat == 1) {
_clientVersionInfo.set_supportfb(true);
} else {
_clientVersionInfo.set_supportfb(false);
}
if (_config.mergeMessage == 1) {
_clientVersionInfo.set_supportmergemsg(true);
} else {
_clientVersionInfo.set_supportmergemsg(false);
}
_clientVersionInfo.set_clienttype(CT_CPP);
_clientVersionInfo.set_supportauthentication(true);
clear();
_buffer.updateFilter(_config.swiftFilter, _requiredFieldNames, _filedFilterDesc);
_buffer.setTopicName(_config.topicName);
_buffer.setPartitionId(_partitionId);
_readerInfo.set_topicname(_config.topicName);
_readerInfo.set_partitionid(_partitionId);
_readerInfo.set_readerid(_config.readerIdentity);
_readerInfo.set_pid(getpid());
_readerInfo.set_ip(IpUtil::getIp());
*_readerInfo.mutable_filter() = _config.swiftFilter;
_buffer.setReaderInfo(_readerInfo);
_metricsTags.AddTag("topic", config.topicName);
_metricsTags.AddTag("partition", intToString(_partitionId));
if (_config.checkpointMode.empty() || _config.checkpointMode == READER_CHECKPOINT_REFRESH_MODE) {
_checkpointReflushMode = true;
} else {
_checkpointReflushMode = false;
}
}
SwiftSinglePartitionReader::~SwiftSinglePartitionReader() {
DELETE_AND_SET_NULL(_transportAdapter);
DELETE_AND_SET_NULL(_maxMsgIdTransportAdapter);
DELETE_AND_SET_NULL(_msgIdTransportAdapter);
}
void SwiftSinglePartitionReader::clear() {
_partitionStatus.refreshTime = -1;
_partitionStatus.maxMessageId = -1;
_partitionStatus.maxMessageTimestamp = -1;
_buffer.clear();
_nextMsgId = 0;
_nextTimestamp = -1;
_seekTimestamp = -1;
_lastErrorCode = ERROR_NONE;
_errorCodeForCheck = ERROR_NONE;
_lastSuccessResponseTime = TimeUtility::currentTime();
_lastReportErrorTime = TimeUtility::currentTime();
_seekByTimestampRecord.lastErrorCode = ERROR_NONE;
_seekByTimestampRecord.lastSeekTime = TimeUtility::currentTime();
_lastRefreshMsgIdTime = 0;
_timestampLimit = MAX_TIME_STAMP;
_lastMsgTimestamp = -1;
_lastMsgOffsetInMerge = 0;
_lastCheckpointTimestamp = -1;
_func = NULL;
_sessionId = -1;
}
bool SwiftSinglePartitionReader::exceedTimestampLimit() {
if (_timestampLimit == MAX_TIME_STAMP) {
return false;
}
int64_t timestamp = getNextMsgTimestamp();
if (timestamp == -1) {
return false;
}
return timestamp > _timestampLimit;
}
bool SwiftSinglePartitionReader::tryRead(protocol::Messages &msgs) {
msgs.clear_msgs();
uint32_t maxReadCount = _config.batchReadCount;
Message tmpMessage;
while (maxReadCount-- > 0) {
if (_buffer.getFirstMsgTimestamp() > _timestampLimit) {
break;
}
if (_buffer.read(tmpMessage)) {
if (tmpMessage.timestamp() < _lastMsgTimestamp) {
AUTIL_LOG(WARN,
"[%s %d] message timestamp roll back, msg id [%ld], timestamp [%ld], last msg timestamp[%ld]",
_config.topicName.c_str(),
_partitionId,
tmpMessage.msgid(),
tmpMessage.timestamp(),
_lastMsgTimestamp);
}
_lastMsgTimestamp = tmpMessage.timestamp();
_lastMsgOffsetInMerge = tmpMessage.offsetinrawmsg();
msgs.add_msgs()->Swap(&tmpMessage);
} else if (getUnReadMsgCount() <= 0) {
break;
}
}
return msgs.msgs_size() > 0;
}
int64_t SwiftSinglePartitionReader::tryFillBuffer(int64_t currentTime) {
bool isSend = false;
return tryFillBuffer(currentTime, false, isSend);
}
int64_t SwiftSinglePartitionReader::tryFillBuffer(int64_t currentTime, bool force, bool &isSent) {
ScopedLock lock(_mutex);
int64_t retryInterval = MAX_TIME_STAMP;
isSent = false;
if (!_transportAdapter->isLastRequestDone()) {
return MAX_TIME_STAMP;
}
ErrorCode ec = ERROR_NONE;
ClientMetricsCollector collector;
if (!_transportAdapter->isLastRequestHandled()) {
ec = handleGetMessageResponse(collector);
checkErrorCode(ec);
}
retryInterval = _transportAdapter->getRetryInterval(currentTime);
if (retryInterval > 0 && !force) {
return retryInterval;
}
if ((_nextTimestamp <= _timestampLimit && getUnReadMsgCount() < (int64_t)_config.readBufferSize) || force) {
if (mayWaitForRetry(ec) && !force) {
int64_t ci = currentTime - _partitionStatus.refreshTime;
if (ci < (int64_t)_config.retryGetMsgInterval) {
return _config.retryGetMsgInterval - ci;
}
}
ec = postGetMessageRequest(collector);
if (ec == ERROR_NONE) {
isSent = true;
}
checkErrorCode(ec);
retryInterval = _transportAdapter->getRetryInterval(currentTime);
if (retryInterval > 0) {
return retryInterval;
}
}
return MAX_TIME_STAMP;
}
void SwiftSinglePartitionReader::checkErrorCode(ErrorCode ec) {
if (ec != ERROR_NONE) {
_lastErrorCode = ec;
_errorCodeForCheck = ec;
}
}
void SwiftSinglePartitionReader::resetLastErrorCode() { _lastErrorCode = ERROR_NONE; }
void SwiftSinglePartitionReader::checkCurrentError(ErrorCode &errorCode, string &errorMsg) const {
errorCode = hasFatalError(_errorCodeForCheck);
if (ERROR_NONE == errorCode) {
return;
}
stringstream ss;
int64_t interval = TimeUtility::currentTime() - _lastSuccessResponseTime;
ss << "The interval between last Success getMsgReponse and now is:" << interval / 1000000 << " s"
<< ", topic name[" << _config.topicName << "], partitionId[" << _partitionId << "]"
<< ", last errorCode[" << _errorCodeForCheck << "].";
errorMsg = ss.str();
}
ErrorCode SwiftSinglePartitionReader::hasFatalError(ErrorCode ec) const {
bool isFatalError = (ec != ERROR_NONE) && (ec != ERROR_BROKER_SOME_MESSAGE_LOST) && (ec != ERROR_BROKER_NO_DATA) &&
(ec != ERROR_CLIENT_NO_MORE_MESSAGE);
if (!isFatalError) {
return ERROR_NONE;
}
int64_t interval = TimeUtility::currentTime() - _lastSuccessResponseTime;
if (interval >= (int64_t)_config.fatalErrorTimeLimit) {
return ec;
} else {
return ERROR_NONE;
}
}
int64_t SwiftSinglePartitionReader::getUnReadMsgCount() { return _buffer.getUnReadMsgCount(); }
bool SwiftSinglePartitionReader::hasUnReadMsg() {
if (getUnReadMsgCount() <= 0) {
return false;
} else if (getFirstMsgTimestamp() <= _timestampLimit) {
return true;
} else {
return false;
}
}
ErrorCode SwiftSinglePartitionReader::reportFatalError(bool resetLastError) {
if (ERROR_SEALED_TOPIC_READ_FINISH == _lastErrorCode) {
return _lastErrorCode;
}
ErrorCode ec = hasFatalError(_lastErrorCode);
if (ec == ERROR_NONE) {
return ERROR_NONE;
}
int64_t currentTime = TimeUtility::currentTime();
int64_t interval = currentTime - _lastReportErrorTime;
if (interval >= (int64_t)_config.fatalErrorReportInterval) {
if (resetLastError) {
_lastReportErrorTime = currentTime;
resetLastErrorCode();
}
return ec;
}
return ERROR_NONE;
}
ErrorCode SwiftSinglePartitionReader::seekByMessageId(int64_t msgId) {
if (msgId < 0) {
AUTIL_LOG(WARN, "msgId[%ld] less than zero, set msgId to zero", msgId);
msgId = 0;
}
ErrorCode ec = _transportAdapter->ignoreLastResponse();
checkErrorCode(ec);
_nextMsgId = _buffer.seek(msgId);
_nextTimestamp = -1;
_lastMsgTimestamp = -1;
_lastMsgOffsetInMerge = 0;
_lastCheckpointTimestamp = -1;
_seekTimestamp = -1;
_sealedTopicReadFinish = false;
return ERROR_NONE;
}
ErrorCode SwiftSinglePartitionReader::seekByTimestamp(int64_t timestamp) {
int64_t currTime = TimeUtility::currentTime();
if (ERROR_NONE != _seekByTimestampRecord.lastErrorCode &&
currTime < _seekByTimestampRecord.lastSeekTime + SEEKBYTIMESTAMP_ERROR_INTERVAL) {
usleep(_seekByTimestampRecord.lastSeekTime + SEEKBYTIMESTAMP_ERROR_INTERVAL - currTime);
}
ErrorCode ec = doSeekByTimestamp(timestamp);
if (ec == ERROR_NONE) {
_seekTimestamp = timestamp;
} else {
AUTIL_LOG(WARN,
"[%s %d] seek to[%ld] error[%s]",
_config.topicName.c_str(),
_partitionId,
timestamp,
ErrorCode_Name(ec).c_str());
}
_seekByTimestampRecord.lastErrorCode = ec;
_seekByTimestampRecord.lastSeekTime = TimeUtility::currentTime();
return ec;
}
ErrorCode SwiftSinglePartitionReader::doSeekByTimestamp(int64_t timestamp) {
int64_t msgid = -1;
int64_t msgTime = -1;
ErrorCode ec = getMinMessageIdByTime(timestamp, msgid, msgTime);
AUTIL_LOG(DEBUG,
"[%s %d] do seek to [%ld], error [%s], msgid [%ld], timestamp[%ld]",
_config.topicName.c_str(),
_partitionId,
timestamp,
ErrorCode_Name(ec).c_str(),
msgid,
msgTime);
if (ec != ERROR_NONE) {
checkErrorCode(ec);
return ec;
}
ec = seekByMessageId(msgid);
_nextTimestamp = msgTime;
return ec;
}
ErrorCode SwiftSinglePartitionReader::getMinMessageIdByTime(int64_t timestamp, int64_t &msgid, int64_t &msgTime) {
MessageIdRequest *request = new MessageIdRequest;
request->set_timestamp(timestamp);
ErrorCode ec = _msgIdTransportAdapter->postRequest(request);
if (ec != ERROR_NONE) {
AUTIL_LOG(WARN,
"[%s %d] seek by timestamp failed for[%s]!",
_config.topicName.c_str(),
_partitionId,
ErrorCode_Name(ec).c_str());
delete request;
return ec;
}
_msgIdTransportAdapter->waitLastRequestDone();
MessageIdResponse *response = NULL;
ec = _msgIdTransportAdapter->stealResponse(response);
unique_ptr<MessageIdResponse> responsePtr(response);
// handle session change or partition not found, maybe caused by partition changed
HANDLE_SESSION_OR_PARTITION_CHANGE;
if (ec != ERROR_NONE && ec != ERROR_BROKER_TIMESTAMP_TOO_LATEST && ec != ERROR_BROKER_NO_DATA) {
AUTIL_LOG(WARN,
"[%s %d] getMinMessageIdByTime failed for [%s]!",
_config.topicName.c_str(),
_partitionId,
ErrorCode_Name(ec).c_str());
return ec;
}
assert(response);
ErrorCode ec2 = fillPartitionInfo(_msgIdTransportAdapter->getLastTransportClosureDoneTime(), *response);
if (ec2 != ERROR_NONE) {
return ec2;
}
if (ec == ERROR_NONE) {
msgid = response->msgid();
msgTime = response->timestamp();
} else if (ec == ERROR_BROKER_NO_DATA) {
msgid = 0;
msgTime = -1;
} else if (ec == ERROR_BROKER_TIMESTAMP_TOO_LATEST) {
msgid = response->maxmsgid() + 1;
msgTime = std::max(response->maxtimestamp() + 1, timestamp);
} else {
assert(false);
}
return ERROR_NONE;
}
int64_t SwiftSinglePartitionReader::getFirstMsgTimestamp() { return _buffer.getFirstMsgTimestamp(); }
int64_t SwiftSinglePartitionReader::getNextMsgTimestamp() {
int64_t f = getFirstMsgTimestamp();
if (-1 != f) {
return f;
}
return std::max(_nextTimestamp, _seekTimestamp);
}
std::pair<int64_t, int32_t> SwiftSinglePartitionReader::getCheckpointTimestamp() {
int64_t checkpoint = getFirstMsgTimestamp(); // merge message has same msg timestamp
int32_t offsetInMerge = 0;
if (checkpoint == -1) { // read buffer no message
if (_checkpointReflushMode) {
checkpoint =
std::max(_lastMsgTimestamp + 1, int64_t(_nextTimestamp - _config.checkpointRefreshTimestampOffset));
checkpoint = std::max(checkpoint, _seekTimestamp);
} else {
checkpoint = std::max(_lastMsgTimestamp + 1, _seekTimestamp);
}
} else if (_lastMsgTimestamp == checkpoint) { // merge message
if (_lastMsgOffsetInMerge < common::OFFSET_IN_MERGE_MSG_BASE) {
AUTIL_LOG(
WARN,
"[%s %d] checkpoint timestamp [%ld] is same, but last msg offset in merge is [%d], less than [%d]. ",
_config.topicName.c_str(),
_partitionId,
checkpoint,
_lastMsgOffsetInMerge,
common::OFFSET_IN_MERGE_MSG_BASE);
} else {
offsetInMerge = _lastMsgOffsetInMerge - common::OFFSET_IN_MERGE_MSG_BASE + 1;
}
}
if (checkpoint < _lastCheckpointTimestamp) {
AUTIL_LOG(WARN,
"[%s %d] checkpoint timestamp roll back, current checkpoint timestamp [%ld], last checkpoint "
"timestamp[%ld]",
_config.topicName.c_str(),
_partitionId,
checkpoint,
_lastCheckpointTimestamp);
}
_lastCheckpointTimestamp = checkpoint;
return std::make_pair(_lastCheckpointTimestamp, offsetInMerge);
}
ErrorCode SwiftSinglePartitionReader::handleGetMessageResponse(ClientMetricsCollector &collector) {
MessageResponse *response = NULL;
ErrorCode ec = _transportAdapter->stealResponse(response);
if (!response) {
AUTIL_LOG(WARN, "[%s %d] Invalid broker response, response is empty", _config.topicName.c_str(), _partitionId);
return ec;
}
if (response->requestuuid() == 0) {
response->set_requestuuid(_curRequestUuid);
} else if (response->requestuuid() != _curRequestUuid) {
AUTIL_LOG(INFO,
"[%s %d] request uuid not match expect [%lu] actual [%lu]",
_config.topicName.c_str(),
_partitionId,
_curRequestUuid,
response->requestuuid());
}
unique_ptr<MessageResponse> auto_response(response);
if (ERROR_BROKER_INVALID_USER == ec) {
AUTIL_LOG(WARN,
"username[%s] not permitter to read[%s %d]",
_transportAdapter->getUsername().c_str(),
_config.topicName.c_str(),
_partitionId);
return ec;
}
AUTIL_LOG(DEBUG, "handle [%s %d %ld] response", _config.topicName.c_str(), _partitionId, response->requestuuid());
// handle session change or partition not found, maybe caused by partition changed
HANDLE_SESSION_OR_PARTITION_CHANGE;
if (ec != ERROR_NONE && ec != ERROR_BROKER_NO_DATA && ec != ERROR_BROKER_SOME_MESSAGE_LOST &&
ec != ERROR_SEALED_TOPIC_READ_FINISH) {
AUTIL_LOG(WARN,
"getMessage from [%s %d %ld] failed for [%s]!",
_config.topicName.c_str(),
_partitionId,
response->requestuuid(),
ErrorCode_Name(ec).c_str());
return ec;
}
if (ERROR_BROKER_SOME_MESSAGE_LOST == ec) {
AUTIL_LOG(WARN,
"some message lost in[%s %d, %ld]! maybe broker read file failed",
_config.topicName.c_str(),
_partitionId,
response->requestuuid());
}
assert(response);
if (response->has_sessionid()) {
_sessionId = response->sessionid();
}
if (ERROR_SEALED_TOPIC_READ_FINISH == ec && 0 == _buffer.getUnReadMsgCount()) {
_sealedTopicReadFinish = true;
AUTIL_LOG(INFO,
"[%s:%d %ld] read finsh[%s]",
_config.topicName.c_str(),
_partitionId,
response->requestuuid(),
ErrorCode_Name(ec).c_str());
return ec;
}
ErrorCode ec2;
ec2 = fillPartitionInfo(_transportAdapter->getLastTransportClosureDoneTime(), *response);
if (ec2 != ERROR_NONE) {
return ec2;
}
if (!isValidateResponse(response, ec)) {
return ERROR_CLIENT_INVALID_RESPONSE;
}
_lastSuccessResponseTime = TimeUtility::currentTime();
int64_t maxMsgId = response->maxmsgid();
uint32_t msgSize = 0;
int64_t lastTimeStamp = -1;
if (response->messageformat() == MF_FB) {
protocol::FBMessageReader reader;
if (!reader.init(response->fbmsgs(), false)) {
return ERROR_CLIENT_INVALID_RESPONSE;
}
msgSize = reader.size();
if (msgSize > 0) {
lastTimeStamp = (reader.read(msgSize - 1))->timestamp();
}
} else {
msgSize = response->msgs_size();
if (msgSize > 0) {
lastTimeStamp = (response->msgs()[msgSize - 1]).timestamp();
}
}
if (response->has_maxtimestamp() && -1 != lastTimeStamp) {
if (_metricsReporter) {
ReaderDelayCollector delayCollector;
delayCollector.readDelay = response->maxtimestamp() - lastTimeStamp;
delayCollector.currentDelay = TimeUtility::currentTime() - lastTimeStamp;
_metricsReporter->reportDelay(delayCollector, &_metricsTags);
}
}
collector.requestMsgCount = msgSize;
if (ec == ERROR_BROKER_NO_DATA) {
ec = ERROR_CLIENT_NO_MORE_MESSAGE;
} else if (msgSize > 0) {
if (!_buffer.addResponse(auto_response.release())) {
string errInfo = "[" + _config.topicName + " " + StringUtil::toString(_partitionId) + " " +
StringUtil::toString(response->requestuuid()) +
"] add response to read buffer failed. has msg count: " + StringUtil::toString(msgSize);
AUTIL_LOG(ERROR, "%s", errInfo.c_str());
ec = ERROR_CLIENT_INVALID_RESPONSE;
auto *errCollector = ErrorCollectorSingleton::getInstance();
errCollector->addRequestHasErrorDataInfo(errInfo);
auto *responseCollector = ResponseCollectorSingleton::getInstance();
string content;
if (response->SerializeToString(&content)) {
responseCollector->logResponse(_readerInfo, MESSAGE_RESPONSE, content);
} else {
responseCollector->logResponse(_readerInfo, MESSAGE_RESPONSE_FBMSG_PART, response->fbmsgs());
}
// return ec; // TODO, retry or skip request bug
}
_nextMsgId = response->nextmsgid();
} else {
if (_nextMsgId <= maxMsgId) {
_nextMsgId = response->nextmsgid();
} else {
int64_t oldNextId = _nextMsgId;
_nextMsgId = maxMsgId + 1;
if (_nextMsgId < oldNextId) {
AUTIL_LOG(WARN,
"response [%ld], Next msgId rollback from [%ld] to [%ld]",
response->requestuuid(),
oldNextId,
_nextMsgId);
}
ec = ERROR_CLIENT_NO_MORE_MESSAGE;
}
}
if (response->has_nexttimestamp()) {
_nextTimestamp = response->nexttimestamp();
}
return ec;
}
bool SwiftSinglePartitionReader::isValidateResponse(MessageResponse *response, ErrorCode ec) const {
if (ec != ERROR_BROKER_NO_DATA) {
if (!response->has_nextmsgid()) {
return false;
}
if (ec != ERROR_BROKER_SOME_MESSAGE_LOST && !response->has_nexttimestamp()) {
return false;
}
}
return true;
}
ErrorCode SwiftSinglePartitionReader::postGetMessageRequest(ClientMetricsCollector &collector) {
ConsumptionRequest *request = new ConsumptionRequest;
request->set_startid(_nextMsgId);
request->set_sessionidextend(_sessionId);
request->set_identitystr(_config.readerIdentity);
_requestSeq++;
int64_t curTime = TimeUtility::currentTime();
_curRequestUuid =
SwiftUuidGenerator::genRequestUuid(curTime / 1000, _partitionId, SwiftClient::traceFlag, _requestSeq);
request->set_requestuuid(_curRequestUuid);
request->set_generatedtime(curTime);
if (_nextTimestamp != -1) {
request->set_starttimestamp(_nextTimestamp);
}
if (_clientCommittedCheckpoint != -1) {
request->set_committedcheckpoint(_clientCommittedCheckpoint);
}
request->set_count(_config.oneRequestReadCount);
request->set_maxtotalsize(_config.consumptionRequestBytesLimit);
if (!_config.readAll) {
*request->mutable_filter() = _config.swiftFilter;
}
ClientVersionInfo *clientVersionInfo = request->mutable_versioninfo();
*clientVersionInfo = _clientVersionInfo;
for (vector<string>::const_iterator it = _requiredFieldNames.begin(); it != _requiredFieldNames.end(); ++it) {
request->add_requiredfieldnames(*it);
}
if (!_filedFilterDesc.empty()) {
request->set_fieldfilterdesc(_filedFilterDesc);
}
request->set_needcompress(_config.needCompress);
request->set_candecompressmsg(_config.canDecompress);
AUTIL_LOG(DEBUG,
"[%s %d] post read request is [%s]",
_config.topicName.c_str(),
_partitionId,
request->ShortDebugString().c_str());
ErrorCode ec = _transportAdapter->postRequest(request, common::DEFAULT_POST_TIMEOUT, &collector);
if (ERROR_NONE != ec) {
AUTIL_LOG(WARN,
"post getMessageRequest to[%s %d] failed, ec[%s]",
_config.topicName.c_str(),
_partitionId,
ErrorCode_Name(ec).c_str());
collector.reset(true);
DELETE_AND_SET_NULL(request);
if (ERROR_CLIENT_LOGIC_TOPIC_NOT_IN_BROKER == ec) {
_func(_lastTopicVersion);
}
}
if (_metricsReporter) {
_metricsReporter->reportReaderMetrics(collector, &_metricsTags);
}
return ec;
}
template <typename ResponseType>
ErrorCode SwiftSinglePartitionReader::fillPartitionInfo(int64_t refreshTime, const ResponseType &response) {
ScopedWriteLock wlock(_partStatusLock);
_partitionStatus.refreshTime = refreshTime;
ErrorCode ec = response.errorinfo().errcode();
if (ec == ERROR_BROKER_NO_DATA) {
_partitionStatus.maxMessageId = -1;
_partitionStatus.maxMessageTimestamp = -1;
return ERROR_NONE;
}
if (!response.has_maxmsgid() || !response.has_maxtimestamp()) {
AUTIL_LOG(WARN, "[%s %d] invalid response!", _config.topicName.c_str(), _partitionId);
return ERROR_CLIENT_INVALID_RESPONSE;
}
_partitionStatus.maxMessageId = response.maxmsgid();
_partitionStatus.maxMessageTimestamp = response.maxtimestamp();
return ERROR_NONE;
}
SwiftPartitionStatus SwiftSinglePartitionReader::getSwiftPartitionStatus(int64_t curTime) {
if (!_maxMsgIdTransportAdapter->isLastRequestDone()) {
ScopedReadLock rlock(_partStatusLock);
return _partitionStatus;
}
if (!_maxMsgIdTransportAdapter->isLastRequestHandled()) {
handleGetMessageIdResponse();
}
ScopedReadLock rlock(_partStatusLock);
int64_t refreshInterval = _config.partitionStatusRefreshInterval;
if (curTime - _partitionStatus.refreshTime > refreshInterval && _lastRefreshMsgIdTime < curTime) {
MessageIdRequest *request = new MessageIdRequest;
_lastRefreshMsgIdTime = curTime;
int64_t requestUuid = SwiftUuidGenerator::genRequestUuid(
TimeUtility::currentTime() / 1000, _partitionId, SwiftClient::traceFlag, _requestSeq);
request->set_requestuuid(requestUuid);
ErrorCode ec = _maxMsgIdTransportAdapter->postRequest(request);
if (ERROR_NONE != ec) {
AUTIL_LOG(INFO,
"[%s %d] post getMaxMessageIdRequest failed, ec[%s]",
_config.topicName.c_str(),
_partitionId,
ErrorCode_Name(ec).c_str());
delete request;
if (ERROR_CLIENT_LOGIC_TOPIC_NOT_IN_BROKER == ec) {
_func(_lastTopicVersion);
}
}
}
return _partitionStatus;
}
ErrorCode SwiftSinglePartitionReader::handleGetMessageIdResponse() {
MessageIdResponse *response = NULL;
ErrorCode ec = _maxMsgIdTransportAdapter->stealResponse(response);
assert(response);
unique_ptr<MessageIdResponse> responsePtr(response);
// handle session change or partition not found, maybe caused by partition changed
HANDLE_SESSION_OR_PARTITION_CHANGE;
ScopedWriteLock wlock(_partStatusLock);
if (ERROR_BROKER_NO_DATA == ec) {
_partitionStatus.refreshTime = _lastRefreshMsgIdTime;
_partitionStatus.maxMessageId = -1;
_partitionStatus.maxMessageTimestamp = -1;
return ec;
}
if (ERROR_NONE != ec) {
AUTIL_LOG(INFO,
"[%s %d] getMaxMessageId failed for [%s]!",
_config.topicName.c_str(),
_partitionId,
ErrorCode_Name(ec).c_str());
return ec;
}
_partitionStatus.refreshTime = _lastRefreshMsgIdTime;
_partitionStatus.maxMessageId = response->msgid();
_partitionStatus.maxMessageTimestamp = response->timestamp();
return ec;
}
void SwiftSinglePartitionReader::setRequiredFieldNames(const vector<string> &fieldNames) {
_requiredFieldNames = fieldNames;
_buffer.updateFilter(_config.swiftFilter, _requiredFieldNames, _filedFilterDesc);
ErrorCode ec = _transportAdapter->ignoreLastResponse();
checkErrorCode(ec);
Message msg;
if (!_buffer.read(msg)) {
return;
}
seekByMessageId(msg.msgid());
}
vector<string> SwiftSinglePartitionReader::getRequiredFieldNames() { return _requiredFieldNames; }
void SwiftSinglePartitionReader::setDecompressThreadPool(autil::ThreadPoolPtr decompressThreadPool) {
_buffer.setDecompressThreadPool(decompressThreadPool);
}
void SwiftSinglePartitionReader::setFieldFilterDesc(const std::string &fieldFilterDesc) {
_filedFilterDesc = fieldFilterDesc;
_buffer.updateFilter(_config.swiftFilter, _requiredFieldNames, _filedFilterDesc);
ErrorCode ec = _transportAdapter->ignoreLastResponse();
checkErrorCode(ec);
Message msg;
if (!_buffer.read(msg)) {
return;
}
seekByMessageId(msg.msgid());
}
string SwiftSinglePartitionReader::getFieldFilterDesc() { return _filedFilterDesc; }
void SwiftSinglePartitionReader::setTimestampLimit(int64_t timestampLimit) {
assert(timestampLimit >= _lastMsgTimestamp);
_timestampLimit = timestampLimit;
}
int64_t SwiftSinglePartitionReader::getTimestampLimit() const { return _timestampLimit; }
int64_t SwiftSinglePartitionReader::getLastMsgTimestamp() { return _lastMsgTimestamp; }
void SwiftSinglePartitionReader::setCallBackFunc(const std::function<protocol::ErrorCode(int64_t)> &func) {
_func = func;
}
bool SwiftSinglePartitionReader::updateCommittedCheckpoint(int64_t checkpoint) {
if (_clientCommittedCheckpoint > checkpoint) {
AUTIL_LOG(WARN, "update checkpoint [%ld] is small than current [%ld].", checkpoint, _clientCommittedCheckpoint);
return false;
}
_clientCommittedCheckpoint = checkpoint;
bool isSent = false;
tryFillBuffer(autil::TimeUtility::currentTime(), true, isSent);
if (isSent) {
AUTIL_LOG(INFO,
"send update checkpoint [%ld] to [%s %d] success.",
checkpoint,
_config.topicName.c_str(),
_partitionId);
return true;
} else {
AUTIL_LOG(WARN,
"send update checkpoint [%ld] to [%s %d] fail, will update in next read.",
checkpoint,
_config.topicName.c_str(),
_partitionId);
return false;
}
}
bool SwiftSinglePartitionReader::mayWaitForRetry(ErrorCode ec) {
ScopedReadLock rlock(_partStatusLock);
return (_nextMsgId > _partitionStatus.maxMessageId &&
(!_isTopicLongPollingEnabled ||
(_isTopicLongPollingEnabled && ERROR_NONE != ec && ec != ERROR_CLIENT_NO_MORE_MESSAGE)));
}
std::pair<uint32_t, uint32_t> SwiftSinglePartitionReader::getFilterRange() const {
return std::make_pair(_rangeFrom, _rangeTo);
}
void SwiftSinglePartitionReader::setFilterRange(uint32_t from, uint32_t to) {
_rangeFrom = from;
_rangeTo = to;
}
} // namespace client
} // namespace swift