aios/suez/service/TableServiceImpl.cpp (963 lines of code) (raw):

/* * Copyright 2014-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "suez/service/TableServiceImpl.h" #include <functional> #include <memory> #include <tuple> #include <utility> #include "autil/ClosureGuard.h" #include "autil/EnvUtil.h" #include "autil/RangeUtil.h" #include "autil/Scope.h" #include "autil/TimeUtility.h" #include "autil/legacy/base64.h" #include "indexlib/base/FieldType.h" #include "indexlib/base/FieldTypeUtil.h" #include "indexlib/base/Types.h" #include "indexlib/config/TabletSchema.h" #include "indexlib/config/legacy_schema_adapter.h" #include "indexlib/framework/ITablet.h" #include "indexlib/framework/TabletInfos.h" #include "indexlib/index/kv/KVIndexReader.h" #include "indexlib/indexlib/tools/partition_querier/executors/IndexTableExecutor.h" #include "indexlib/indexlib/tools/partition_querier/executors/KkvTableExecutor.h" #include "indexlib/indexlib/tools/partition_querier/executors/KvTableExecutor.h" #include "indexlib/table/BuiltinDefine.h" #include "indexlib/util/ProtoJsonizer.h" #include "kmonitor/client/MetricsReporter.h" #include "kmonitor/client/core/MetricsTags.h" #include "suez/sdk/RpcServer.h" #include "suez/sdk/TableWriter.h" #include "suez/service/KVTableSearcher.h" #include "suez/service/SchemaUtil.h" #include "suez/service/TableServiceHelper.h" #include "suez/service/WriteDone.h" #include "suez/service/WriteTableAccessLog.h" using namespace std; using namespace autil; using namespace indexlib; using namespace indexlib::config; using namespace kmonitor; using indexlib::Status; using indexlib::partition::IndexPartitionPtr; using indexlib::tools::IndexTableExecutor; using indexlib::tools::KkvTableExecutor; using indexlib::tools::KvTableExecutor; using indexlibv2::base::PartitionQuery; using indexlibv2::base::PartitionResponse; namespace suez { constexpr char TABLE_SERVICE_TOPO_NAME[] = "table_service"; constexpr char ZONE_BIZ_NAME_SPLITTER[] = "."; AUTIL_LOG_SETUP(suez, TableServiceImpl); TableServiceImpl::TableServiceImpl() {} TableServiceImpl::~TableServiceImpl() {} bool TableServiceImpl::init(const SearchInitParam &initParam) { const auto &kmonMetaInfo = initParam.kmonMetaInfo; _metricsReporter.reset(new kmonitor::MetricsReporter(kmonMetaInfo.metricsPrefix, "", {}, "table_service")); _executor = initParam.asyncIntraExecutor; _rpcServer = initParam.rpcServer; _enablePublishTopoInfo = autil::EnvUtil::getEnv("ENABLE_PUBLISH_TABLE_TOPO_INFO", false); return initParam.rpcServer->RegisterService(this); } UPDATE_RESULT TableServiceImpl::update(const suez::UpdateArgs &updateArgs, UpdateIndications &updateIndications, SuezErrorCollector &errorCollector) { bool enableUpdateTopoInfo = _enablePublishTopoInfo && needUpdateTopoInfo(updateArgs); setIndexProvider(updateArgs.indexProvider); if (enableUpdateTopoInfo && !updateTopoInfo(updateArgs)) { return UR_NEED_RETRY; } return UR_REACH_TARGET; } void TableServiceImpl::stopService() { AUTIL_LOG(INFO, "stop table service."); setIndexProvider(nullptr); } void TableServiceImpl::stopWorker() { AUTIL_LOG(INFO, "stop worker."); stopService(); } bool TableServiceImpl::needUpdateTopoInfo(const UpdateArgs &updateArgs) const { assert(updateArgs.indexProvider); if (!_indexProvider) { return true; } if (*_indexProvider == *(updateArgs.indexProvider)) { return false; } else { return true; } } static regionid_t getRegionId(indexlibv2::config::ITabletSchema *schema, const string &regionName) { if (regionName.empty()) { return DEFAULT_REGIONID; } auto legacySchemaAdapter = dynamic_cast<indexlib::config::LegacySchemaAdapter *>(schema); if (!legacySchemaAdapter) { return DEFAULT_REGIONID; } return legacySchemaAdapter->GetLegacySchema()->GetRegionId(regionName); } #define COMMON_CHECK(tablename_func) \ TableServiceHelper::trySetGigEc(done, multi_call::MULTI_CALL_ERROR_NONE); \ ClosureGuard guard(done); \ auto indexProvider = getIndexProvider(); \ if (!indexProvider) { \ ERROR_THEN_RETURN(done, \ TBS_ERROR_SERVICE_NOT_READY, \ "service not ready", \ kQueryTable, \ kUnknownTable, \ _metricsReporter.get()); \ } \ if (!request->has_##tablename_func() || request->tablename_func().empty()) { \ ERROR_THEN_RETURN( \ done, TBS_ERROR_NO_TABLE, "no table specified.", kQueryTable, kUnknownTable, _metricsReporter.get()); \ } #define ERROR_THEN_RETURN(done, error_code, msg, queryType, tableName, metricsReporter) \ do { \ TableServiceHelper::trySetGigEc(done, multi_call::MULTI_CALL_REPLY_ERROR_RESPONSE); \ TableServiceHelper::setErrorInfo( \ response->mutable_errorinfo(), done, error_code, msg, queryType, tableName, metricsReporter); \ return; \ } while (false) void TableServiceImpl::querySchema(google::protobuf::RpcController *controller, const SchemaQueryRequest *request, SchemaQueryResponse *response, google::protobuf::Closure *done) { int64_t startTime = TimeUtility::currentTime(); COMMON_CHECK(tablename); const auto &tableName = request->tablename(); std::shared_ptr<indexlibv2::config::ITabletSchema> schema; auto tablets = indexProvider->multiTableReader.getTablets(tableName); if (!tablets.empty()) { auto firstTablet = tablets[0]; schema = firstTablet->GetTabletSchema(); } else { auto indexPartitions = indexProvider->multiTableReader.getIndexPartitions(tableName); if (indexPartitions.empty()) { ERROR_THEN_RETURN(done, TBS_ERROR_NO_TABLE, "can't get index partition of table", kQuerySchema, tableName, _metricsReporter.get()); } auto indexPartitionPtr = indexPartitions.begin()->second; schema = std::make_shared<indexlib::config::LegacySchemaAdapter>(indexPartitionPtr->GetSchema()); } ErrorInfo errInfo; ResultSchema resultSchema; if (request->has_jsonschema() && request->jsonschema() == true) { errInfo = SchemaUtil::getJsonSchema(schema, resultSchema); } else { errInfo = SchemaUtil::getPbSchema(schema, "", resultSchema); } *(response->mutable_res()) = resultSchema; if (errInfo.errorcode() != TBS_ERROR_NONE) { *(response->mutable_errorinfo()) = errInfo; } if (nullptr != _metricsReporter) { MetricsTags baseTags = {{{kQueryType, kQuerySchema}, {kTableName, tableName}}}; int64_t latency = TimeUtility::currentTime() - startTime; _metricsReporter->report(latency, kLatency, kmonitor::GAUGE, &baseTags); _metricsReporter->report(1, kQps, kmonitor::QPS, &baseTags); } } void TableServiceImpl::queryVersion(google::protobuf::RpcController *controller, const VersionQueryRequest *request, VersionQueryResponse *response, google::protobuf::Closure *done) { COMMON_CHECK(tablename); const auto &tableName = request->tablename(); SingleTableReaderPtr tableReader; if (request->partition().size() == 2) { RangeType from = request->partition()[0]; RangeType to = request->partition()[1]; tableReader = indexProvider->multiTableReader.getTableReaderByRange(from, to, tableName); if (tableReader == nullptr) { ERROR_THEN_RETURN(done, TBS_ERROR_NO_TABLE, "not exist", kQueryVersion, tableName, _metricsReporter.get()); } } else { AUTIL_LOG(INFO, "query version use first table reader"); auto tableReaders = indexProvider->multiTableReader.getTableReaders(tableName); if (tableReaders.empty()) { ERROR_THEN_RETURN(done, TBS_ERROR_NO_TABLE, "not exist", kQueryVersion, tableName, _metricsReporter.get()); } tableReader = tableReaders.begin()->second; } auto indexPartition = tableReader->getIndexPartition(); if (indexPartition) { auto version = indexPartition->GetReader()->GetVersion(); response->set_publishversioninfo(version.ToString()); } else { auto tablet = tableReader->getTablet(); if (!tablet) { ERROR_THEN_RETURN(done, TBS_ERROR_NOT_SUPPORTED, "table type unsupported", kQueryVersion, tableName, _metricsReporter.get()); } else { auto tabletInfo = tablet->GetTabletInfos(); response->set_publishversioninfo(tabletInfo->GetLoadedPublishVersion().ToString()); response->set_privateversioninfo(tabletInfo->GetLoadedPrivateVersion().ToString()); response->set_doccount(tabletInfo->GetTabletDocCount()); auto buildLocator = tabletInfo->GetBuildLocator(); auto *buildLocatorProto = response->mutable_buildlocator(); buildLocatorProto->set_src(buildLocator.GetSrc()); buildLocatorProto->set_offset(buildLocator.GetOffset().first); auto *commitLocatorProto = response->mutable_lastcommittedlocator(); auto commitLocator = tabletInfo->GetBuildLocator(); commitLocatorProto->set_src(commitLocator.GetSrc()); commitLocatorProto->set_offset(commitLocator.GetOffset().first); } } } void TableServiceImpl::queryTable(google::protobuf::RpcController *controller, const TableQueryRequest *request, TableQueryResponse *response, google::protobuf::Closure *done) { int64_t startTime = TimeUtility::currentTime(); COMMON_CHECK(table); const string &tableName = request->table(); SingleTableReaderPtr singleTableReader; if (request->partition().empty()) { SingleTableReaderMap readerMap = indexProvider->multiTableReader.getTableReaders(tableName); if (readerMap.empty()) { ERROR_THEN_RETURN(done, TBS_ERROR_NO_TABLE, "table[" + tableName + "] not found", kQueryTable, tableName, _metricsReporter.get()); } auto pid = readerMap.begin()->first; singleTableReader = readerMap.begin()->second; if (readerMap.size() > 1) { AUTIL_LOG(WARN, "table[%s] has [%zu] partitions, but no partition specified, use first partition[%d,%d]", tableName.c_str(), readerMap.size(), pid.from, pid.to); } } else if (2 == request->partition().size()) { RangeType from = request->partition()[0]; RangeType to = request->partition()[1]; singleTableReader = indexProvider->multiTableReader.getTableReaderByRange(from, to, tableName); } else { ERROR_THEN_RETURN( done, TBS_ERROR_NO_TABLE, "invalid partitions", kQueryTable, tableName, _metricsReporter.get()); } if (nullptr == singleTableReader) { ERROR_THEN_RETURN( done, TBS_ERROR_NO_TABLE, "cannot find table", kQueryTable, tableName, _metricsReporter.get()); } const auto query = convertRequestToPartitionQuery(request); PartitionResponse partitionResponse; indexlib::Status status; std::shared_ptr<indexlibv2::config::ITabletSchema> schema; auto tablet = singleTableReader->getTablet(); auto indexPartition = singleTableReader->getIndexPartition(); if (nullptr != tablet) { status = queryTablet(tablet, query, partitionResponse, schema); } else { status = queryIndexPartition(indexPartition, request, query, partitionResponse, schema); } ErrorInfo errorInfo = convertStatusToErrorInfo(status); if (errorInfo.errorcode() != TBS_ERROR_NONE) { AUTIL_LOG(ERROR, "SuezTableService Query Failed. status msg: %s", errorInfo.errormsg().c_str()); ERROR_THEN_RETURN( done, errorInfo.errorcode(), errorInfo.errormsg(), kQueryTable, tableName, _metricsReporter.get()); } convertPartitionResponseToResponse(request, partitionResponse, response); auto &tableQueryResult = *response->mutable_res(); tableQueryResult.set_tablename(tableName); if (request->has_showschema() && request->showschema()) { ResultSchema resultSchema; SchemaUtil::getPbSchema(schema, "", resultSchema); *(tableQueryResult.mutable_resultschema()) = resultSchema; } if (tableQueryResult.docids().empty() && tableQueryResult.docvalueset().empty() && !tableQueryResult.has_metaresult()) { ERROR_THEN_RETURN( done, TBS_ERROR_NO_RECORD, "no record found.", kQueryTable, tableName, _metricsReporter.get()); } if (!tableQueryResult.docvalueset().empty()) { tableQueryResult.set_resultcount(tableQueryResult.docvalueset_size()); } else { tableQueryResult.set_resultcount(tableQueryResult.docids_size()); } if (nullptr != _metricsReporter) { MetricsTags baseTags = {{{kQueryType, kQueryTable}, {kTableName, tableName}}}; int64_t latency = TimeUtility::currentTime() - startTime; _metricsReporter->report(latency, kLatency, kmonitor::GAUGE, &baseTags); _metricsReporter->report(1, kQps, kmonitor::QPS, &baseTags); _metricsReporter->report(tableQueryResult.resultcount(), kDocCount, kmonitor::GAUGE, &baseTags); } } void TableServiceImpl::simpleQuery(google::protobuf::RpcController *controller, const SimpleQueryRequest *request, SimpleQueryResponse *response, google::protobuf::Closure *done) { int64_t startTime = TimeUtility::currentTime(); COMMON_CHECK(table); const string &tableName = request->table(); SingleTableReaderPtr singleTableReader; if (request->partition().empty()) { SingleTableReaderMap readerMap = indexProvider->multiTableReader.getTableReaders(tableName); if (readerMap.empty()) { ERROR_THEN_RETURN(done, TBS_ERROR_NO_TABLE, "table[" + tableName + "] not found", kQueryTable, tableName, _metricsReporter.get()); } auto pid = readerMap.begin()->first; singleTableReader = readerMap.begin()->second; if (readerMap.size() > 1) { AUTIL_LOG(WARN, "table[%s] has [%zu] partitions, but no partition specified, use first partition[%d,%d]", tableName.c_str(), readerMap.size(), pid.from, pid.to); } } else if (2 == request->partition().size()) { RangeType from = request->partition()[0]; RangeType to = request->partition()[1]; singleTableReader = indexProvider->multiTableReader.getTableReaderByRange(from, to, tableName); } else { ERROR_THEN_RETURN( done, TBS_ERROR_INVALID_PARAMETER, "invalid partitions", kQueryTable, tableName, _metricsReporter.get()); } if (nullptr == singleTableReader) { ERROR_THEN_RETURN( done, TBS_ERROR_NO_TABLE, "cannot find table", kQueryTable, tableName, _metricsReporter.get()); } auto tablet = singleTableReader->getTablet(); if (tablet == nullptr) { ERROR_THEN_RETURN(done, TBS_ERROR_NO_TABLE, "tablet is null", kQueryTable, tableName, _metricsReporter.get()); } auto tabletReader = tablet->GetTabletReader(); std::string query = std::string(request->request().data(), request->request().size()); std::string result; auto status = tabletReader->Search(query, result); if (!status.IsOK()) { ERROR_THEN_RETURN( done, TBS_ERROR_GET_FAILED, "simple query failed", kQueryTable, tableName, _metricsReporter.get()); } response->mutable_errorinfo()->set_errorcode(TBS_ERROR_NONE); response->mutable_errorinfo()->set_errormsg(""); response->set_response(std::move(result)); if (nullptr != _metricsReporter) { MetricsTags baseTags = {{{kQueryType, kQueryTable}, {kTableName, tableName}}}; int64_t latency = TimeUtility::currentTime() - startTime; _metricsReporter->report(latency, kLatency, kmonitor::GAUGE, &baseTags); _metricsReporter->report(1, kQps, kmonitor::QPS, &baseTags); } } Status TableServiceImpl::queryTablet(shared_ptr<indexlibv2::framework::ITablet> tablet, const PartitionQuery &query, PartitionResponse &partitionResponse, std::shared_ptr<indexlibv2::config::ITabletSchema> &schema) { std::string queryStr; RETURN_STATUS_DIRECTLY_IF_ERROR(indexlib::util::ProtoJsonizer::ToJson(query, &queryStr)); schema = tablet->GetTabletSchema(); auto tabletReader = tablet->GetTabletReader(); if (!tabletReader) { return Status::InternalError("tablet reader is nullptr"); } std::string result; const std::string tableType = schema->GetTableType(); if (tableType == indexlib::table::TABLE_TYPE_NORMAL || tableType == indexlib::table::TABLE_TYPE_KV || tableType == indexlib::table::TABLE_TYPE_KKV ) { RETURN_STATUS_DIRECTLY_IF_ERROR(tabletReader->Search(queryStr, result)); RETURN_STATUS_DIRECTLY_IF_ERROR(indexlib::util::ProtoJsonizer::FromJson(result, &partitionResponse)); return Status::OK(); } return Status::InvalidArgs("table schema type [%s] not supported", tableType.c_str()); } Status TableServiceImpl::queryIndexPartition(IndexPartitionPtr indexPartitionPtr, const TableQueryRequest *request, const PartitionQuery &query, PartitionResponse &partitionResponse, std::shared_ptr<indexlibv2::config::ITabletSchema> &schema) { auto legacySchema = indexPartitionPtr->GetSchema(); assert(legacySchema); schema = std::make_shared<indexlib::config::LegacySchemaAdapter>(legacySchema); Status status; const auto reader = indexPartitionPtr->GetReader(); const auto &tableType = schema->GetTableType(); if (tableType == indexlib::table::TABLE_TYPE_NORMAL) { status = IndexTableExecutor::QueryIndex(reader, query, partitionResponse); } else if (tableType == indexlib::table::TABLE_TYPE_KV) { status = KvTableExecutor::QueryKVTable( reader, getRegionId(schema.get(), request->region()), query, partitionResponse); } else if (tableType == indexlib::table::TABLE_TYPE_KKV) { status = KkvTableExecutor::QueryKkvTable( reader, getRegionId(schema.get(), request->region()), query, partitionResponse); } else { status = Status::InvalidArgs("table schema type [%d] not supported", schema->GetTableType().c_str()); } return status; } void TableServiceImpl::writeTable(google::protobuf::RpcController *controller, const WriteRequest *request, WriteResponse *response, google::protobuf::Closure *done) { auto accessLog = make_shared<WriteTableAccessLog>(); accessLog->collectRequest(request); accessLog->collectClosure(done); accessLog->collectRPCController(controller); int64_t startTime = TimeUtility::currentTime(); TableServiceHelper::trySetGigEc(done, multi_call::MULTI_CALL_ERROR_NONE); ScopeGuard scopeGuard([response, accessLog, done]() { accessLog->collectResponse(response); done->Run(); }); auto indexProvider = getIndexProvider(); if (!indexProvider) { ERROR_THEN_RETURN(done, TBS_ERROR_SERVICE_NOT_READY, "service not ready", kWrite, request->tablename(), _metricsReporter.get()); } map<shared_ptr<TableWriter>, vector<pair<uint16_t, string>>> useTableWriters; map<shared_ptr<TableWriter>, vector<int>> tableWriterToDocIndex; for (int i = 0; i < request->writes().size(); i++) { auto write = request->writes()[i]; bool found = false; for (const auto &[pid, tableWriter] : indexProvider->tableWriters) { if (pid.getTableName() != request->tablename() || !pid.covers(write.hashid())) { continue; } useTableWriters[tableWriter].emplace_back(make_pair(write.hashid(), write.str())); tableWriterToDocIndex[tableWriter].emplace_back(i); found = true; break; } if (!found) { ERROR_THEN_RETURN(done, TBS_ERROR_OTHERS, "no valid table/range for WriteRequest table: " + request->tablename() + " hashid: " + to_string(write.hashid()), kWrite, request->tablename(), _metricsReporter.get()); } } scopeGuard.release(); auto singleWriteDone = make_shared<WriteDone>(request, response, done, _metricsReporter.get(), accessLog, startTime, std::move(indexProvider), request->writes().size()); for (const auto &useTableWriter : useTableWriters) { auto docIndexList = tableWriterToDocIndex[useTableWriter.first]; auto currentWriteDone = [singleWriteDone, docIndexList](Result<WriteResult> result) { singleWriteDone->run(std::move(result), docIndexList); }; useTableWriter.first->write(request->format(), useTableWriter.second, currentWriteDone, nullptr); } } void TableServiceImpl::updateSchema(google::protobuf::RpcController *controller, const UpdateSchemaRequest *request, UpdateSchemaResponse *response, google::protobuf::Closure *done) { TableServiceHelper::trySetGigEc(done, multi_call::MULTI_CALL_ERROR_NONE); ClosureGuard guard(done); AUTIL_LOG(INFO, "update schema for table [%s], part[%d]", request->tablename().c_str(), request->partids(0)); if (request->partids().size() != 1) { ERROR_THEN_RETURN(done, TBS_ERROR_UPDATE_SCHEMA, "only support update one part", kUpdateSchema, request->tablename(), _metricsReporter.get()); } auto indexProvider = getIndexProvider(); if (!indexProvider) { ERROR_THEN_RETURN(done, TBS_ERROR_SERVICE_NOT_READY, "service not ready", kUpdateSchema, request->tablename(), _metricsReporter.get()); } const auto &partId = request->partids(0); std::shared_ptr<TableWriter> tableWriter; string configPath; bool found = false; for (const auto &iter : indexProvider->tableWriters) { if (iter.first.getTableName() != request->tablename()) { continue; } // TODO: admin set index if (iter.first.index == partId) { tableWriter = iter.second; configPath = request->configpath(); found = true; break; } } if (!found) { ERROR_THEN_RETURN(done, TBS_ERROR_UPDATE_SCHEMA, "no valid table/part for update schema table: " + request->tablename() + " partid: " + to_string(partId), kUpdateSchema, request->tablename(), _metricsReporter.get()); } auto updateDone = [this, done = guard.steal(), request, response](autil::Result<int64_t> result) { if (!result.is_ok()) { AUTIL_LOG(ERROR, "update schema failed, error msg: %s", result.get_error().message().c_str()); TableServiceHelper::trySetGigEc(done, multi_call::MULTI_CALL_REPLY_ERROR_RESPONSE); TableServiceHelper::setErrorInfo(response->mutable_errorinfo(), done, TBS_ERROR_UPDATE_SCHEMA, "update schema failed, error msg: " + result.get_error().message(), kUpdateSchema, request->tablename(), _metricsReporter.get()); } done->Run(); }; tableWriter->updateSchema(request->schemaversion(), configPath, std::move(updateDone)); } void TableServiceImpl::getSchemaVersion(google::protobuf::RpcController *controller, const GetSchemaVersionRequest *request, GetSchemaVersionResponse *response, google::protobuf::Closure *done) { TableServiceHelper::trySetGigEc(done, multi_call::MULTI_CALL_ERROR_NONE); ClosureGuard guard(done); auto indexProvider = getIndexProvider(); if (!indexProvider) { ERROR_THEN_RETURN(done, TBS_ERROR_SERVICE_NOT_READY, "service not ready", kGetSchemaVersion, request->tablename(), _metricsReporter.get()); } if (request->partids().size() != 1) { ERROR_THEN_RETURN(done, TBS_ERROR_GET_SCHEMA_VERSION, "only support get one part", kGetSchemaVersion, request->tablename(), _metricsReporter.get()); } const auto &partId = request->partids(0); const auto &tableName = request->tablename(); if (tableName.empty()) { ERROR_THEN_RETURN( done, TBS_ERROR_NO_TABLE, "no table specified", kGetSchemaVersion, tableName, _metricsReporter.get()); } auto tablet = indexProvider->multiTableReader.getTabletByIdx(tableName, partId); if (tablet == nullptr) { ERROR_THEN_RETURN( done, TBS_ERROR_NO_TABLE, "tablet not found", kGetSchemaVersion, tableName, _metricsReporter.get()); } auto tabletReader = tablet->GetTabletReader(); if (!tabletReader) { ERROR_THEN_RETURN( done, TBS_ERROR_OTHERS, "fail to get tablet reader", kGetSchemaVersion, tableName, _metricsReporter.get()); } auto tabletSchema = tabletReader->GetSchema(); if (!tabletSchema) { ERROR_THEN_RETURN( done, TBS_ERROR_OTHERS, "fail to get schema", kGetSchemaVersion, tableName, _metricsReporter.get()); } auto schemaId = tabletSchema->GetSchemaId(); auto schemaVersion = response->add_schemaversions(); schemaVersion->set_partid(partId); schemaVersion->set_currentversionid(schemaId); } #define IF_NOT_OK_THEN_RETURN(IS_OK, MSG) \ do { \ if (unlikely(!(IS_OK))) { \ ERROR_THEN_RETURN(done, TBS_ERROR_OTHERS, MSG, kKvBatchGet, tableName, _metricsReporter.get()); \ } \ } while (0) void TableServiceImpl::kvBatchGet(google::protobuf::RpcController *controller, const KVBatchGetRequest *request, KVBatchGetResponse *response, google::protobuf::Closure *done) { COMMON_CHECK(tablename); response->set_notfoundcount(0); response->set_failedcount(0); int64_t startTime = TimeUtility::currentTime(); const string &tableName = request->tablename(); string indexName = request->indexname(); std::vector<std::string> attrs; for (size_t i = 0; i < request->attrs_size(); i++) { attrs.emplace_back(request->attrs(i)); } IF_NOT_OK_THEN_RETURN(_executor, "batch search need executor"); IF_NOT_OK_THEN_RETURN(request->inputkeys_size(), "input keys are empty "); // 构造searcher autil::mem_pool::Pool pool(1024); std::unique_ptr<LookupOptions> options = std::make_unique<LookupOptions>(attrs, request->timeoutinus(), _executor, &pool); auto searcher = std::make_shared<KVTableSearcher>(&(indexProvider->multiTableReader)); auto result = searcher->init(tableName, indexName); IF_NOT_OK_THEN_RETURN(result.is_ok(), "kv table searcher init failed, msg:" + result.get_error().message()); // 查询 if (request->resulttype() == KVBatchGetResultType::DEBUG) { // debug 接口 KVTableSearcher::InputKeys inputKeys; for (auto i = 0; i < request->inputkeys_size(); i++) { auto inputKey = request->inputkeys(i); inputKeys.emplace_back( make_pair<int, vector<string>>(inputKey.partid(), {inputKey.keys().begin(), inputKey.keys().end()})); } auto result = searcher->query(inputKeys, options.get()); IF_NOT_OK_THEN_RETURN(result.is_ok(), result.get_error().message()); auto values = result.get(); *response->mutable_values() = {values.begin(), values.end()}; } else { // bytes和table // 构造inputHashKeys KVTableSearcher::InputHashKeys inputHashKeys; for (auto i = 0; i < request->inputkeys_size(); i++) { auto inputKey = request->inputkeys(i); auto partId = inputKey.partid(); if (request->usehashkey()) { auto hashKeys = inputKey.hashkeys(); inputHashKeys.emplace_back( make_pair<int, vector<uint64_t>>(partId, {hashKeys.begin(), hashKeys.end()})); } else { auto hashKeys = searcher->genHashKeys(partId, {inputKey.keys().begin(), inputKey.keys().end()}); IF_NOT_OK_THEN_RETURN(hashKeys.is_ok(), hashKeys.get_error().message()); inputHashKeys.emplace_back(make_pair((int)partId, hashKeys.get())); } } auto lookupResult = searcher->lookup(inputHashKeys, options.get()); IF_NOT_OK_THEN_RETURN(lookupResult.is_ok(), "look up failed, errormsg: " + lookupResult.get_error().message()); if (request->resulttype() == KVBatchGetResultType::FLATBYTES) { if (lookupResult.get().foundValues.size()) { int index = 0; auto valueSize = lookupResult.get().foundValues.begin()->second.size(); std::string *buffers = response->mutable_binvalues(); buffers->resize(valueSize * lookupResult.get().foundValues.size()); for (auto [hashKey, value] : lookupResult.get().foundValues) { response->add_foundkeys(hashKey); if (unlikely(value.size() != valueSize)) { IF_NOT_OK_THEN_RETURN(value.size() == valueSize, "look up failed, dimension not consistant."); } memcpy(buffers->data() + index, value.data(), valueSize); index += valueSize; } } else { response->set_binvalues(""); } } else if (request->resulttype() == KVBatchGetResultType::BYTES) { for (auto [hashKey, value] : lookupResult.get().foundValues) { response->add_foundkeys(hashKey); response->add_values(value.data(), value.size()); } } else if (request->resulttype() == KVBatchGetResultType::TABLE) { std::vector<autil::StringView> values; for (auto [hashKey, value] : lookupResult.get().foundValues) { values.emplace_back(value); } auto inputKey = request->inputkeys(0); auto partId = inputKey.partid(); auto valueStr = searcher->convertResult(partId, values, options.get()); IF_NOT_OK_THEN_RETURN(valueStr.is_ok(), valueStr.get_error().message()); response->set_binvalues(valueStr.get().data(), valueStr.get().size()); } response->set_failedcount(lookupResult.get().failedCount); response->set_notfoundcount(lookupResult.get().notFoundCount); } if (_metricsReporter) { MetricsTags baseTags = {{{kQueryType, kKvBatchGet}, {kTableName, tableName}}}; int64_t latency = TimeUtility::currentTime() - startTime; _metricsReporter->report(latency, kLatency, kmonitor::GAUGE, &baseTags); _metricsReporter->report(1, kQps, kmonitor::QPS, &baseTags); searcher->reportMetrics(_metricsReporter.get()); } else { AUTIL_LOG(WARN, "cannot find metric reporter") } } void TableServiceImpl::health_check(google::protobuf::RpcController *controller, const HealthCheckRequest *request, HealthCheckResponse *response, google::protobuf::Closure *done) { TableServiceHelper::trySetGigEc(done, multi_call::MULTI_CALL_ERROR_NONE); ClosureGuard guard(done); auto indexProvider = getIndexProvider(); response->set_serviceready(indexProvider.get() != nullptr); } void TableServiceImpl::setIndexProvider(const shared_ptr<IndexProvider> &provider) { ScopedWriteLock lock(_lock); _indexProvider = provider; } shared_ptr<IndexProvider> TableServiceImpl::getIndexProvider() const { ScopedReadLock lock(_lock); return _indexProvider; } PartitionQuery TableServiceImpl::convertRequestToPartitionQuery(const TableQueryRequest *const request) { PartitionQuery partitionQuery; if (request->has_attr()) { partitionQuery.add_attrs(request->attr()); } if (request->has_pk()) { partitionQuery.add_pk(request->pk()); } if (request->has_docid()) { partitionQuery.add_docid(request->docid()); } if (request->has_region()) { partitionQuery.set_region(request->region()); } if (request->has_indexname()) { partitionQuery.mutable_condition()->set_indexname(request->indexname()); if (request->has_indexvalue()) { partitionQuery.mutable_condition()->add_values(request->indexvalue()); } } if (request->has_limit()) { partitionQuery.set_limit(request->limit()); } if (!request->sk().empty()) { partitionQuery.add_skey(request->sk()); } if (request->has_pknumber()) { partitionQuery.add_pknumber(request->pknumber()); } if (request->has_ignoredeletionmap()) { partitionQuery.set_ignoredeletionmap(request->ignoredeletionmap()); } else { partitionQuery.set_ignoredeletionmap(false); } if (request->has_needsectioninfo()) { partitionQuery.set_needsectioninfo(request->needsectioninfo()); } else { partitionQuery.set_needsectioninfo(false); } for (int i = 0; i < request->summarys_size(); ++i) { *partitionQuery.add_summarys() = request->summarys(i); } for (int i = 0; i < request->sources_size(); ++i) { *partitionQuery.add_sources() = request->sources(i); } if (request->has_fieldmetaquery()) { auto originRequest = request->fieldmetaquery(); partitionQuery.mutable_fieldmetaquery()->set_fieldmetatype(originRequest.fieldmetatype()); partitionQuery.mutable_fieldmetaquery()->set_indexname(originRequest.indexname()); } if (request->has_fieldtokencountquery()) { partitionQuery.mutable_fieldtokencountquery()->set_indexname(request->fieldtokencountquery().indexname()); partitionQuery.mutable_fieldtokencountquery()->set_docid(request->fieldtokencountquery().docid()); } return partitionQuery; } void TableServiceImpl::convertPartitionResponseToResponse(const TableQueryRequest *const request, const PartitionResponse &partitionResponse, TableQueryResponse *const response) { auto tableQueryResult = response->mutable_res(); tableQueryResult->set_resultcount(partitionResponse.rows_size()); const bool hasDocId = partitionResponse.rows_size() > 0 ? partitionResponse.rows(0).has_docid() : false; const auto &attrInfo = partitionResponse.attrinfo(); tableQueryResult->clear_docids(); for (int64_t i = 0; i < partitionResponse.rows_size(); ++i) { const auto &row = partitionResponse.rows(i); auto &docValue = *tableQueryResult->add_docvalueset(); if (hasDocId) { int64_t docId = row.docid(); docValue.set_docid(docId); tableQueryResult->add_docids(docId); } docValue.clear_attrvalue(); for (int j = 0; j < attrInfo.fields_size(); ++j) { auto &singleAttrValue = *docValue.add_attrvalue(); singleAttrValue.set_attrname(attrInfo.fields(j).attrname()); setSingleAttr<TableQueryRequest>(request, singleAttrValue, row.attrvalues(j)); } for (int k = 0; k < row.summaryvalues_size(); ++k) { const auto &srcValue = row.summaryvalues(k); auto targetValue = docValue.add_summaryvalues(); targetValue->set_fieldname(srcValue.fieldname()); targetValue->set_value(srcValue.value()); } for (int k = 0; k < row.sourcevalues_size(); ++k) { const auto &srcValue = row.sourcevalues(k); auto targetValue = docValue.add_sourcevalues(); targetValue->set_fieldname(srcValue.fieldname()); targetValue->set_value(srcValue.value()); } if (row.has_fieldtokencountres()) { auto targetValue = docValue.mutable_fieldtokencountres(); targetValue->set_fieldtokencount(row.fieldtokencountres().fieldtokencount()); targetValue->set_indexname(row.fieldtokencountres().indexname()); } } if (partitionResponse.has_termmeta()) { tableQueryResult->mutable_indextermmeta()->set_docfreq(partitionResponse.termmeta().docfreq()); tableQueryResult->mutable_indextermmeta()->set_totaltermfreq(partitionResponse.termmeta().totaltermfreq()); tableQueryResult->mutable_indextermmeta()->set_payload(partitionResponse.termmeta().payload()); } for (int i = 0; i < partitionResponse.sectionmetas_size(); ++i) { auto sectionMeta = tableQueryResult->add_sectionmetas(); sectionMeta->set_fieldid(partitionResponse.sectionmetas(i).fieldid()); sectionMeta->set_sectionweight(partitionResponse.sectionmetas(i).sectionweight()); sectionMeta->set_sectionlen(partitionResponse.sectionmetas(i).sectionlen()); } for (int i = 0; i < partitionResponse.matchvalues_size(); ++i) { tableQueryResult->add_matchvalues(partitionResponse.matchvalues(i)); } if (partitionResponse.has_metaresult()) { tableQueryResult->mutable_metaresult()->set_indexname((partitionResponse.metaresult().indexname())); tableQueryResult->mutable_metaresult()->set_fieldmetatype((partitionResponse.metaresult().fieldmetatype())); tableQueryResult->mutable_metaresult()->set_metainfo((partitionResponse.metaresult().metainfo())); } } ErrorInfo TableServiceImpl::convertStatusToErrorInfo(const indexlib::Status &status) { if (status.IsOK()) { return ErrorInfo::default_instance(); } const string partitionQuerierStatusMsg = status.ToString(); AUTIL_LOG(ERROR, "SuezTableService Query Failed. status msg: %s", partitionQuerierStatusMsg.c_str()); ErrorInfo errorInfo; errorInfo.set_errormsg(partitionQuerierStatusMsg); if (status.IsNotFound()) { errorInfo.set_errorcode(TBS_ERROR_NO_RECORD); } else if (status.IsInvalidArgs()) { if (strstr(partitionQuerierStatusMsg.c_str(), "can not find index")) { errorInfo.set_errorcode(TBS_ERROR_CREATE_READER); } else { errorInfo.set_errorcode(TBS_ERROR_INVALID_ATTR_NAME); } } else { errorInfo.set_errorcode(TBS_ERROR_OTHERS); } return errorInfo; } template <typename T> void TableServiceImpl::setSingleAttr(const T *const request, SingleAttrValue &singleAttrValue, const indexlibv2::base::AttrValue &attrValue) { const bool isSingle = attrValue.has_int32_value() || attrValue.has_uint32_value() || attrValue.has_int64_value() || attrValue.has_uint64_value() || attrValue.has_float_value() || attrValue.has_double_value() || attrValue.has_bytes_value(); #define GET_VALUE_FROM_ATTRVALUE(SUEZ_SINGLE_FIELD, SUEZ_MULTI_FIELD, INDEXLIB_SINGLE_FIELD, INDEXLIB_MULTI_FIELD) \ if (isSingle) { \ singleAttrValue.set_##SUEZ_SINGLE_FIELD(attrValue.INDEXLIB_SINGLE_FIELD()); \ } else { \ for (int i = 0; i < attrValue.INDEXLIB_MULTI_FIELD().value_size(); ++i) { \ singleAttrValue.add_##SUEZ_MULTI_FIELD(attrValue.INDEXLIB_MULTI_FIELD().value(i)); \ } \ } \ break; switch (attrValue.type()) { case indexlibv2::base::INT_8: case indexlibv2::base::INT_16: case indexlibv2::base::INT_32: GET_VALUE_FROM_ATTRVALUE(intvalue, multiintvalue, int32_value, multi_int32_value); case indexlibv2::base::UINT_8: case indexlibv2::base::UINT_16: case indexlibv2::base::UINT_32: GET_VALUE_FROM_ATTRVALUE(intvalue, multiintvalue, uint32_value, multi_uint32_value); case indexlibv2::base::INT_64: GET_VALUE_FROM_ATTRVALUE(intvalue, multiintvalue, int64_value, multi_int64_value); case indexlibv2::base::UINT_64: GET_VALUE_FROM_ATTRVALUE(intvalue, multiintvalue, uint64_value, multi_uint64_value); case indexlibv2::base::FLOAT: GET_VALUE_FROM_ATTRVALUE(doublevalue, multidoublevalue, float_value, multi_float_value); case indexlibv2::base::DOUBLE: GET_VALUE_FROM_ATTRVALUE(doublevalue, multidoublevalue, double_value, multi_double_value); case indexlibv2::base::STRING: { const bool encodeStr = request->has_encodestr() && request->encodestr(); if (isSingle) { const string &&str = encodeStr ? autil::legacy::Base64EncodeFast(attrValue.bytes_value()) : attrValue.bytes_value(); singleAttrValue.set_strvalue(str); } else { for (int i = 0; i < attrValue.multi_bytes_value().value_size(); ++i) { const string &&str = encodeStr ? autil::legacy::Base64EncodeFast(attrValue.multi_bytes_value().value(i)) : attrValue.multi_bytes_value().value(i); singleAttrValue.add_multistrvalue(str); } } break; } case indexlibv2::base::INT_128: default:; } #undef GET_VALUE_FROM_ATTRVALUE return; } bool TableServiceImpl::updateTopoInfo(const UpdateArgs &updateArgs) { multi_call::TopoInfoBuilder infoBuilder; if (buildTableTopoInfo(_rpcServer, updateArgs, infoBuilder) && !publishTopoInfo(_rpcServer, (multi_call::PublishGroupTy)this, infoBuilder)) { return false; } return true; } bool TableServiceImpl::buildTableTopoInfo(const suez::RpcServer *rpcServer, const UpdateArgs &updateArgs, multi_call::TopoInfoBuilder &infoBuilder) { const auto &serviceInfo = updateArgs.serviceInfo; const auto &indexProvider = updateArgs.indexProvider; string zoneName = serviceInfo.getZoneName() + ZONE_BIZ_NAME_SPLITTER + TABLE_SERVICE_TOPO_NAME; int64_t version = multi_call::INVALID_VERSION_ID; int64_t protocolVersion = multi_call::INVALID_VERSION_ID; int32_t weight = multi_call::MAX_WEIGHT; const auto &tableReaders = indexProvider->multiTableReader.getAllTableReaders(); int32_t grpcPort = -1; if (rpcServer) { grpcPort = rpcServer->gigRpcServer->getGrpcPort(); } for (const auto &tableReader : tableReaders) { const auto &tableName = tableReader.first; for (auto &it : tableReader.second) { const auto &partId = it.first; if (!partId.validate()) { AUTIL_LOG(INFO, "partition id is invalid, table service will not publish"); return false; } infoBuilder.addBiz(zoneName + ZONE_BIZ_NAME_SPLITTER + tableName, partId.partCount, partId.index, version, weight, protocolVersion, grpcPort); AUTIL_LOG(DEBUG, "fill topo info: zone [%s], biz [%s], part count [%d]," " partid [%d], version[%ld], protocol version[%ld], grpcport[%d]", zoneName.c_str(), tableName.c_str(), partId.partCount, partId.index, version, protocolVersion, grpcPort); } } auto topoInfo = infoBuilder.build(); AUTIL_LOG(INFO, "table service update topo info : %s", topoInfo.c_str()); return true; } bool TableServiceImpl::publishTopoInfo(const suez::RpcServer *rpcServer, const multi_call::PublishGroupTy group, const multi_call::TopoInfoBuilder &infoBuilder) const { if (rpcServer == nullptr) { AUTIL_LOG(ERROR, "rpc server is nullptr"); return false; } const auto &topoVec = infoBuilder.getBizTopoInfo(); auto gigRpcServer = rpcServer->gigRpcServer; std::vector<multi_call::SignatureTy> signatureVec; if (!gigRpcServer->publishGroup(group, topoVec, signatureVec)) { AUTIL_LOG(ERROR, "table service publish topo info failed"); return false; } return true; } } // namespace suez