aios/suez/sdk/TableWriter.cpp (372 lines of code) (raw):
/*
* Copyright 2014-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "suez/sdk/TableWriter.h"
#include <future_lite/uthread/Latch.h>
#include "RawDocument2SwiftFieldFilter.h"
#include "autil/Log.h"
#include "autil/StringUtil.h"
#include "autil/TimeUtility.h"
#include "autil/result/Errors.h"
#include "build_service/config/ResourceReader.h"
#include "build_service/proto/BasicDefs.pb.h"
#include "build_service/proto/ParserConfig.h"
#include "build_service/reader/DocumentSeparators.h"
#include "build_service/reader/ParserCreator.h"
#include "build_service/reader/RawDocumentBuilder.h"
#include "build_service/util/LocatorUtil.h"
#include "build_service/util/SwiftClientCreator.h"
#include "indexlib/document/document_factory_wrapper.h"
#include "indexlib/document/raw_document/raw_document_define.h"
#include "indexlib/document/raw_document_parser.h"
#include "indexlib/framework/ITablet.h"
#include "indexlib/framework/Locator.h"
#include "indexlib/framework/TabletInfos.h"
#include "indexlib/indexlib.h"
#include "kmonitor/client/MetricType.h"
#include "kmonitor/client/MetricsReporter.h"
#include "suez/table/wal/CommonDefine.h"
#include "suez/table/wal/WALConfig.h"
#include "suez/table/wal/WALStrategy.h"
namespace suez {
// Declares and sets up the logger for this translation unit; presumably this is
// what the AUTIL_LOG(...) calls below write to (confirm against autil/Log.h).
AUTIL_DECLARE_AND_SETUP_LOGGER(suez, TableWriter);
// Shorthand for the error type handed to completion callbacks on failure.
using autil::result::RuntimeError;
// Per-request scratch record for write metrics. It is filled in while a write
// request is processed and is later consumed by WriteMetrics::report.
struct WriteMetricsCollector {
    // Stamps the request start time (microseconds) at construction.
    WriteMetricsCollector() : startTime(autil::TimeUtility::currentTimeInMicroSeconds()) {}

    // Sets `latency` to the time elapsed since construction, in microseconds.
    // Calling it again simply overwrites `latency` with a fresh measurement.
    void writeEnd() { latency = autil::TimeUtility::currentTimeInMicroSeconds() - startTime; }

    int64_t startTime = 0;     // construction timestamp, microseconds
    size_t inMessageCount = 0; // number of documents in the request
    int64_t latency = 0;       // total request latency, set by writeEnd()
    int64_t parseLatency = 0;  // time spent parsing the documents
    int64_t logLatency = 0;    // time spent writing to the WAL
    int64_t buildLatency = 0;  // never assigned in this file; stays 0 here
    int64_t buildGap = 0;      // copied from WriteResult::watermark.buildGap
    bool writeSuccess = false; // true once the request is considered successful
};
// kmonitor metrics group for TableWriter::write. One WriteMetricsCollector is
// reported per request.
class WriteMetrics : public kmonitor::MetricsGroup {
public:
    // Registers every metric of the group. Always returns true.
    // NOTE(review): `manager` looks unused here but is presumably referenced by
    // name inside the REGISTER_* macros - do not rename it without checking.
    bool init(kmonitor::MetricsGroupManager *manager) override {
        // request size and latencies
        REGISTER_GAUGE_MUTABLE_METRIC(_inMessageCount, "inMessageCount");
        REGISTER_LATENCY_MUTABLE_METRIC(_latency, "latency");
        REGISTER_LATENCY_MUTABLE_METRIC(_parseLatency, "parseLatency");
        REGISTER_LATENCY_MUTABLE_METRIC(_logLatency, "logLatency");
        REGISTER_LATENCY_MUTABLE_METRIC(_buildLatency, "buildLatency");
        REGISTER_LATENCY_MUTABLE_METRIC(_buildGap, "buildGap");

        // build-gap histogram, one QPS counter per bucket
        REGISTER_QPS_MUTABLE_METRIC(_buildGap0, "buildGap0");
        REGISTER_QPS_MUTABLE_METRIC(_buildGap10, "buildGapIn10ms");
        REGISTER_QPS_MUTABLE_METRIC(_buildGap100, "buildGapIn100ms");
        REGISTER_QPS_MUTABLE_METRIC(_buildGap1000, "buildGapIn1000ms");
        REGISTER_QPS_MUTABLE_METRIC(_buildGapLarge1000, "buildGapLarge1000ms");

        // request counters
        REGISTER_QPS_MUTABLE_METRIC(_inQps, "inQps");
        REGISTER_QPS_MUTABLE_METRIC(_errorQps, "errorQps");
        REGISTER_QPS_MUTABLE_METRIC(_emptyQps, "emptyQps");
        return true;
    }

    // Reports one request's metrics from `collector`.
    // NOTE(review): `tags` is presumably referenced by name inside the REPORT_*
    // macros - confirm before renaming.
    void report(const kmonitor::MetricsTags *tags, WriteMetricsCollector *collector) {
        REPORT_MUTABLE_METRIC(_inMessageCount, collector->inMessageCount);
        REPORT_MUTABLE_METRIC(_latency, collector->latency);

        // Stage latencies are only reported when they were actually measured.
        if (collector->parseLatency > 0) {
            REPORT_MUTABLE_METRIC(_parseLatency, collector->parseLatency);
        }
        if (collector->logLatency > 0) {
            REPORT_MUTABLE_METRIC(_logLatency, collector->logLatency);
        }
        if (collector->buildLatency > 0) {
            REPORT_MUTABLE_METRIC(_buildLatency, collector->buildLatency);
        }

        // Bucket the build gap. The thresholds match the 10ms/100ms/1000ms metric
        // names if buildGap is in microseconds (assumption - TODO confirm).
        const int64_t gap = collector->buildGap;
        if (gap <= 0) {
            REPORT_MUTABLE_QPS(_buildGap0);
        } else {
            if (gap > 1000000) {
                REPORT_MUTABLE_QPS(_buildGapLarge1000);
            } else if (gap > 100000) {
                REPORT_MUTABLE_QPS(_buildGap1000);
            } else if (gap > 10000) {
                REPORT_MUTABLE_QPS(_buildGap100);
            } else {
                REPORT_MUTABLE_QPS(_buildGap10);
            }
            REPORT_MUTABLE_METRIC(_buildGap, gap);
        }

        REPORT_MUTABLE_QPS(_inQps);
        if (collector->writeSuccess) {
            // A successful request that carried no messages counts as "empty".
            if (collector->inMessageCount == 0) {
                REPORT_MUTABLE_QPS(_emptyQps);
            }
        } else {
            REPORT_MUTABLE_QPS(_errorQps);
        }
    }

private:
    kmonitor::MutableMetric *_inMessageCount = nullptr;
    kmonitor::MutableMetric *_latency = nullptr;
    kmonitor::MutableMetric *_parseLatency = nullptr;
    kmonitor::MutableMetric *_logLatency = nullptr;
    kmonitor::MutableMetric *_buildLatency = nullptr;
    kmonitor::MutableMetric *_buildGap = nullptr;
    kmonitor::MutableMetric *_buildGap0 = nullptr;
    kmonitor::MutableMetric *_buildGap10 = nullptr;
    kmonitor::MutableMetric *_buildGap100 = nullptr;
    kmonitor::MutableMetric *_buildGap1000 = nullptr;
    kmonitor::MutableMetric *_buildGapLarge1000 = nullptr;
    kmonitor::MutableMetric *_inQps = nullptr;
    kmonitor::MutableMetric *_errorQps = nullptr;
    kmonitor::MutableMetric *_emptyQps = nullptr;
};
// Nothing to set up at construction time; the writer is configured by init().
TableWriter::TableWriter() = default;
// Shuts the writer down via stop() before the members are destroyed.
TableWriter::~TableWriter() {
    stop();
}
// Configures the writer for partition `pid`.
//
// Stores copies of the partition id, reporter, WAL config and swift client
// creator, builds the raw-document factory from the plugin path found under
// `configPath`, and makes a first attempt at creating the WAL.
//
// Returns false only when the document factory wrapper cannot be created; a
// failed WAL creation does not fail init().
bool TableWriter::init(const build_service::proto::PartitionId &pid,
                       const std::string &configPath, // for document parser
                       const kmonitor::MetricsReporterPtr &reporter,
                       const WALConfig &walConfig,
                       const std::shared_ptr<build_service::util::SwiftClientCreator> &swiftClientCreator) {
    _pid = std::make_unique<build_service::proto::PartitionId>(pid);
    _reporter = reporter;
    _swiftClientCreator = swiftClientCreator;

    // Own a private copy of the WAL config, specialised for this partition's range.
    _walConfig = std::make_unique<WALConfig>(walConfig);
    const auto &partRange = _pid->range();
    _walConfig->desc = partRange.ShortDebugString();
    _walConfig->range = std::make_pair(partRange.from(), partRange.to());

    auto configReader = std::make_shared<build_service::config::ResourceReader>(configPath);
    _documentFactoryWrapper = indexlib::document::DocumentFactoryWrapper::CreateDocumentFactoryWrapper(
        nullptr, indexlib::config::CUSTOMIZED_DOCUMENT_CONFIG_RAWDOCUMENT, configReader->getPluginPath());
    if (!_documentFactoryWrapper) {
        AUTIL_LOG(ERROR, "%s: init DocumentFactoryWrapper failed", _pid->ShortDebugString().c_str());
        return false;
    }

    // Tolerate a WAL init failure so that a swift outage does not cascade into an
    // avalanche. The current implementation retries inside the rpc request path;
    // a background thread that reconnects automatically may be needed.
    autil::ScopedLock lock(_mutex);
    maybeInitWALLocked();
    return true;
}
void TableWriter::setEnableWrite(bool flag) {
bool enableWrite = _enableWrite.exchange(flag);
if (enableWrite && !flag) {
AUTIL_LOG(INFO, "%s: writer changed from enabled to disabled", _pid->ShortDebugString().c_str());
autil::ScopedLock lock(_mutex);
if (_wal) {
AUTIL_LOG(INFO, "%s: begin flushing log", _pid->ShortDebugString().c_str());
_wal->flush();
AUTIL_LOG(INFO, "%s: finish flushing log", _pid->ShortDebugString().c_str());
}
}
AUTIL_LOG(INFO, "%s: writer is %s", _pid->ShortDebugString().c_str(), flag ? "enabled" : "disabled");
}
// Parses `docs` (given in `format`), appends them to the WAL and reports the
// outcome through `done`.
//
// format:   raw document format name, used to pick the parser.
// docs:     documents to write.
// done:     completion callback; receives an error when the writer is disabled,
//           parsing fails or the WAL reports a failure, a default-constructed
//           result when there is nothing to log, otherwise the WriteResult
//           produced by fillWriteResult().
// executor: when non-null, the WAL completion is awaited with a future_lite
//           uthread Latch on this executor and `done` is invoked from this call;
//           when null, `done` is invoked from whatever context the WAL runs its
//           callback in.
//
// Per-request metrics are gathered in a WriteMetricsCollector whose custom
// deleter reports them once the last reference is released. No metrics are
// reported for requests rejected because the writer is disabled.
void TableWriter::write(const std::string &format,
                        const WalDocVector &docs,
                        const std::function<void(autil::Result<WriteResult>)> &done,
                        future_lite::Executor *executor) {
    if (!_enableWrite) {
        done(RuntimeError::make("%s is disabled, can not write", _pid->ShortDebugString().c_str()));
        return;
    }
    // The deleter doubles as the metrics hook: it finalizes the latency, reports
    // the collector (when a reporter is configured) and then frees it.
    auto deleter = [this](WriteMetricsCollector *collector) {
        if (collector != nullptr) {
            collector->writeEnd();
            if (_reporter) {
                kmonitor::MetricsTags tags{{{"table_name", _pid->clusternames(0)}, {"role_type", _roleType}}};
                auto reporter = _reporter->getSubReporter("", tags);
                reporter->report<WriteMetrics>(nullptr, collector);
            }
            delete collector;
        }
    };
    std::shared_ptr<WriteMetricsCollector> collector(new WriteMetricsCollector(), std::move(deleter));
    collector->inMessageCount = docs.size();

    autil::ScopedTime2 parseT;
    auto result = parseWalDocs(format, docs);
    collector->parseLatency = parseT.done_us();
    if (!result.is_ok()) {
        done(std::move(result).steal_error());
        return;
    }
    auto walDocs = std::move(result).steal_value();
    if (walDocs.empty()) {
        // empty qps
        collector->writeSuccess = true;
        done({});
        return;
    }

    auto logStartTime = autil::TimeUtility::currentTimeInMicroSeconds();
    auto logDone =
        [this, logStartTime, collector_ = std::move(collector), done](autil::Result<std::vector<int64_t>> ret) {
            collector_->logLatency = autil::TimeUtility::currentTimeInMicroSeconds() - logStartTime;
            if (!ret.is_ok()) {
                done(std::move(ret).steal_error());
                return;
            }
            collector_->writeSuccess = true;
            collector_->writeEnd();
            auto values = std::move(ret).steal_value();
            // back() on an empty vector is undefined behaviour; if the WAL reports
            // success without any offset, leave the latest offset untouched.
            if (!values.empty()) {
                updateLatestLogOffset(values.back());
            }
            // TODO: maybe wait build to hold read on write
            WriteResult result;
            fillWriteResult(result, collector_.get());
            collector_->buildGap = result.watermark.buildGap;
            done(result);
        };

    if (executor != nullptr) {
        // Park on a latch until the WAL callback fires, then finish the request
        // here. The by-reference captures are safe because this frame outlives
        // the await.
        autil::Result<std::vector<int64_t>> logResult;
        future_lite::uthread::Latch latch(1);
        auto latchDone = [&latch, &logResult](autil::Result<std::vector<int64_t>> ret) {
            logResult = std::move(ret);
            latch.downCount();
        };
        DoWrite(walDocs, latchDone);
        latch.await(executor);
        logDone(std::move(logResult));
    } else {
        DoWrite(walDocs, logDone);
    }
}
// Hands `docs` to the WAL under _mutex, creating the WAL first if it does not
// exist yet. When no WAL is available, `done` receives a "wal invalid" error
// instead (invoked while _mutex is still held).
void TableWriter::DoWrite(const WalDocVector &docs,
                          const std::function<void(autil::Result<std::vector<int64_t>>)> &done) {
    autil::ScopedLock lock(_mutex);
    maybeInitWALLocked();
    if (_wal) {
        _wal->log(docs, done);
        return;
    }
    done(RuntimeError::make("%s wal invalid", _pid->ShortDebugString().c_str()));
}
void TableWriter::updateSchema(uint32_t version,
const std::string &configPath,
std::function<void(autil::Result<int64_t>)> done) {
AUTIL_LOG(INFO, "update schema, version: [%u], config path: [%s]", version, configPath.c_str());
if (!_enableWrite) {
done(RuntimeError::make("%s is disabled, can not write", _pid->ShortDebugString().c_str()));
return;
}
std::string buildIdStr;
if (!_pid->buildid().SerializeToString(&buildIdStr)) {
done(RuntimeError::make("%s buildid serialize failed, can not write", _pid->ShortDebugString().c_str()));
return;
}
build_service::reader::RawDocumentBuilder builder(build_service::reader::RAW_DOCUMENT_HA3_SEP_PREFIX,
build_service::reader::RAW_DOCUMENT_HA3_SEP_SUFFIX,
build_service::reader::RAW_DOCUMENT_HA3_FIELD_SEP,
build_service::reader::RAW_DOCUMENT_HA3_KV_SEP);
builder.addField(autil::StringView(indexlib::document::CMD_TAG), indexlib::document::CMD_ALTER);
builder.addField(autil::StringView(CONFIG_PATH_KEY), configPath);
builder.addField(autil::StringView(SCHEMA_VERSION_KEY), std::to_string(version));
builder.addField(autil::StringView(BUILD_ID_KEY), buildIdStr);
auto docStr = builder.finalize().to_string();
auto walDocs = parseWalDocs(build_service::reader::RAW_DOCUMENT_HA3_DOCUMENT_FORMAT,
{{_pid->range().from(), std::move(docStr)}});
if (!walDocs.is_ok()) {
done(std::move(walDocs).steal_error());
return;
}
autil::ScopedLock lock(_mutex);
maybeInitWALLocked();
if (!_wal) {
done(RuntimeError::make("%s wal invalid", _pid->ShortDebugString().c_str()));
return;
}
auto logDone = [this, done_ = std::move(done)](autil::Result<std::vector<int64_t>> ret) {
if (!ret.is_ok()) {
done_(std::move(ret).steal_error());
return;
}
auto values = std::move(ret).steal_value();
updateLatestLogOffset(values.back());
done_(values.back());
};
_wal->log(std::move(walDocs).steal_value(), std::move(logDone));
AUTIL_LOG(INFO, "update schema success, config path [%s]", configPath.c_str());
}
void TableWriter::stop() {
autil::ScopedLock lock(_mutex);
bool enableWrite = _enableWrite.exchange(false);
if (enableWrite && _wal) {
_wal->flush();
}
if (_wal) {
_wal->stop();
_wal.reset();
}
updateIndex(nullptr);
}
// Replaces the tablet used for watermark reporting. Takes the write side of
// _indexLock; passing nullptr clears the reference.
void TableWriter::updateIndex(const std::shared_ptr<indexlibv2::framework::ITablet> &index) {
    autil::ScopedWriteLock writeGuard(_indexLock);
    _index = index;
}
// Returns a copy of the current tablet pointer (possibly null), taken under the
// read side of _indexLock.
std::shared_ptr<indexlibv2::framework::ITablet> TableWriter::getIndex() const {
    autil::ScopedReadLock readGuard(_indexLock);
    auto snapshot = _index;
    return snapshot;
}
// Creates the WAL if it does not exist yet. Every caller in this file holds
// _mutex when calling it. A creation failure is only logged; _wal stays null so
// that a later call can try again.
void TableWriter::maybeInitWALLocked() {
    if (_wal) {
        return;
    }
    auto created = WALStrategy::create(*_walConfig, _swiftClientCreator);
    if (created) {
        _wal = std::move(created);
        return;
    }
    AUTIL_LOG(ERROR,
              "%s: create wal with %s failed",
              _pid->ShortDebugString().c_str(),
              FastToJsonString(*_walConfig, true).c_str());
}
// Parses each document in `docs` with the parser for `format`, stamps it with
// the current time and re-encodes it through
// convertRawDocument2SwiftFieldFilter.
//
// Returns one output entry per input entry, keeping each entry's `first` and
// replacing `second` with the re-encoded payload. Returns an error when the
// format is unknown, a raw document cannot be created, or a document fails to
// parse. The whole batch shares a single timestamp.
autil::Result<WalDocVector> TableWriter::parseWalDocs(const std::string &format, const WalDocVector &docs) {
    // parse
    auto parser = getParser(format);
    if (!parser) {
        return RuntimeError::make("%s unknown format %s", _pid->ShortDebugString().c_str(), format.c_str());
    }
    std::vector<indexlib::document::RawDocumentPtr> rawDocs;
    rawDocs.reserve(docs.size());
    int64_t currentTime = autil::TimeUtility::currentTimeInMicroSeconds();
    auto currentTimeStr = autil::StringUtil::toString(currentTime);
    for (const auto &doc : docs) {
        indexlib::document::RawDocumentPtr rawDoc(_documentFactoryWrapper->CreateRawDocument());
        // Do not dereference a null document if the factory fails to create one.
        if (!rawDoc) {
            return RuntimeError::make("%s create raw document failed", _pid->ShortDebugString().c_str());
        }
        if (!parser->parse(doc.second, *rawDoc)) {
            return RuntimeError::make("%s parse %s failed", _pid->ShortDebugString().c_str(), doc.second.c_str());
        }
        rawDoc->setDocTimestamp(currentTime);
        rawDoc->setField(HA_RESERVED_TIMESTAMP, currentTimeStr);
        rawDocs.emplace_back(std::move(rawDoc));
    }
    // format
    WalDocVector walDocs;
    walDocs.reserve(rawDocs.size());
    auto formatDocs = convertRawDocument2SwiftFieldFilter(rawDocs);
    // size_t index: avoids the signed/unsigned comparison against docs.size().
    for (size_t i = 0; i < docs.size(); ++i) {
        walDocs.emplace_back(docs[i].first, std::move(formatDocs[i]));
    }
    // The return type differs from the local's type, so move explicitly into the Result.
    return std::move(walDocs);
}
std::unique_ptr<indexlib::document::RawDocumentParser> TableWriter::getParser(const std::string &format) {
// TODO: maybe cache
build_service::proto::ParserConfig parserConfig;
parserConfig.type = format;
build_service::reader::ParserCreator parserCreator;
std::unique_ptr<indexlib::document::RawDocumentParser> parser;
parser.reset(parserCreator.createSingleParser(parserConfig));
if (!parser) {
AUTIL_LOG(ERROR,
"%s: create parser with format [%s] failed, error: %s",
_pid->ShortDebugString().c_str(),
format.c_str(),
parserCreator.getLastError().c_str());
return nullptr;
}
return parser;
}
// Records `latestLogOffset` as the most recent WAL offset. The store is relaxed
// and unconditional: the value is overwritten, not max-merged.
void TableWriter::updateLatestLogOffset(int64_t latestLogOffset) {
    constexpr auto kOrder = std::memory_order_relaxed;
    _latestLogOffset.store(latestLogOffset, kOrder);
}
// Returns the most recently recorded WAL offset (relaxed load).
int64_t TableWriter::getLatestLogOffset() const {
    constexpr auto kOrder = std::memory_order_relaxed;
    return _latestLogOffset.load(kOrder);
}
// Fills `result` from the request's collector and the writer's current state.
//
// result:    output; `stat` is copied from `collector`, `watermark.maxCp` is the
//            latest WAL offset, and `state` is LOG when no index is attached,
//            ASYNC otherwise.
// collector: metrics gathered for this request; must not be null.
//
// When an index is attached, the build locator offset is taken from its latest
// locator (if valid) and, for a positive maxCp, buildGap is the distance between
// maxCp and that offset, with a negative offset treated as 0.
void TableWriter::fillWriteResult(WriteResult &result, const WriteMetricsCollector *collector) const {
    result.stat.inMessageCount = collector->inMessageCount;
    result.stat.parseLatency = collector->parseLatency;
    result.stat.logLatency = collector->logLatency;
    result.stat.buildLatency = collector->buildLatency;
    // fill watermark
    result.watermark.maxCp = getLatestLogOffset();
    auto index = getIndex();
    if (!index) {
        result.state = WriterState::LOG;
        return;
    }
    result.state = WriterState::ASYNC; // TODO: support sync
    auto locator = index->GetTabletInfos()->GetLatestLocator();
    if (locator.IsValid()) {
        result.watermark.buildLocatorOffset = build_service::util::LocatorUtil::getSwiftWatermark(locator);
    }
    if (result.watermark.maxCp > 0) {
        // Spell out the template argument: a `0L` literal only matches int64_t on
        // platforms where int64_t is `long`, and std::max needs both arguments
        // to have the same type.
        result.watermark.buildGap =
            result.watermark.maxCp - std::max<int64_t>(0, result.watermark.buildLocatorOffset);
    }
}
} // namespace suez