aios/sql/ops/scan/scan_visitor/ScanIteratorCreatorR.cpp (906 lines of code) (raw):
/*
* Copyright 2014-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "sql/ops/scan/ScanIteratorCreatorR.h"
#include <assert.h>
#include <exception>
#include <ext/alloc_traits.h>
#include <limits>
#include <map>
#include <set>
#include <stddef.h>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include "autil/StringUtil.h"
#include "autil/legacy/exception.h"
#include "autil/mem_pool/Pool.h"
#include "autil/mem_pool/PoolBase.h"
#include "autil/mem_pool/PoolVector.h"
#include "build_service/analyzer/AnalyzerFactory.h"
#include "ha3/common/AndQuery.h"
#include "ha3/common/DocIdsQuery.h"
#include "ha3/common/ErrorDefine.h"
#include "ha3/common/QueryFlatten.h"
#include "ha3/common/QueryTermVisitor.h"
#include "ha3/common/TermCombineVisitor.h"
#include "ha3/isearch.h"
#include "ha3/search/AuxiliaryChainVisitor.h"
#include "ha3/search/DefaultLayerMetaUtil.h"
#include "ha3/search/Filter.h"
#include "ha3/search/FilterWrapper.h"
#include "ha3/search/JoinFilter.h"
#include "ha3/search/QueryExecutor.h"
#include "ha3/search/QueryExecutorCreator.h"
#include "ha3/search/RangeQueryExecutor.h"
#include "ha3/search/SubMatchCheckVisitor.h"
#include "ha3/search/SubMatchVisitor.h"
#include "ha3/search/TermMatchVisitor.h"
#include "indexlib/base/Types.h"
#include "indexlib/config/ITabletSchema.h"
#include "indexlib/index/normal/deletionmap/deletion_map_reader.h"
#include "indexlib/indexlib.h"
#include "indexlib/partition/index_partition.h"
#include "indexlib/partition/index_partition_reader.h"
#include "indexlib/util/Exception.h"
#include "indexlib/util/JsonMap.h"
#include "matchdoc/MatchDocAllocator.h"
#include "navi/builder/ResourceDefBuilder.h"
#include "navi/engine/Resource.h"
#include "navi/proto/KernelDef.pb.h"
#include "sql/common/Log.h"
#include "sql/common/common.h"
#include "sql/ops/calc/CalcInitParamR.h"
#include "sql/ops/condition/Condition.h"
#include "sql/ops/condition/ConditionParser.h"
#include "sql/ops/scan/DocIdRangesReduceOptimize.h"
#include "sql/ops/scan/DocIdsScanIterator.h"
#include "sql/ops/scan/Ha3ScanConditionVisitor.h"
#include "sql/ops/scan/Ha3ScanConditionVisitorParam.h"
#include "sql/ops/scan/Ha3ScanIterator.h"
#include "sql/ops/scan/OrderedHa3ScanIterator.h"
#include "sql/ops/scan/QueryExecutorExpressionWrapper.h"
#include "sql/ops/scan/QueryExprFilterWrapper.h"
#include "sql/ops/scan/QueryScanIterator.h"
#include "sql/ops/scan/RangeScanIterator.h"
#include "sql/ops/scan/RangeScanIteratorWithoutFilter.h"
#include "sql/ops/scan/udf_to_query/UdfToQueryManager.h"
#include "sql/ops/sort/SortInitParam.h"
#include "suez/turing/expression/framework/AttributeExpression.h"
#include "suez/turing/expression/framework/JoinDocIdConverterCreator.h"
#include "suez/turing/expression/provider/MetaInfo.h"
#include "suez/turing/expression/provider/common.h"
#include "suez/turing/expression/util/FieldMetaReaderWrapper.h"
#include "suez/turing/expression/util/IndexInfoHelper.h"
#include "suez/turing/expression/util/LegacyIndexInfoHelper.h"
namespace indexlib {
namespace index {
class DeletionMapReaderAdaptor;
class JoinDocidAttributeIterator;
} // namespace index
} // namespace indexlib
namespace navi {
class ResourceInitContext;
} // namespace navi
namespace sql {
const std::string ScanIteratorCreatorR::RESOURCE_ID = "scan_iterator_creator_r";
ScanIteratorCreatorR::ScanIteratorCreatorR() {}
ScanIteratorCreatorR::~ScanIteratorCreatorR() {
if (_indexInfoHelper) {
POOL_DELETE_CLASS(_indexInfoHelper);
}
}
void ScanIteratorCreatorR::def(navi::ResourceDefBuilder &builder) const {
builder.name(RESOURCE_ID, navi::RS_KERNEL);
}
bool ScanIteratorCreatorR::config(navi::ResourceConfigContext &ctx) {
return true;
}
navi::ErrorCode ScanIteratorCreatorR::init(navi::ResourceInitContext &ctx) {
_indexPartitionReaderWrapper = _attributeExpressionCreatorR->_indexPartitionReaderWrapper;
_tableInfo = _attributeExpressionCreatorR->_tableInfo;
_matchDocAllocator = _attributeExpressionCreatorR->_matchDocAllocator;
_attributeExpressionCreator = _attributeExpressionCreatorR->_attributeExpressionCreator;
_functionProvider = _attributeExpressionCreatorR->_functionProvider;
if (!initMatchData()) {
return navi::EC_ABORT;
}
const auto &tableName = _scanInitParamR->tableName;
auto pool = _queryMemPoolR->getPool().get();
ConditionParser parser(pool);
ConditionPtr condition;
const auto &conditionInfo = _scanInitParamR->calcInitParamR->conditionJson;
if (!parser.parseCondition(conditionInfo, condition)) {
SQL_LOG(ERROR,
"table name [%s], parse condition [%s] failed",
tableName.c_str(),
conditionInfo.c_str());
return navi::EC_ABORT;
}
if (_indexPartitionReaderWrapper->getFieldMetaReadersMap() != nullptr) {
_fieldMetaReaderWrapper = std::make_shared<suez::turing::FieldMetaReaderWrapper>(
pool, _indexPartitionReaderWrapper->getFieldMetaReadersMap());
}
isearch::search::LayerMetaPtr layerMeta = createLayerMeta();
SQL_LOG(DEBUG, "layer meta: %s", layerMeta->toString().c_str());
const auto &tableSortDescription = _ha3TableInfoR->getTableSortDescMap();
auto iter = tableSortDescription.find(tableName);
if (iter != tableSortDescription.end()) {
SQL_LOG(DEBUG, "begin reduce docid range optimize");
DocIdRangesReduceOptimize optimize(iter->second, _scanInitParamR->fieldInfos);
if (condition) {
condition->accept(&optimize);
}
layerMeta = optimize.reduceDocIdRange(layerMeta, pool, _indexPartitionReaderWrapper);
SQL_LOG(DEBUG,
"after reduce docid range optimize, layer meta: %s",
layerMeta->toString().c_str());
} else {
SQL_LOG(DEBUG, "not find table [%s] sort description", tableName.c_str());
}
if (layerMeta) {
layerMeta = splitLayerMetaByStep(pool,
layerMeta,
_scanInitParamR->parallelIndex,
_scanInitParamR->parallelNum,
_scanInitParamR->parallelBlockCount);
if (!layerMeta) {
SQL_LOG(WARN, "table name [%s], split layer meta failed.", tableName.c_str());
return navi::EC_ABORT;
}
layerMeta->quotaMode = QM_PER_DOC;
proportionalLayerQuota(*layerMeta.get());
SQL_LOG(
DEBUG, "after proportional layer quota, layer meta: %s", layerMeta->toString().c_str());
}
Ha3ScanConditionVisitorParam param;
param.attrExprCreator = _attributeExpressionCreator;
param.indexPartitionReaderWrapper = _indexPartitionReaderWrapper;
param.analyzerFactory = _analyzerFactoryR->getFactory().get();
param.indexInfo = &_scanInitParamR->indexInfos;
param.pool = pool;
param.queryInfo = &_ha3QueryInfoR->getQueryInfo();
param.indexInfos = _tableInfo->getIndexInfos();
param.mainTableName = tableName;
param.timeoutTerminator = _timeoutTerminatorR->getTimeoutTerminator();
param.layerMeta = layerMeta.get();
param.modelConfigMap = &_modelConfigMapR->getModelConfigMap();
param.udfToQueryManager = _udfToQueryManagerR->getManager().get();
param.forbidIndexs = &_scanInitParamR->forbidIndexs;
Ha3ScanConditionVisitor visitor(param);
if (condition) {
condition->accept(&visitor);
}
if (visitor.isError()) {
SQL_LOG(WARN,
"table name [%s], create scan iterator failed, error info [%s]",
tableName.c_str(),
visitor.errorInfo().c_str());
return navi::EC_ABORT;
}
isearch::common::QueryPtr query(visitor.stealQuery());
if (query) {
SQL_LOG(TRACE1, "query [%s]", query->toString().c_str());
isearch::common::QueryFlatten queryFlatten;
queryFlatten.flatten(query.get());
isearch::common::Query *flattenQuery = queryFlatten.stealQuery();
assert(flattenQuery);
query.reset(flattenQuery);
SQL_LOG(TRACE1, "after flat query [%s]", query->toString().c_str());
auto tabletSchema = _indexPartitionReaderWrapper->getSchema();
if (!tabletSchema) {
SQL_LOG(ERROR, "get index schema failed from table [%s]", tableName.c_str());
return navi::EC_ABORT;
}
termCombine(query, tabletSchema);
if (!initTruncateDesc(query)) {
return navi::EC_ABORT;
}
if (!_scanInitParamR->matchType.empty()) {
SQL_LOG(TRACE1, "before sub match query");
auto matchDataLabel = getMatchDataLabel();
subMatchQuery(query, tabletSchema, matchDataLabel);
SQL_LOG(TRACE1, "after sub match query [%s]", query->toString().c_str());
}
}
suez::turing::AttributeExpression *attrExpr = visitor.stealAttributeExpression();
isearch::search::FilterWrapperPtr filterWrapper;
if (attrExpr || !_scanInitParamR->auxTableName.empty()) {
SQL_LOG(
TRACE1, "condition expr [%s]", attrExpr ? attrExpr->getOriginalString().c_str() : "");
if (!createJoinDocIdConverter()) {
return navi::EC_ABORT;
}
bool ret = createFilterWrapper(attrExpr,
visitor.getQueryExprs(),
_attributeExpressionCreator->getJoinDocIdConverterCreator(),
filterWrapper);
if (!ret) {
SQL_LOG(WARN,
"table name [%s], create filter wrapper failed, exprStr: [%s]",
_scanInitParamR->tableName.c_str(),
attrExpr->getOriginalString().c_str());
return navi::EC_ABORT;
}
}
_createScanIteratorInfo.query = query;
_createScanIteratorInfo.filterWrapper = filterWrapper;
_createScanIteratorInfo.queryExprs = visitor.getQueryExprs();
_createScanIteratorInfo.layerMeta = layerMeta;
_baseScanIteratorInfo = _createScanIteratorInfo;
return navi::EC_NONE;
}
bool ScanIteratorCreatorR::initMatchData() {
_matchDataManager = std::make_shared<isearch::search::MatchDataManager>();
const auto &matchType = _scanInitParamR->matchType;
if (matchType.empty()) {
return true;
}
SQL_LOG(DEBUG,
"require match data, match types [%s]",
autil::StringUtil::toString(matchType).c_str());
_matchDataManager->requireMatchData();
_matchDataManager->setQueryCount(1);
return true;
}
bool ScanIteratorCreatorR::initTruncateDesc(isearch::common::QueryPtr &query) {
auto scanHintMap = _scanInitParamR->getScanHintMap();
if (!scanHintMap) {
return true;
}
const auto &hints = *scanHintMap;
auto iter = hints.find("truncateDesc");
if (iter == hints.end()) {
return true;
}
const auto &truncateDesc = iter->second;
if (!truncateDesc.empty()) {
SQL_LOG(DEBUG, "use truncate query");
try {
truncateQuery(query, truncateDesc);
} catch (const autil::legacy::ExceptionBase &e) {
SQL_LOG(ERROR, "scanInitParam init failed error:[%s].", e.what());
return false;
} catch (...) { return false; }
SQL_LOG(TRACE1, "after truncate query [%s]", query->toString().c_str());
}
return true;
}
std::string ScanIteratorCreatorR::getMatchDataLabel() const {
std::string ret = "sub";
auto scanHintMap = _scanInitParamR->getScanHintMap();
if (!scanHintMap) {
return ret;
}
const auto &hints = *scanHintMap;
auto iter = hints.find("MatchDataLabel");
if (iter != hints.end()) {
if (iter->second == "full_term") {
ret = "full_term";
}
}
return ret;
}
void ScanIteratorCreatorR::requireMatchData() {
auto pool = _queryMemPoolR->getPool().get();
std::unordered_set<std::string> matchTypeSet(_scanInitParamR->matchType.begin(),
_scanInitParamR->matchType.end());
if (matchTypeSet.count(SQL_MATCH_TYPE_SUB) != 0) {
if (_matchDocAllocator->hasSubDocAllocator()) {
SQL_LOG(DEBUG, "require sub simple match data");
_matchDataManager->requireSubSimpleMatchData(
_matchDocAllocator.get(), SIMPLE_MATCH_DATA_REF, SUB_DOC_DISPLAY_GROUP, pool);
} else {
SQL_LOG(DEBUG, "require simple match data");
_matchDataManager->requireSimpleMatchData(
_matchDocAllocator.get(), SIMPLE_MATCH_DATA_REF, SUB_DOC_DISPLAY_NO, pool);
}
} else if (matchTypeSet.count(SQL_MATCH_TYPE_SIMPLE) != 0) {
SQL_LOG(DEBUG, "require simple match data");
_matchDataManager->requireSimpleMatchData(
_matchDocAllocator.get(), SIMPLE_MATCH_DATA_REF, SUB_DOC_DISPLAY_NO, pool);
}
if (matchTypeSet.count(SQL_MATCH_TYPE_FULL) != 0) {
SQL_LOG(DEBUG, "require full match data");
_matchDataManager->requireMatchData(
_matchDocAllocator.get(), MATCH_DATA_REF, SUB_DOC_DISPLAY_NO, pool);
}
if (matchTypeSet.count(SQL_MATCH_TYPE_VALUE) != 0) {
SQL_LOG(DEBUG, "require match value");
_matchDataManager->requireMatchValues(
_matchDocAllocator.get(), MATCH_VALUE_REF, SUB_DOC_DISPLAY_NO, pool);
}
}
void ScanIteratorCreatorR::postInitMatchData() {
if (_scanInitParamR->matchType.empty()) {
return;
}
if (!_metaInfo) {
_metaInfo = std::make_shared<suez::turing::MetaInfo>();
_matchDataManager->getQueryTermMetaInfo(_metaInfo.get());
_indexInfoHelper = POOL_NEW_CLASS(_queryMemPoolR->getPool().get(),
suez::turing::LegacyIndexInfoHelper,
_tableInfo->getIndexInfos());
}
requireMatchData();
if (_functionProvider->getMatchInfoReader() == nullptr) {
auto metaInfo = std::make_shared<suez::turing::MetaInfo>();
_matchDataManager->getQueryTermMetaInfo(metaInfo.get());
_functionProvider->initMatchInfoReader(std::move(metaInfo));
_functionProvider->setIndexInfoHelper(_indexInfoHelper);
_functionProvider->setIndexReaderPtr(_indexPartitionReaderWrapper->getIndexReader());
_functionProvider->setFieldMetaReaderWrapper(_fieldMetaReaderWrapper);
}
const auto &partitionInfo = _indexPartitionReaderWrapper->getPartitionInfo();
_functionProvider->getMatchInfoReader()->getMetaInfoPtr()->setPartTotalDocCount(
partitionInfo->GetTotalDocCount() - partitionInfo->GetDelDocCount());
}
bool ScanIteratorCreatorR::updateQuery(const StreamQueryPtr &inputQuery) {
const auto &query = inputQuery->query;
if (!query) {
return true;
}
SQL_LOG(TRACE2, "update query [%s]", query->toString().c_str());
if (_baseScanIteratorInfo.query != nullptr) {
isearch::common::QueryPtr andQuery(new isearch::common::AndQuery(""));
andQuery->addQuery(query);
andQuery->addQuery(_baseScanIteratorInfo.query);
_createScanIteratorInfo.query = andQuery;
} else {
_createScanIteratorInfo.query = query;
}
SQL_LOG(TRACE2, "updated query [%s]", _createScanIteratorInfo.query->toString().c_str());
if (_updateQueryCount != 0 && _createScanIteratorInfo.filterWrapper != nullptr) {
_createScanIteratorInfo.filterWrapper->resetFilter();
}
_updateQueryCount++;
if (!_scanInitParamR->matchType.empty()) {
_matchDataManager->requireMatchData();
_matchDataManager->setQueryCount(1);
}
return true;
}
bool ScanIteratorCreatorR::createJoinDocIdConverter() {
const auto &auxTableName = _scanInitParamR->auxTableName;
if (!auxTableName.empty()) {
if (!_attributeExpressionCreator->createJoinDocIdConverter(auxTableName,
_scanInitParamR->tableName)) {
SQL_LOG(WARN,
"aux join create join doc id converter failed, aux [%s], main [%s]",
auxTableName.c_str(),
_scanInitParamR->tableName.c_str());
return false;
}
}
return true;
}
void ScanIteratorCreatorR::truncateQuery(isearch::common::QueryPtr &query,
const std::string &truncateDesc) {
std::vector<std::string> truncateNames;
std::vector<isearch::search::SelectAuxChainType> types;
if (!parseTruncateDesc(truncateDesc, truncateNames, types)) {
return;
}
for (size_t i = 0; i < truncateNames.size(); ++i) {
isearch::search::TermDFMap termDFMap;
if (types[i] != isearch::search::SAC_ALL) {
isearch::common::QueryTermVisitor visitor(
isearch::common::QueryTermVisitor::VT_ANDNOT_QUERY);
query->accept(&visitor);
const auto &termVector = visitor.getTermVector();
for (const auto &term : termVector) {
termDFMap[term] = _indexPartitionReaderWrapper->getTermDF(term);
}
}
isearch::search::AuxiliaryChainVisitor visitor(truncateNames[i], termDFMap, types[i]);
query->accept(&visitor);
}
}
bool ScanIteratorCreatorR::parseTruncateDesc(
const std::string &truncateDesc,
std::vector<std::string> &truncateNames,
std::vector<isearch::search::SelectAuxChainType> &types) {
for (const auto &auxChain : autil::StringUtil::split(truncateDesc, ";")) {
auto type = isearch::search::SelectAuxChainType::SAC_DF_SMALLER;
std::string truncateName;
for (const auto &option : autil::StringUtil::split(auxChain, "|")) {
const std::vector<std::string> &st = autil::StringUtil::split(option, "#");
if (st.size() != 2) {
SQL_LOG(WARN, "Invalid truncate query option[%s], skip optimize", option.c_str());
return false;
}
if (st[0] == "select") {
if (st[1] == "bigger" || st[1] == "BIGGER") {
type = isearch::search::SelectAuxChainType::SAC_DF_BIGGER;
} else if (st[1] == "ALL" || st[1] == "all") {
type = isearch::search::SelectAuxChainType::SAC_ALL;
} else if (st[1] == "smaller" || st[1] == "SMALLER") {
type = isearch::search::SelectAuxChainType::SAC_DF_SMALLER;
} else {
SQL_LOG(WARN,
"Invalid truncate query select type [%s], skip optimize",
st[1].c_str());
return false;
}
} else if (st[0] == "aux_name") {
truncateName = st[1];
}
}
if (truncateName.empty()) {
SQL_LOG(WARN, "Invalid truncate query, truncate name not found");
return false;
}
truncateNames.push_back(truncateName);
types.push_back(type);
}
return true;
}
void ScanIteratorCreatorR::subMatchQuery(
isearch::common::QueryPtr &query,
const std::shared_ptr<indexlibv2::config::ITabletSchema> &indexSchemaPtr,
const std::string &matchDataLabel) {
isearch::search::SubMatchCheckVisitor subMatchCheckVisitor;
query->accept(&subMatchCheckVisitor);
if (subMatchCheckVisitor.needSubMatch()) {
isearch::search::SubMatchVisitor subMatchVisitor;
query->accept(&subMatchVisitor);
if (matchDataLabel == "full_term") {
isearch::search::TermMatchVisitor termMatchVisitor(indexSchemaPtr);
query->accept(&termMatchVisitor);
}
}
}
ScanIteratorPtr ScanIteratorCreatorR::createScanIterator(bool &emptyScan) {
const auto &query = _createScanIteratorInfo.query;
const auto &filterWrapper = _createScanIteratorInfo.filterWrapper;
const auto &queryExprs = _createScanIteratorInfo.queryExprs;
const auto &layerMeta = _createScanIteratorInfo.layerMeta;
emptyScan = false;
if (!layerMeta) {
SQL_LOG(WARN, "table name [%s], layer meta is null.", _scanInitParamR->tableName.c_str());
return NULL;
}
if (layerMeta->size() == 0) {
SQL_LOG(TRACE1, "table name [%s], empty layer meta.", _scanInitParamR->tableName.c_str());
emptyScan = true;
return NULL;
}
std::vector<isearch::search::LayerMetaPtr> layerMetas;
ScanIteratorPtr scanIterator = ScanIteratorPtr();
if (_scanInitParamR->sortDesc.topk != 0) { // has sort desc
SQL_LOG(DEBUG, "create ordered ha3 scan iter");
for (size_t i = 0; i < layerMeta->size(); ++i) {
isearch::search::LayerMetaPtr singleLayerMeta(
new isearch::search::LayerMeta(*layerMeta));
singleLayerMeta->clear();
singleLayerMeta->push_back((*layerMeta)[i]);
layerMetas.emplace_back(singleLayerMeta);
}
scanIterator.reset(
createOrderedHa3ScanIterator(query, filterWrapper, layerMetas, emptyScan));
} else if (dynamic_cast<isearch::common::DocIdsQuery *>(query.get()) != nullptr
&& filterWrapper == nullptr) {
SQL_LOG(DEBUG, "create docids scan iter");
scanIterator.reset(createDocIdScanIterator(query));
} else if (needHa3Scan(query)) {
SQL_LOG(DEBUG, "create ha3 scan iter");
scanIterator.reset(createHa3ScanIterator(query, filterWrapper, layerMeta, emptyScan));
layerMetas = {layerMeta};
} else if (nullptr == query) {
SQL_LOG(DEBUG, "create range scan iter");
scanIterator.reset(createRangeScanIterator(filterWrapper, layerMeta));
layerMetas = {layerMeta};
} else {
SQL_LOG(DEBUG, "create query scan iter");
scanIterator.reset(createQueryScanIterator(query, filterWrapper, layerMeta, emptyScan));
layerMetas = {layerMeta};
}
for (auto *expr : queryExprs) {
auto *queryExpr = dynamic_cast<QueryExecutorExpressionWrapper *>(expr);
if (queryExpr == nullptr) {
SQL_LOG(ERROR,
"unexpected query expression cast failed, [%s]",
expr->getOriginalString().c_str());
return ScanIteratorPtr();
}
if (!queryExpr->init(_indexPartitionReaderWrapper,
_scanInitParamR->tableName,
_queryMemPoolR->getPool().get(),
_timeoutTerminatorR->getTimeoutTerminator(),
layerMetas)) {
SQL_LOG(ERROR,
"unexpected init query expression failed, [%s]",
expr->getOriginalString().c_str());
return ScanIteratorPtr();
}
}
postInitMatchData();
return scanIterator;
}
bool ScanIteratorCreatorR::needHa3Scan(const isearch::common::QueryPtr &query) {
return _matchDocAllocator->hasSubDocAllocator() || (_matchDataManager->needMatchData())
|| (query != nullptr && _scanInitParamR->limit != std::numeric_limits<uint32_t>::max());
}
bool ScanIteratorCreatorR::isTermQuery(const isearch::common::QueryPtr &query) {
if (query == NULL) {
return false;
}
auto type = query->getType();
if (type == TERM_QUERY || type == NUMBER_QUERY) {
return true;
}
return false;
}
ScanIterator *ScanIteratorCreatorR::createRangeScanIterator(
const isearch::search::FilterWrapperPtr &filterWrapper,
const isearch::search::LayerMetaPtr &layerMeta) {
std::shared_ptr<indexlib::index::DeletionMapReaderAdaptor> delMapReader;
delMapReader = _indexPartitionReaderWrapper->getDeletionMapReader();
if (filterWrapper != NULL) {
return new RangeScanIterator(filterWrapper,
_matchDocAllocator,
delMapReader,
layerMeta,
_timeoutTerminatorR->getTimeoutTerminator());
} else {
return new RangeScanIteratorWithoutFilter(_matchDocAllocator,
delMapReader,
layerMeta,
_timeoutTerminatorR->getTimeoutTerminator());
}
}
ScanIterator *ScanIteratorCreatorR::createQueryScanIterator(
const isearch::common::QueryPtr &query,
const isearch::search::FilterWrapperPtr &filterWrapper,
const isearch::search::LayerMetaPtr &layerMeta,
bool &emptyScan) {
auto queryExecutor
= createQueryExecutor(query, _scanInitParamR->tableName, layerMeta.get(), emptyScan);
if (emptyScan) {
SQL_LOG(TRACE1, "empty result scan condition. query [%s]", query->toString().c_str());
return NULL;
}
if (!queryExecutor) {
SQL_LOG(WARN,
"table name [%s], create query executor failed. query [%s]",
_scanInitParamR->tableName.c_str(),
query->toString().c_str());
return NULL;
}
isearch::search::QueryExecutorPtr queryExecutorPtr(
queryExecutor, [](isearch::search::QueryExecutor *p) { POOL_DELETE_CLASS(p); });
if (!_useSubR->getUseSub() && queryExecutor->hasSubDocExecutor()) {
SQL_LOG(ERROR,
"use sub query [%s] without unnest sub table",
queryExecutor->toString().c_str());
return NULL;
}
std::shared_ptr<indexlib::index::DeletionMapReaderAdaptor> delMapReader;
delMapReader = _indexPartitionReaderWrapper->getDeletionMapReader();
return new QueryScanIterator(queryExecutorPtr,
filterWrapper,
_matchDocAllocator,
delMapReader,
layerMeta,
_timeoutTerminatorR->getTimeoutTerminator());
}
ScanIterator *
ScanIteratorCreatorR::createHa3ScanIterator(const isearch::common::QueryPtr &query,
const isearch::search::FilterWrapperPtr &filterWrapper,
const isearch::search::LayerMetaPtr &layerMeta,
bool &emptyScan) {
std::vector<isearch::search::QueryExecutorPtr> queryExecutors;
if (!createQueryExecutors(
query, _scanInitParamR->tableName, {layerMeta}, emptyScan, queryExecutors)) {
SQL_LOG(TRACE1, "create query executors failed");
return nullptr;
}
std::shared_ptr<indexlib::index::DeletionMapReaderAdaptor> delMapReader;
indexlib::index::DeletionMapReaderPtr subDelMapReader;
indexlib::index::JoinDocidAttributeIterator *mainToSubIt = NULL;
delMapReader = _indexPartitionReaderWrapper->getDeletionMapReader();
if (_matchDocAllocator->hasSubDocAllocator()) {
mainToSubIt = _indexPartitionReaderWrapper->getMainToSubIter();
const indexlib::partition::IndexPartitionReaderPtr &subIndexPartReader
= _indexPartitionReaderWrapper->getSubReader();
if (subIndexPartReader) {
subDelMapReader = subIndexPartReader->GetDeletionMapReader();
}
}
Ha3ScanIteratorParam seekParam;
seekParam.queryExecutors = queryExecutors;
seekParam.filterWrapper = filterWrapper;
seekParam.matchDocAllocator = _matchDocAllocator;
seekParam.delMapReader = delMapReader;
seekParam.subDelMapReader = subDelMapReader;
seekParam.layerMetas = {layerMeta};
if (query != nullptr) {
seekParam.matchDataManager = _matchDataManager;
}
seekParam.mainToSubIt = mainToSubIt;
seekParam.timeoutTerminator = _timeoutTerminatorR->getTimeoutTerminator();
seekParam.needAllSubDocFlag = false;
return new Ha3ScanIterator(seekParam);
}
ScanIterator *ScanIteratorCreatorR::createOrderedHa3ScanIterator(
const isearch::common::QueryPtr &query,
const isearch::search::FilterWrapperPtr &filterWrapper,
const std::vector<isearch::search::LayerMetaPtr> &layerMetas,
bool &emptyScan) {
if (_matchDocAllocator->hasSubDocAllocator() || _useSubR->getUseSub()) {
SQL_LOG(ERROR, "unexpected ordered scan iter has UNNEST table");
return nullptr;
}
std::vector<isearch::search::QueryExecutorPtr> queryExecutors;
if (!createQueryExecutors(
query, _scanInitParamR->tableName, layerMetas, emptyScan, queryExecutors)) {
SQL_LOG(TRACE1, "create query executors failed");
return nullptr;
}
std::shared_ptr<indexlib::index::DeletionMapReaderAdaptor> delMapReader;
delMapReader = _indexPartitionReaderWrapper->getDeletionMapReader();
Ha3ScanIteratorParam seekParam;
seekParam.queryExecutors = queryExecutors;
seekParam.filterWrapper = filterWrapper;
seekParam.matchDocAllocator = _matchDocAllocator;
seekParam.delMapReader = delMapReader;
seekParam.subDelMapReader = nullptr;
seekParam.layerMetas = layerMetas;
if (query != nullptr) {
seekParam.matchDataManager = _matchDataManager;
}
seekParam.mainToSubIt = nullptr;
seekParam.timeoutTerminator = _timeoutTerminatorR->getTimeoutTerminator();
seekParam.needAllSubDocFlag = false;
OrderedHa3ScanIterator *orderedHa3ScanIterator(
new OrderedHa3ScanIterator(seekParam,
_queryMemPoolR->getPool().get(),
_scanInitParamR->sortDesc,
_attributeExpressionCreator.get()));
if (!orderedHa3ScanIterator->init()) {
SQL_LOG(WARN, "create orderedHa3ScanIterator failed");
delete orderedHa3ScanIterator;
return nullptr;
}
return orderedHa3ScanIterator;
}
bool ScanIteratorCreatorR::createQueryExecutors(
const isearch::common::QueryPtr &query,
const std::string &mainTableName,
const std::vector<isearch::search::LayerMetaPtr> &layerMetas,
bool &emptyScan,
std::vector<isearch::search::QueryExecutorPtr> &queryExecutors) {
for (size_t i = 0; i < layerMetas.size(); ++i) {
auto queryExecutor
= createQueryExecutor(query, mainTableName, layerMetas[i].get(), emptyScan);
if (emptyScan) {
SQL_LOG(TRACE1,
"table name [%s], empty result scan condition. query [%s]",
mainTableName.c_str(),
query->toString().c_str());
return false;
}
if (!queryExecutor) {
SQL_LOG(WARN,
"table name [%s], create query executor failed. query [%s]",
mainTableName.c_str(),
query->toString().c_str());
return false;
}
SQL_LOG(
DEBUG, "create query executor, query executor [%s]", queryExecutor->toString().c_str());
isearch::search::QueryExecutorPtr queryExecutorPtr(
queryExecutor, [](isearch::search::QueryExecutor *p) { POOL_DELETE_CLASS(p); });
if (!_useSubR->getUseSub() && queryExecutor->hasSubDocExecutor()) {
SQL_LOG(ERROR,
"use sub query [%s] without unnest sub table",
queryExecutor->toString().c_str());
return false;
}
queryExecutors.emplace_back(queryExecutorPtr);
}
return true;
}
isearch::search::QueryExecutor *
ScanIteratorCreatorR::createQueryExecutor(const isearch::common::QueryPtr &query,
const std::string &mainTableName,
isearch::search::LayerMeta *layerMeta,
bool &emptyScan) {
emptyScan = false;
isearch::search::QueryExecutor *queryExecutor = NULL;
auto pool = _queryMemPoolR->getPool().get();
if (query == NULL) {
queryExecutor = POOL_NEW_CLASS(pool, isearch::search::RangeQueryExecutor, layerMeta);
SQL_LOG(TRACE1, "table name [%s], no query executor, null query", mainTableName.c_str());
return queryExecutor;
}
SQL_LOG(DEBUG,
"create query executor, table name [%s], query [%s]",
mainTableName.c_str(),
query->toString().c_str());
isearch::ErrorCode errorCode = isearch::ERROR_NONE;
std::string errorMsg;
try {
isearch::search::QueryExecutorCreator qeCreator(_matchDataManager.get(),
_indexPartitionReaderWrapper.get(),
pool,
_timeoutTerminatorR->getTimeoutTerminator(),
layerMeta);
query->accept(&qeCreator);
queryExecutor = qeCreator.stealQuery();
if (queryExecutor->isEmpty()) {
POOL_DELETE_CLASS(queryExecutor);
queryExecutor = NULL;
emptyScan = true;
}
} catch (const indexlib::util::ExceptionBase &e) {
SQL_LOG(WARN, "table name [%s], lookup exception: [%s]", mainTableName.c_str(), e.what());
errorMsg = "ExceptionBase: " + e.GetClassName();
errorCode = isearch::ERROR_SEARCH_LOOKUP;
if (e.GetClassName() == "FileIOException") {
errorCode = isearch::ERROR_SEARCH_LOOKUP_FILEIO_ERROR;
}
} catch (const std::exception &e) {
errorMsg = e.what();
errorCode = isearch::ERROR_SEARCH_LOOKUP;
} catch (...) {
errorMsg = "Unknown Exception";
errorCode = isearch::ERROR_SEARCH_LOOKUP;
}
if (errorCode != isearch::ERROR_NONE) {
SQL_LOG(WARN,
"table name [%s], Create query executor failed, query [%s], Exception: [%s]",
mainTableName.c_str(),
query->toString().c_str(),
errorMsg.c_str());
}
return queryExecutor;
}
bool ScanIteratorCreatorR::createFilterWrapper(
suez::turing::AttributeExpression *attrExpr,
const std::vector<suez::turing::AttributeExpression *> &queryExprVec,
suez::turing::JoinDocIdConverterCreator *joinDocIdConverterCreator,
isearch::search::FilterWrapperPtr &filterWrapper) {
auto pool = _queryMemPoolR->getPool().get();
isearch::search::Filter *filter = NULL;
isearch::search::SubDocFilter *subDocFilter = NULL;
if (attrExpr) {
suez::turing::AttributeExpressionTyped<bool> *boolAttrExpr
= dynamic_cast<suez::turing::AttributeExpressionTyped<bool> *>(attrExpr);
if (!boolAttrExpr) {
SQL_LOG(WARN, "filter expression return type should be bool.");
return false;
}
filter = POOL_NEW_CLASS(pool, isearch::search::Filter, boolAttrExpr);
if (filter->needFilterSubDoc() && _matchDocAllocator->hasSubDocAllocator()) {
subDocFilter = POOL_NEW_CLASS(
pool, isearch::search::SubDocFilter, _matchDocAllocator->getSubDocAccessor());
}
}
isearch::search::JoinFilter *joinFilter = NULL;
if (joinDocIdConverterCreator != nullptr && !joinDocIdConverterCreator->isEmpty()) {
joinFilter
= POOL_NEW_CLASS(pool, isearch::search::JoinFilter, joinDocIdConverterCreator, true);
}
if (filter != NULL || subDocFilter != NULL || joinFilter != NULL) {
if (queryExprVec.size() != 0) {
filterWrapper.reset(new QueryExprFilterWrapper(queryExprVec));
} else {
filterWrapper.reset(new isearch::search::FilterWrapper());
}
filterWrapper->setFilter(filter);
filterWrapper->setSubDocFilter(subDocFilter);
filterWrapper->setJoinFilter(joinFilter);
}
return true;
}
isearch::search::LayerMetaPtr ScanIteratorCreatorR::createLayerMeta() {
auto limit = _scanInitParamR->limit;
auto pool = _queryMemPoolR->getPool().get();
isearch::search::LayerMeta fullLayerMeta
= isearch::search::DefaultLayerMetaUtil::createFullRange(
pool, _indexPartitionReaderWrapper.get());
fullLayerMeta.quota = limit;
return std::make_shared<isearch::search::LayerMeta>(fullLayerMeta);
}
void ScanIteratorCreatorR::proportionalLayerQuota(isearch::search::LayerMeta &layerMeta) {
if (layerMeta.size() == 0 || layerMeta.quota == 0 || layerMeta.quotaMode == QM_PER_LAYER) {
return;
}
uint32_t remainQuota = layerMeta.quota;
docid_t totalRange = 0;
for (size_t i = 0; i < layerMeta.size(); ++i) {
auto &meta = layerMeta[i];
layerMeta[i].quota = meta.end - meta.begin + 1;
totalRange += layerMeta[i].quota;
}
if (totalRange > layerMeta.quota) {
for (size_t i = 0; i < layerMeta.size(); ++i) {
auto &meta = layerMeta[i];
docid_t range = meta.end - meta.begin + 1;
meta.quota = (uint32_t)((double)range * layerMeta.quota / totalRange);
remainQuota -= meta.quota;
}
layerMeta[0].quota += remainQuota;
}
layerMeta.quota = 0;
}
isearch::search::LayerMetaPtr
ScanIteratorCreatorR::splitLayerMetaByStep(autil::mem_pool::Pool *pool,
const isearch::search::LayerMetaPtr &layerMeta,
uint32_t parallelIndex,
uint32_t parallelNum,
uint32_t parallelBlockCount) {
if (parallelNum <= 1) {
return layerMeta;
}
if (unlikely(parallelIndex >= parallelNum)) {
SQL_LOG(ERROR,
"unexpected invalid param parallelNum=[%u], parallelIndex=[%u]",
parallelNum,
parallelIndex);
return isearch::search::LayerMetaPtr(nullptr);
}
parallelBlockCount = std::min((uint32_t)PARALLEL_MAX_BLOCK_COUNT, parallelBlockCount);
uint32_t rangeSizeSum = 0;
uint32_t blockSize = PARALLEL_MIN_BLOCK_SIZE;
const auto &originMeta = *layerMeta;
for (size_t i = 0; i < originMeta.size(); ++i) {
rangeSizeSum += originMeta[i].end - originMeta[i].begin + 1;
}
blockSize = std::max(
(uint32_t)std::ceil((double)rangeSizeSum / (parallelNum * parallelBlockCount)), blockSize);
SQL_LOG(TRACE1, "parallel block size is %u", blockSize);
const uint64_t stepSize = blockSize * parallelNum;
isearch::search::LayerMetaPtr newMeta(new isearch::search::LayerMeta(pool));
uint64_t curRangeStart = blockSize * parallelIndex;
uint64_t curRangeEnd = curRangeStart + blockSize - 1;
size_t curMetaIdx = 0;
uint64_t offset = originMeta[0].begin;
while (curMetaIdx < originMeta.size()) {
uint64_t realRangeStart = curRangeStart + offset;
uint64_t realRangeEnd = curRangeEnd + offset;
intersectRange(newMeta, realRangeStart, realRangeEnd, originMeta, curMetaIdx);
if (realRangeEnd > originMeta[curMetaIdx].end) {
++curMetaIdx;
if (curMetaIdx >= originMeta.size()) {
break;
}
offset += originMeta[curMetaIdx].begin - originMeta[curMetaIdx - 1].end - 1;
} else {
curRangeStart += stepSize;
curRangeEnd += stepSize;
}
}
return newMeta;
}
void ScanIteratorCreatorR::intersectRange(isearch::search::LayerMetaPtr newMeta,
uint64_t rangeStart,
uint64_t rangeEnd,
const isearch::search::LayerMeta &oldMeta,
size_t metaIdx) {
uint64_t maxRangeStart = std::max(rangeStart, (uint64_t)oldMeta[metaIdx].begin);
uint64_t minRangeEnd = std::min(rangeEnd, (uint64_t)oldMeta[metaIdx].end);
if (minRangeEnd >= maxRangeStart) {
isearch::search::DocIdRangeMeta rangeMeta(
maxRangeStart, minRangeEnd, oldMeta[metaIdx].ordered, minRangeEnd - maxRangeStart + 1);
newMeta->push_back(rangeMeta);
}
}
ScanIterator *
ScanIteratorCreatorR::createDocIdScanIterator(const isearch::common::QueryPtr &query) {
return new DocIdsScanIterator(
_matchDocAllocator, _timeoutTerminatorR->getTimeoutTerminator(), query);
}
void ScanIteratorCreatorR::termCombine(
isearch::common::QueryPtr &query,
const std::shared_ptr<indexlibv2::config::ITabletSchema> &tabletSchema) {
isearch::common::TermCombineVisitor termCombineVisitor(
tabletSchema->GetUserDefinedParam().GetMap());
if (termCombineVisitor.hasCombineDict()) {
SQL_LOG(TRACE1, "before term combine query [%s]", query->toString().c_str());
query->accept(&termCombineVisitor);
query.reset(termCombineVisitor.stealQuery());
SQL_LOG(TRACE1, "after term combine query [%s]", query->toString().c_str());
}
}
REGISTER_RESOURCE(ScanIteratorCreatorR);
} // namespace sql