in aios/sql/ops/scan/scan_visitor/ScanIteratorCreatorR.cpp [122:259]
/// Resource initialisation: builds everything needed to create a scan iterator.
///
/// Steps, in order:
///   1. Copy the reader wrapper, table info, match-doc allocator, attribute
///      expression creator and function provider out of
///      `_attributeExpressionCreatorR`, then run `initMatchData()`.
///   2. Parse `conditionJson` into a condition tree.
///   3. Create the layer meta; if the table has a sort description, run the
///      docid-range reduce optimisation; then split it for this parallel slice
///      and apply the proportional quota.
///   4. Visit the condition with `Ha3ScanConditionVisitor` to obtain a query
///      and/or an attribute expression.
///   5. Flatten the query, run term combine, init the truncate description and
///      (when a match type is set) the sub-match query.
///   6. Create the filter wrapper when there is an attribute expression or an
///      aux table.
///   7. Store the results in `_createScanIteratorInfo` and copy them into
///      `_baseScanIteratorInfo`.
///
/// @param ctx  Resource init context; not read by this function body.
/// @return `navi::EC_NONE` on success, `navi::EC_ABORT` when any step fails.
navi::ErrorCode ScanIteratorCreatorR::init(navi::ResourceInitContext &ctx) {
    _indexPartitionReaderWrapper = _attributeExpressionCreatorR->_indexPartitionReaderWrapper;
    _tableInfo = _attributeExpressionCreatorR->_tableInfo;
    _matchDocAllocator = _attributeExpressionCreatorR->_matchDocAllocator;
    _attributeExpressionCreator = _attributeExpressionCreatorR->_attributeExpressionCreator;
    _functionProvider = _attributeExpressionCreatorR->_functionProvider;
    if (!initMatchData()) {
        return navi::EC_ABORT;
    }
    const auto &tableName = _scanInitParamR->tableName;
    auto pool = _queryMemPoolR->getPool().get();
    ConditionParser parser(pool);
    ConditionPtr condition;
    const auto &conditionInfo = _scanInitParamR->calcInitParamR->conditionJson;
    if (!parser.parseCondition(conditionInfo, condition)) {
        SQL_LOG(ERROR,
                "table name [%s], parse condition [%s] failed",
                tableName.c_str(),
                conditionInfo.c_str());
        return navi::EC_ABORT;
    }
    if (_indexPartitionReaderWrapper->getFieldMetaReadersMap() != nullptr) {
        _fieldMetaReaderWrapper = std::make_shared<suez::turing::FieldMetaReaderWrapper>(
            pool, _indexPartitionReaderWrapper->getFieldMetaReadersMap());
    }
    isearch::search::LayerMetaPtr layerMeta = createLayerMeta();
    // layerMeta is treated as nullable further down (see the `if (layerMeta)`
    // before the split step), so the debug logs must not dereference it blindly.
    if (layerMeta) {
        SQL_LOG(DEBUG, "layer meta: %s", layerMeta->toString().c_str());
    }
    const auto &tableSortDescription = _ha3TableInfoR->getTableSortDescMap();
    auto iter = tableSortDescription.find(tableName);
    if (iter != tableSortDescription.end()) {
        SQL_LOG(DEBUG, "begin reduce docid range optimize");
        DocIdRangesReduceOptimize optimize(iter->second, _scanInitParamR->fieldInfos);
        if (condition) {
            condition->accept(&optimize);
        }
        // NOTE(review): reduceDocIdRange is still called with whatever
        // createLayerMeta() returned, as before — confirm it tolerates null.
        layerMeta = optimize.reduceDocIdRange(layerMeta, pool, _indexPartitionReaderWrapper);
        if (layerMeta) {
            SQL_LOG(DEBUG,
                    "after reduce docid range optimize, layer meta: %s",
                    layerMeta->toString().c_str());
        }
    } else {
        SQL_LOG(DEBUG, "not find table [%s] sort description", tableName.c_str());
    }
    if (layerMeta) {
        layerMeta = splitLayerMetaByStep(pool,
                                         layerMeta,
                                         _scanInitParamR->parallelIndex,
                                         _scanInitParamR->parallelNum,
                                         _scanInitParamR->parallelBlockCount);
        if (!layerMeta) {
            SQL_LOG(WARN, "table name [%s], split layer meta failed.", tableName.c_str());
            return navi::EC_ABORT;
        }
        layerMeta->quotaMode = QM_PER_DOC;
        proportionalLayerQuota(*layerMeta.get());
        SQL_LOG(
            DEBUG, "after proportional layer quota, layer meta: %s", layerMeta->toString().c_str());
    }
    Ha3ScanConditionVisitorParam param;
    param.attrExprCreator = _attributeExpressionCreator;
    param.indexPartitionReaderWrapper = _indexPartitionReaderWrapper;
    param.analyzerFactory = _analyzerFactoryR->getFactory().get();
    param.indexInfo = &_scanInitParamR->indexInfos;
    param.pool = pool;
    param.queryInfo = &_ha3QueryInfoR->getQueryInfo();
    param.indexInfos = _tableInfo->getIndexInfos();
    param.mainTableName = tableName;
    param.timeoutTerminator = _timeoutTerminatorR->getTimeoutTerminator();
    param.layerMeta = layerMeta.get();
    param.modelConfigMap = &_modelConfigMapR->getModelConfigMap();
    param.udfToQueryManager = _udfToQueryManagerR->getManager().get();
    param.forbidIndexs = &_scanInitParamR->forbidIndexs;
    Ha3ScanConditionVisitor visitor(param);
    if (condition) {
        condition->accept(&visitor);
    }
    if (visitor.isError()) {
        SQL_LOG(WARN,
                "table name [%s], create scan iterator failed, error info [%s]",
                tableName.c_str(),
                visitor.errorInfo().c_str());
        return navi::EC_ABORT;
    }
    isearch::common::QueryPtr query(visitor.stealQuery());
    if (query) {
        SQL_LOG(TRACE1, "query [%s]", query->toString().c_str());
        isearch::common::QueryFlatten queryFlatten;
        queryFlatten.flatten(query.get());
        isearch::common::Query *flattenQuery = queryFlatten.stealQuery();
        assert(flattenQuery);
        // The flattened query replaces the one produced by the visitor.
        query.reset(flattenQuery);
        SQL_LOG(TRACE1, "after flat query [%s]", query->toString().c_str());
        auto tabletSchema = _indexPartitionReaderWrapper->getSchema();
        if (!tabletSchema) {
            SQL_LOG(ERROR, "get index schema failed from table [%s]", tableName.c_str());
            return navi::EC_ABORT;
        }
        termCombine(query, tabletSchema);
        if (!initTruncateDesc(query)) {
            return navi::EC_ABORT;
        }
        if (!_scanInitParamR->matchType.empty()) {
            SQL_LOG(TRACE1, "before sub match query");
            auto matchDataLabel = getMatchDataLabel();
            subMatchQuery(query, tabletSchema, matchDataLabel);
            SQL_LOG(TRACE1, "after sub match query [%s]", query->toString().c_str());
        }
    }
    suez::turing::AttributeExpression *attrExpr = visitor.stealAttributeExpression();
    isearch::search::FilterWrapperPtr filterWrapper;
    // This branch is also taken with a null attrExpr when an aux table is
    // configured, so every use of attrExpr inside it must be null-safe.
    if (attrExpr || !_scanInitParamR->auxTableName.empty()) {
        SQL_LOG(
            TRACE1, "condition expr [%s]", attrExpr ? attrExpr->getOriginalString().c_str() : "");
        if (!createJoinDocIdConverter()) {
            return navi::EC_ABORT;
        }
        bool ret = createFilterWrapper(attrExpr,
                                       visitor.getQueryExprs(),
                                       _attributeExpressionCreator->getJoinDocIdConverterCreator(),
                                       filterWrapper);
        if (!ret) {
            SQL_LOG(WARN,
                    "table name [%s], create filter wrapper failed, exprStr: [%s]",
                    tableName.c_str(),
                    attrExpr ? attrExpr->getOriginalString().c_str() : "");
            return navi::EC_ABORT;
        }
    }
    _createScanIteratorInfo.query = query;
    _createScanIteratorInfo.filterWrapper = filterWrapper;
    _createScanIteratorInfo.queryExprs = visitor.getQueryExprs();
    _createScanIteratorInfo.layerMeta = layerMeta;
    // Snapshot the freshly built info into the base copy.
    _baseScanIteratorInfo = _createScanIteratorInfo;
    return navi::EC_NONE;
}