aios/sql/ops/planTransform/GraphTransform.cpp (1,088 lines of code) (raw):
/*
* Copyright 2014-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "sql/ops/planTransform/GraphTransform.h"
#include <algorithm>
#include <assert.h>
#include <cstddef>
#include <cstdint>
#include <ext/alloc_traits.h>
#include <rapidjson/document.h>
#include <utility>
#include "autil/DataBuffer.h"
#include "autil/EnvUtil.h"
#include "autil/StringUtil.h"
#include "autil/TimeUtility.h"
#include "autil/legacy/exception.h"
#include "autil/legacy/legacy_jsonizable.h"
#include "ha3/util/TypeDefine.h"
#include "iquan/common/Common.h"
#include "iquan/config/ExecConfig.h"
#include "iquan/jni/IquanDqlRequest.h"
#include "iquan/jni/SqlPlan.h"
#include "navi/builder/GraphBuilder.h"
#include "navi/builder/GraphDesc.h"
#include "navi/proto/GraphDef.pb.h"
#include "sql/common/Log.h"
#include "sql/common/TableDistribution.h"
#include "sql/common/WatermarkType.h"
#include "sql/common/common.h"
#include "sql/ops/planTransform/TableScanOpUtil.h"
namespace iquan {
class DynamicParams;
} // namespace iquan
using namespace std;
using namespace autil;
using namespace autil::legacy;
using namespace navi;
using namespace iquan;
namespace sql {
static std::string TABLE_TYPE_UNKNOWN = "unknown_type";
namespace plan {
AUTIL_LOG_SETUP(plan, DelayDpInfo);
GraphRoot *GraphRoot::getRoot() {
return _parent == nullptr ? this : _parent;
}
void GraphRoot::addInput(GraphRoot *p) {
_inputs.insert(p);
}
void GraphRoot::removeInput(GraphRoot *p) {
_inputs.erase(p);
}
const std::set<GraphRoot *> &GraphRoot::getInputs() const {
return _inputs;
}
void GraphRoot::setOutput(GraphRoot *p) {
_output = p;
}
GraphRoot *GraphRoot::getOutput() const {
return _output;
}
const std::vector<GraphRoot *> &GraphRoot::getLeafs() const {
return _leafs;
}
void GraphRoot::addLeaf(GraphRoot *leaf) {
assert(_parent == nullptr);
leaf->_parent = this;
_leafs.push_back(leaf);
}
navi::GraphId GraphRoot::getGraphId() const {
return _graphId;
}
void GraphRoot::setGraphId(navi::GraphId id) {
_graphId = id;
}
navi::GraphBuilder *GraphRoot::getBuilder() const {
return _builder;
}
void GraphRoot::setBuilder(navi::GraphBuilder *p) {
_builder = p;
}
bool GraphRoot::getInlineMode() const {
return _inlineMode;
}
void GraphRoot::setInlineMode(bool mode) {
_inlineMode = mode;
}
const std::string &GraphRoot::getBizName() const {
return _bizName;
}
void GraphRoot::setBizName(const std::string &name) {
_bizName = name;
}
const std::string &GraphRoot::getCurDist() const {
return _curDist;
}
void GraphRoot::setCurDist(const std::string &dist) {
_curDist = dist;
}
const std::string &GraphRoot::getRemoteDist() const {
return _remoteDist;
}
void GraphRoot::setRemoteDist(const std::string &dist) {
_remoteDist = dist;
}
bool GraphRoot::canDelayDp() const {
return _delayDp;
}
bool GraphRoot::canRemoteDelayDp() const {
return _remoteDelayDp;
}
void GraphRoot::setRemoteDelayDp(bool delay) {
_remoteDelayDp = delay;
}
navi::GraphDef *GraphRoot::getGraphDef() const {
return _graphDef.get();
}
void GraphRoot::ownGraphDef(navi::GraphDef *p) {
_graphDef.reset(p);
}
void GraphRoot::mergeGraphInfo() {
assert(_parent == nullptr);
if (_output != nullptr) {
_output = _output->getRoot();
}
if (!_leafs.empty()) {
set<GraphRoot *> mergedInputs = _inputs;
for (auto p : _leafs) {
auto &inputs = p->_inputs;
mergedInputs.insert(inputs.begin(), inputs.end());
}
_inputs.swap(mergedInputs);
}
if (!_inputs.empty()) {
_delayDp = all_of(_inputs.begin(), _inputs.end(), [](auto p) { return p->_remoteDelayDp; });
}
}
void GraphRoot::mergeRoot(GraphRoot *a, GraphRoot *b) {
if (a->_leafs.size() < b->_leafs.size()) {
mergeRoot(b, a);
return;
}
a->addLeaf(b);
for (auto p : b->_leafs) {
a->addLeaf(p);
}
b->_leafs.clear();
}
PlanNode *PlanNode::accept(NodeVisitor &visitor) {
return visitor.visitNode(*this);
}
void PlanNode::addBuildInput(PlanNode &node, const navi::P &p) {
assert(false);
}
const std::string &PlanNode::getMergedName() const {
if (nodeNames.size() == 1u) {
return nodeNames[0];
} else {
return mergedName;
}
}
PlanNode *ScanNode::accept(NodeVisitor &visitor) {
return visitor.visitScan(*this);
}
PlanNode *ExchangeNode::accept(NodeVisitor &visitor) {
return visitor.visitExchange(*this);
}
void ExchangeNode::addBuildInput(PlanNode &node, const navi::P &p) {
auto ret = output2buildInputs.try_emplace(&node);
ret.first->second.emplace_back(p);
}
DelayDpInfo::DelayDpInfo(GraphRoot *root)
: _root(root) {}
void DelayDpInfo::addInputExchange(ExchangeNode *p) {
_inputExs.push_back(p);
}
void DelayDpInfo::addOutputExchange(ExchangeNode *p) {
_outputExs.push_back(p);
}
bool DelayDpInfo::prepareSubGraph() {
if (_root->getRoot() != _root || _root->getGraphDef() == nullptr) {
SQL_LOG(WARN, "bug: invalid graph root");
return false;
}
auto builder = _root->getBuilder();
builder->subGraph(_root->getGraphId());
auto inputSize = _inputExs.size();
_inputNames.reserve(inputSize);
_inputDists.reserve(inputSize);
for (size_t i = 0; i < inputSize; ++i) {
auto p = _inputExs[i];
auto phName = "ph_" + StringUtil::toString(i);
_inputNames.push_back(phName);
_inputDists.push_back(p->root->getRemoteDist());
auto buildOutput
= builder->node(phName).kernel(PLACEHOLDER_KERNEL_NAME).out(PLACEHOLDER_OUTPUT_PORT);
for (const auto &inputPair : p->output2buildInputs) {
for (const auto &buildInput : inputPair.second) {
buildOutput.to(buildInput).require(true);
}
}
}
auto outputSize = _outputExs.size();
_outputNames.reserve(outputSize);
for (size_t i = 0; i < outputSize; ++i) {
auto p = _outputExs[i];
auto exInputSize = p->input2buildOutput.size();
if (exInputSize != 1u) {
SQL_LOG(WARN, "bug: invalid exchange input size: %ld", exInputSize);
return false;
}
auto input = p->input2buildOutput.begin()->first;
_outputNames.push_back(input->getMergedName());
}
initWrapperOpAttr();
return true;
}
void DelayDpInfo::initWrapperOpAttr() {
DataBuffer buffer;
buffer.write(_inputNames);
buffer.serializeToStringWithSection(_opAttrs[DELAY_DP_INPUT_NAMES_ATTR]);
buffer.clear();
buffer.write(_inputDists);
buffer.serializeToStringWithSection(_opAttrs[DELAY_DP_INPUT_DISTS_ATTR]);
buffer.clear();
buffer.write(_outputNames);
buffer.serializeToStringWithSection(_opAttrs[DELAY_DP_OUTPUT_NAMES_ATTR]);
buffer.clear();
auto subDef = _root->getGraphDef();
subDef->SerializeToString(&(_opAttrs[DELAY_DP_GRAPH_ATTR]));
}
std::map<std::string, std::string> &DelayDpInfo::getWrapperOpAttrs() {
return _opAttrs;
}
bool DelayDpInfo::replaceSubGraph(plan::PlanNode &node,
navi::P buildInput,
navi::P buildOutput,
const std::function<GraphRoot *()> &rootCreator) {
node.inputMap[DEFAULT_INPUT_PORT].assign(_inputExs.begin(), _inputExs.end());
for (auto input : _inputExs) {
input->root->setRemoteDist(node.root->getCurDist());
input->output2buildInputs.clear();
auto ret = input->output2buildInputs.try_emplace(&node);
ret.first->second.emplace_back(buildInput.autoNext());
}
for (auto output : _outputExs) {
auto newRoot = rootCreator();
if (newRoot == nullptr) {
SQL_LOG(WARN, "create new GraphRoot failed");
return false;
}
node.root->addLeaf(newRoot);
newRoot->setBizName(node.root->getBizName());
newRoot->setCurDist(node.root->getCurDist());
newRoot->setRemoteDist(output->root->getRemoteDist());
output->root = newRoot;
output->inputMap[DEFAULT_INPUT_PORT] = {&node};
output->input2buildOutput.clear();
output->input2buildOutput.try_emplace(&node, buildOutput.autoNext());
}
return true;
}
} // namespace plan
AUTIL_LOG_SETUP(sql, GraphTransform);
const string GraphTransform::EMPTY_STRING = "";
GraphTransformEnv::GraphTransformEnv() {
disableWatermark = autil::EnvUtil::getEnv("disableWatermark", disableWatermark);
useQrsTimestamp = autil::EnvUtil::getEnv("useQrsTimestamp", useQrsTimestamp);
}
GraphTransformEnv::~GraphTransformEnv() {}
const GraphTransformEnv &GraphTransformEnv::get() {
static GraphTransformEnv instance;
return instance;
}
GraphTransform::Config::Config(const iquan::ExecConfig &execConfig,
const iquan::IquanDqlRequest &request,
const iquan::DynamicParams &innerDynamicParams)
: _execConfig(execConfig)
, _request(request)
, _innerDynamicParams(innerDynamicParams)
, parallelNum(_execConfig.parallelConfig.parallelNum)
, resultAllowSoftFailure(_execConfig.resultAllowSoftFailure)
, sourceId(getRequestParam(IQUAN_EXEC_SOURCE_ID))
, sourceSpec(getRequestParam(IQUAN_EXEC_SOURCE_SPEC))
, taskQueue(getRequestParam(IQUAN_EXEC_TASK_QUEUE))
, otherRunGraphParam(getRequestParam(IQUAN_EXEC_USER_KV))
, qrsBizName(getRequestParam(IQUAN_EXEC_QRS_BIZ_NAME, execConfig.thisBizName))
, searcherBizName(getRequestParam(IQUAN_EXEC_SEARCHER_BIZ_NAME))
, leaderPreferLevel(getRequestParam(IQUAN_EXEC_LEADER_PREFER_LEVEL)) {
{
auto inlineWorker = getRequestParam(IQUAN_EXEC_INLINE_WORKER);
if (inlineWorker == "qrs") {
qrsGraphInline = true;
} else if (inlineWorker == "searcher") {
searcherGraphInline = true;
} else if (inlineWorker == "all") {
qrsGraphInline = true;
searcherGraphInline = true;
}
}
{
const auto &tables = _execConfig.parallelConfig.parallelTables;
parallelTables.insert(tables.begin(), tables.end());
}
{
auto tmp = getRequestParam(IQUAN_EXEC_OVERRIDE_TO_LOGIC_TABLE);
auto tokens = StringUtil::split(tmp, ",");
logicTableOps.insert(tokens.begin(), tokens.end());
}
params = &_request.dynamicParams;
innerParams = &_innerDynamicParams;
}
const std::string &GraphTransform::Config::getRequestParam(const std::string &key,
const std::string &defaultValue) const {
const auto &execParams = _request.execParams;
auto iter = execParams.find(key);
if (iter != execParams.end()) {
return iter->second;
} else {
return defaultValue;
}
}
static autil::legacy::RapidValue *getJsonValue(iquan::PlanOp &op, const string &key) {
if (op.jsonAttrs == nullptr) {
return nullptr;
}
auto &jsonAttrs = *op.jsonAttrs;
if (!jsonAttrs.IsObject()) {
return nullptr;
}
auto iter = jsonAttrs.FindMember(key);
if (iter != jsonAttrs.MemberEnd()) {
return &(iter->value);
} else {
return nullptr;
}
}
static string getJsonStringValue(iquan::PlanOp &op, const string &key) {
auto p = getJsonValue(op, key);
if (p != nullptr && p->IsString()) {
return p->GetString();
} else {
return GraphTransform::EMPTY_STRING;
}
}
GraphTransform::GraphTransform(const Config &config)
: _config(config) {}
GraphTransform::~GraphTransform() {
for (auto &pair : _id2node) {
delete pair.second;
}
for (auto p : _subRoots) {
delete p;
}
for (auto p : _builders) {
delete p;
}
}
static string createDefaultTableDistribution() {
TableDistribution dist;
dist.partCnt = 1;
return ToJsonString(dist, true);
}
static const string ROOT_GRAPH_TABLE_DISTRIBUTION = createDefaultTableDistribution();
bool GraphTransform::sqlPlan2Graph(iquan::SqlPlan &sqlPlan,
navi::GraphDef &graph,
std::vector<std::string> &outputPorts,
std::vector<std::string> &outputNodes) {
auto rootNode = preparePlanNode(sqlPlan);
auto root = addGraphRoot();
if (root == nullptr) {
return false;
}
if (_config.qrsGraphInline) {
root->setInlineMode(true);
}
_rootBizName = getQrsBizName();
root->setBizName(_rootBizName);
root->setCurDist(ROOT_GRAPH_TABLE_DISTRIBUTION);
if (!linkVisit(*rootNode, *root)) {
return false;
}
_rootBuilder = new GraphBuilder(&graph);
_builders.push_back(_rootBuilder);
root->mergeGraphInfo();
_rootGraphId = newSubGraph(*root, _rootBuilder);
rootNode = buildVisit(*rootNode);
if (rootNode == nullptr) {
return false;
}
if (!buildDelayDpGraph()) {
SQL_LOG(WARN, "buildDelayDpGraph failed");
return false;
}
addExchangeBorder();
outputNodes = rootNode->nodeNames;
outputPorts.assign(outputNodes.size(), DEFAULT_OUTPUT_PORT);
return true;
}
GraphTransform::ErrorCode GraphTransform::getErrorCode() const {
return _errorCode;
}
plan::PlanNode *GraphTransform::visitNode(plan::PlanNode &node) {
SQL_LOG(TRACE3, "visit common nodeId:%ld", node.op->id);
switch (node.inputMap.size()) {
case 0:
buildNode(node, 1);
return &node;
case 1:
return buildSingleInputNode(node);
default:
return buildMultiInputNode(node);
}
}
plan::PlanNode *GraphTransform::visitScan(plan::ScanNode &node) {
SQL_LOG(TRACE3, "visit scan nodeId:%ld", node.op->id);
addTargetWatermark(node);
auto parallel = getScanParallel(*(node.op));
buildNode(node, parallel);
return &node;
}
plan::PlanNode *GraphTransform::visitExchange(plan::ExchangeNode &node) {
SQL_LOG(TRACE3, "visit exchange nodeId:%ld", node.op->id);
auto childIter = node.inputMap.begin();
if (childIter == node.inputMap.end()) {
_errorCode = ErrorCode::LACK_NODE_INPUT;
SQL_LOG(WARN, "exchange nodeId:%ld has no input", node.op->id);
return nullptr;
}
auto &inputNodes = childIter->second;
if (inputNodes.empty()) {
_errorCode = ErrorCode::LACK_NODE_INPUT;
SQL_LOG(WARN, "exchange nodeId:%ld has no input", node.op->id);
return nullptr;
}
auto curBuilder = _builder;
auto curGraphId = curBuilder->currentSubGraph();
auto root = node.root->getRoot();
auto graphId = root->getGraphId();
if (graphId == navi::INVALID_GRAPH_ID) {
root->mergeGraphInfo();
if (root->canDelayDp()) {
newDelayDpSubGraph(*root);
} else if (root->getBizName() != _rootBizName) {
newSubGraph(*root, _rootBuilder);
} else {
switchToSubGraph(_rootGraphId, _rootBuilder);
}
} else {
switchToSubGraph(graphId, root->getBuilder());
}
auto p = buildVisit(*(inputNodes[0]));
if (p == nullptr) {
return nullptr;
}
buildMergedNode(*p);
auto buildOutput = _builder->node(p->getMergedName()).out(DEFAULT_OUTPUT_PORT);
node.input2buildOutput.try_emplace(p, buildOutput);
switchToSubGraph(curGraphId, curBuilder);
_exchangeNodes.push_back(&node);
addDelayDpInfo(node);
return &node;
}
std::string GraphTransform::getQrsBizName() {
return _config.qrsBizName;
}
std::string GraphTransform::getRemoteBizName(iquan::PlanOp &op) {
if (_config.searcherBizName.empty()) {
auto name = getJsonStringValue(op, "node_name");
if (name.empty()) {
name = getJsonStringValue(op, "db_name");
if (name == SQL_DEFAULT_EXTERNAL_DATABASE_NAME) {
return getQrsBizName();
}
}
return name + "." + isearch::DEFAULT_SQL_BIZ_NAME;
} else {
return _config.searcherBizName;
}
}
std::string GraphTransform::getJsonSegment(iquan::PlanOp &op, const char *key) {
auto p = getJsonValue(op, key);
if (p == nullptr) {
return EMPTY_STRING;
}
return op.jsonAttr2Str(_config.params, _config.innerParams, p);
}
size_t GraphTransform::getScanParallel(iquan::PlanOp &op) {
if (_config.parallelNum > 1) {
if (_config.parallelTables.empty()) {
return _config.parallelNum;
}
auto tableName = getJsonStringValue(op, SCAN_TABLE_NAME_ATTR);
if (_config.parallelTables.count(tableName) > 0) {
return _config.parallelNum;
}
}
return 1;
}
plan::PlanNode *GraphTransform::preparePlanNode(iquan::SqlPlan &sqlPlan) {
size_t nodeNum = sqlPlan.opList.size();
if (nodeNum == 0u) {
return nullptr;
}
_id2node.reserve(nodeNum * 2);
plan::PlanNode *node = nullptr;
for (auto &op : sqlPlan.opList) {
preprocessOp(op);
node = createPlanNode(op);
if (node == nullptr) {
return nullptr;
}
}
return node;
}
void GraphTransform::preprocessOp(iquan::PlanOp &op) {
preprocessOpName(op);
preprocessOpInputs(op);
}
void GraphTransform::preprocessOpName(iquan::PlanOp &op) {
op.patchInt64Attrs[iquan::OP_ID] = iquan::BASE_OP_ID + op.id;
op.patchStrAttrs[IQUAN_EXEC_ATTR_SOURCE_SPEC] = _config.sourceSpec;
auto &opName = op.opName;
if (opName == "TableScanOp") {
std::string tableType = TABLE_TYPE_UNKNOWN;
auto it = op.jsonAttrs->FindMember("table_type");
if (it != op.jsonAttrs->MemberEnd()) {
if (it->value.IsString()) {
tableType = it->value.GetString();
}
}
opName = TableScanOpUtil::getScanKernelName(tableType);
SQL_LOG(TRACE3, "get kernel[%s] for table type[%s]", opName.c_str(), tableType.c_str());
} else if (opName == "AggregateOp") {
opName = "sql.AggKernel";
} else if (opName == "TableFunctionScanOp") {
opName = "sql.TvfKernel";
} else if (opName == "LookupJoinOp") {
std::string tableType = TABLE_TYPE_UNKNOWN;
auto it = op.jsonAttrs->FindMember("build_node");
if (it != op.jsonAttrs->MemberEnd() && it->value.IsObject()) {
const auto &valueMap = it->value;
auto typeIt = valueMap.FindMember("table_type");
if (typeIt != valueMap.MemberEnd() && typeIt->value.IsString()) {
tableType = typeIt->value.GetString();
}
}
if (IQUAN_TABLE_TYPE_EXTERNAL == tableType) {
opName = "sql.ExternalLookupJoinKernel";
} else {
opName = "sql.LookupJoinKernel";
}
} else {
if (autil::StringUtil::endsWith(opName, "Op")) {
opName = opName.substr(0, opName.size() - 2) + "Kernel";
}
opName = "sql." + opName;
}
if (_config.logicTableOps.count(opName) > 0) {
op.opName = "sql.LogicalTableScanKernel";
op.patchStrAttrs["table_type"] = "logical";
}
}
void GraphTransform::preprocessOpInputs(iquan::PlanOp &op) {
auto &inputs = op.inputs;
if (inputs.size() == 1 && inputs.find("input0") == inputs.end()) {
auto iter = inputs.begin();
inputs["input0"] = iter->second;
inputs.erase(iter);
}
}
plan::PlanNode *GraphTransform::createPlanNode(iquan::PlanOp &op) {
auto p = createPlanNode(op.id, op.opName);
if (p == nullptr) {
return nullptr;
}
p->op = &op;
return p;
}
plan::PlanNode *GraphTransform::createPlanNode(const std::string &opName) {
++_maxNodeId;
return createPlanNode(_maxNodeId, opName);
}
plan::PlanNode *GraphTransform::createPlanNode(size_t id, const std::string &opName) {
auto &p = _id2node[id];
if (p != nullptr) {
_errorCode = ErrorCode::DUPLICATE_NODE_ID;
SQL_LOG(WARN, "duplicate node id:%ld, opName:%s", id, opName.c_str());
return nullptr;
}
if (opName == SCAN_OP || opName == ASYNC_SCAN_OP) {
p = new plan::ScanNode();
} else if (opName == EXCHANGE_OP) {
p = new plan::ExchangeNode();
} else {
p = new plan::PlanNode();
}
if (id > _maxNodeId) {
_maxNodeId = id;
}
return p;
}
plan::GraphRoot *GraphTransform::addGraphRoot() {
auto root = new plan::GraphRoot();
_subRoots.push_back(root);
return root;
}
plan::PlanNode *GraphTransform::findNode(size_t nodeId) {
auto iter = _id2node.find(nodeId);
if (iter == _id2node.end()) {
_errorCode = ErrorCode::NODE_NOT_EXIST;
SQL_LOG(WARN, "nodeId:%ld not exist", nodeId);
return nullptr;
}
return iter->second;
}
plan::PlanNode *GraphTransform::linkVisit(size_t nodeId, plan::GraphRoot &root) {
auto p = findNode(nodeId);
if (p == nullptr) {
return nullptr;
}
return linkVisit(*p, root);
}
plan::PlanNode *GraphTransform::linkVisit(plan::PlanNode &node, plan::GraphRoot &root) {
auto nodeId = node.op->id;
plan::PlanNode *p = nullptr;
SQL_LOG(TRACE3, "link nodeId:%ld", nodeId);
switch (node.status) {
case plan::NodeStatus::LINKING:
_errorCode = ErrorCode::GRAPH_CIRCLE;
SQL_LOG(WARN, "nodeId:%ld is linking, detect graph circle", nodeId);
break;
case plan::NodeStatus::INIT:
SQL_LOG(TRACE3, "first link nodeId:%ld", nodeId);
node.status = plan::NodeStatus::LINKING;
node.root = &root;
p = linkInitNode(node, root);
node.status = plan::NodeStatus::LINKED;
break;
case plan::NodeStatus::LINKED:
SQL_LOG(TRACE3, "share link nodeId:%ld", nodeId);
p = linkSharedNode(node, root);
break;
default:
_errorCode = ErrorCode::UNKNOWN;
SQL_LOG(WARN, "unknown error");
break;
}
return p;
}
plan::PlanNode *GraphTransform::linkInitNode(plan::PlanNode &node, plan::GraphRoot &root) {
auto p = dynamic_cast<plan::ExchangeNode *>(&node);
if (p != nullptr) {
return linkExchangeNode(*p, root);
} else {
return linkCommonNode(node, root);
}
}
plan::PlanNode *GraphTransform::linkSharedNode(plan::PlanNode &node, plan::GraphRoot &curRoot) {
auto nodeId = node.op->id;
if (dynamic_cast<plan::ExchangeNode *>(&node) != nullptr) {
SQL_LOG(TRACE3, "exchange nodeId:%ld has share", nodeId);
return &node;
}
if (node.root == &curRoot) {
SQL_LOG(TRACE3, "nodeId:%ld shared in same graph", nodeId);
return &node;
}
auto curShared = curRoot.getRoot();
auto nodeShared = node.root->getRoot();
bool curEmptyLeaf = curShared->getLeafs().empty();
bool nodeEmptyLeaf = nodeShared->getLeafs().empty();
if (curEmptyLeaf == true && nodeEmptyLeaf == true) {
SQL_LOG(TRACE3, "nodeId:%ld and current root all have no share", nodeId);
curShared->addLeaf(nodeShared);
} else if (curEmptyLeaf == false && nodeEmptyLeaf == false) {
SQL_LOG(TRACE3, "nodeId:%ld and current root all have share", nodeId);
plan::GraphRoot::mergeRoot(curShared, nodeShared);
} else if (nodeEmptyLeaf == false) {
SQL_LOG(TRACE3, "only nodeId:%ld root has share, add current to it", nodeId);
nodeShared->addLeaf(curShared);
} else {
SQL_LOG(TRACE3, "only current root has share, add nodeId:%ld to it", nodeId);
curShared->addLeaf(nodeShared);
}
return &node;
}
plan::PlanNode *GraphTransform::linkExchangeNode(plan::ExchangeNode &node, plan::GraphRoot &root) {
SQL_LOG(TRACE3, "link exchange node");
auto subRoot = addGraphRoot();
root.addInput(subRoot);
subRoot->setOutput(&root);
if (subRoot == nullptr) {
return nullptr;
}
if (_config.searcherGraphInline) {
subRoot->setInlineMode(true);
}
auto &op = *(node.op);
subRoot->setBizName(getRemoteBizName(op));
subRoot->setCurDist(getJsonSegment(op, "table_distribution"));
auto remoteDist = getJsonSegment(op, "output_distribution");
if (remoteDist.empty()) {
remoteDist = root.getCurDist();
}
subRoot->setRemoteDist(remoteDist);
auto remoteDelayDp = getJsonSegment(op, "output_prunable");
if (remoteDelayDp == "1") {
SQL_LOG(TRACE3, "enable remote_delay_deploy, node:%ld", op.id);
subRoot->setRemoteDelayDp(true);
}
node.root = subRoot;
return linkCommonNode(node, *subRoot);
}
plan::PlanNode *GraphTransform::linkCommonNode(plan::PlanNode &node, plan::GraphRoot &root) {
SQL_LOG(TRACE3, "link common node");
for (const auto &pair : node.op->inputs) {
const auto &inputIds = pair.second;
auto &inputNodes = node.inputMap[pair.first];
inputNodes.reserve(inputIds.size());
for (auto nodeId : inputIds) {
auto p = linkVisit(nodeId, root);
if (p == nullptr) {
return nullptr;
}
inputNodes.push_back(p);
}
}
return &node;
}
plan::PlanNode *GraphTransform::buildVisit(plan::PlanNode &node) {
auto nodeId = node.op->id;
plan::PlanNode *p = nullptr;
switch (node.status) {
case plan::NodeStatus::LINKED:
SQL_LOG(TRACE3, "visit build nodeId:%ld", nodeId);
node.status = plan::NodeStatus::BUILDING;
p = node.accept(*this);
node.status = plan::NodeStatus::FINISH;
break;
case plan::NodeStatus::FINISH:
SQL_LOG(TRACE3, "share finish nodeId:%ld", nodeId);
p = &node;
break;
default:
_errorCode = ErrorCode::UNKNOWN;
SQL_LOG(WARN, "unknown error");
break;
}
return p;
}
void GraphTransform::buildNode(plan::PlanNode &node, size_t parallel) {
SQL_LOG(TRACE3, "build nodeId:%ld, parallel:%ld", node.op->id, parallel);
if (parallel <= 1u) {
auto nodeName = StringUtil::toString(node.op->id);
node.nodeNames = {nodeName};
buildNode(node, nodeName);
} else {
node.nodeNames.resize(parallel);
for (size_t i = 0; i < parallel; ++i) {
auto nodeName = StringUtil::toString(node.op->id) + "_" + StringUtil::toString(i);
node.nodeNames[i] = nodeName;
node.op->patchInt64Attrs[PARALLEL_NUM_ATTR] = parallel;
node.op->patchInt64Attrs[PARALLEL_INDEX_ATTR] = i;
buildNode(node, nodeName);
}
}
}
void GraphTransform::buildMergedNode(plan::PlanNode &node) {
auto parallel = node.nodeNames.size();
if (parallel <= 1u) {
return;
}
SQL_LOG(TRACE3, "merge parallel:%ld for nodeId:%ld", parallel, node.op->id);
node.mergedName = addMergedNode();
auto buildInput = _builder->node(node.mergedName).in(DEFAULT_INPUT_PORT);
for (size_t i = 0; i < parallel; ++i) {
buildEdge(node.nodeNames[i], buildInput.autoNext());
}
return;
}
std::string GraphTransform::addMergedNode() {
auto nodeName = UNION_OP + "_" + StringUtil::toString(_mergedNodeCount);
++_mergedNodeCount;
SQL_LOG(TRACE3, "add merge node:%s", nodeName.c_str());
_builder->node(nodeName).kernel(UNION_OP);
return nodeName;
}
void GraphTransform::buildNode(plan::PlanNode &node, const std::string &nodeName) {
_builder->node(nodeName)
.kernel(node.op->opName)
.jsonAttrs(node.op->jsonAttr2Str(_config.params, _config.innerParams))
.binaryAttrsFromMap(node.op->binaryAttrs);
}
plan::PlanNode *GraphTransform::buildSingleInputNode(plan::PlanNode &node) {
auto childIter = node.inputMap.begin();
const auto &inputNodes = childIter->second;
switch (inputNodes.size()) {
case 0:
buildNode(node, 1);
return &node;
case 1:
return buildSinglePlainInputNode(node);
default:
return buildSingleGroupInputNode(node);
}
}
plan::PlanNode *GraphTransform::buildSinglePlainInputNode(plan::PlanNode &node) {
SQL_LOG(TRACE3, "build single plain input node");
auto childIter = node.inputMap.begin();
const auto &inputPort = childIter->first;
auto p = buildVisit(*(childIter->second[0]));
if (p == nullptr) {
return nullptr;
}
auto inputSize = p->nodeNames.size();
if (inputSize > 1u) {
buildNode(node, inputSize);
for (size_t i = 0; i < inputSize; ++i) {
auto buildInput = _builder->node(node.nodeNames[i]).in(inputPort);
buildEdge(p->nodeNames[i], buildInput);
}
} else {
buildNode(node, 1);
auto buildInput = _builder->node(node.nodeNames[0]).in(inputPort);
if (inputSize == 1u) {
buildEdge(p->nodeNames[0], buildInput);
} else {
p->addBuildInput(node, buildInput);
}
}
return &node;
}
plan::PlanNode *GraphTransform::buildSingleGroupInputNode(plan::PlanNode &node) {
SQL_LOG(TRACE3, "build single group input node");
auto childIter = node.inputMap.begin();
const auto &inputPort = childIter->first;
auto &inputNodes = childIter->second;
for (auto p : inputNodes) {
if (buildVisit(*p) == nullptr) {
return nullptr;
}
}
buildNode(node, 1);
const auto &nodeName = node.nodeNames[0];
auto port = _builder->node(nodeName).in(inputPort);
auto groupSize = inputNodes.size();
for (size_t i = 0; i < groupSize; ++i) {
auto buildInput = port.autoNext();
auto p = inputNodes[i];
auto inputSize = p->nodeNames.size();
if (inputSize > 1u) {
buildMergedNode(*p);
}
if (inputSize == 0u) {
p->addBuildInput(node, buildInput);
} else {
buildEdge(p->getMergedName(), buildInput);
}
}
return &node;
}
plan::PlanNode *GraphTransform::buildMultiInputNode(plan::PlanNode &node) {
for (const auto &pair : node.inputMap) {
const auto &inputNodes = pair.second;
if (inputNodes.size() != 1u) {
_errorCode = ErrorCode::MULTI_GROUP_INPUT;
SQL_LOG(WARN, "detect multi group input, not supported");
return nullptr;
}
}
return buildMultiPlainInputNode(node);
}
plan::PlanNode *GraphTransform::buildMultiPlainInputNode(plan::PlanNode &node) {
SQL_LOG(TRACE3, "build multi plain input node");
struct InputInfo {
const string &inputPort;
plan::PlanNode *p;
size_t parallel;
InputInfo(const std::string &inputPort_, plan::PlanNode *p_, size_t parallel_)
: inputPort(inputPort_)
, p(p_)
, parallel(parallel_) {}
};
vector<InputInfo> inputInfos;
inputInfos.reserve(node.inputMap.size());
size_t maxParallel = 1;
for (auto &pair : node.inputMap) {
auto p = buildVisit(*(pair.second[0]));
if (p == nullptr) {
return nullptr;
}
size_t inputParallel = p->nodeNames.size();
if (maxParallel > 1 && inputParallel > 1) {
buildMergedNode(*p);
inputParallel = 1;
}
inputInfos.emplace_back(pair.first, p, inputParallel);
maxParallel = (inputParallel > maxParallel ? inputParallel : maxParallel);
}
buildNode(node, maxParallel);
unordered_map<plan::PlanNode *, vector<navi::P>> node2merge;
for (size_t i = 0; i < maxParallel; ++i) {
auto buildNode = _builder->node(node.nodeNames[i]);
for (const auto &inputInfo : inputInfos) {
auto buildInput = buildNode.in(inputInfo.inputPort);
if (inputInfo.parallel == 1u) {
buildEdge(inputInfo.p->getMergedName(), buildInput);
} else if (inputInfo.parallel == 0u) {
auto ret = node2merge.try_emplace(inputInfo.p);
ret.first->second.emplace_back(buildInput);
} else {
buildEdge(inputInfo.p->nodeNames[i], buildInput);
}
}
}
for (const auto &pair : node2merge) {
auto p = pair.first;
const auto &buildInputs = pair.second;
auto finalBuildInput = buildInputs[0];
if (buildInputs.size() > 1u) {
const auto &mergedName = addMergedNode();
for (const auto &buildInput : buildInputs) {
buildEdge(mergedName, buildInput);
}
finalBuildInput = _builder->node(mergedName).in(DEFAULT_INPUT_PORT).autoNext();
}
p->addBuildInput(node, finalBuildInput);
}
return &node;
}
void GraphTransform::switchToSubGraph(plan::PlanNode &node) {
auto root = node.root->getRoot();
switchToSubGraph(root->getGraphId(), root->getBuilder());
}
void GraphTransform::switchToSubGraph(navi::GraphId id, navi::GraphBuilder *builder) {
SQL_LOG(TRACE3, "switchToSubGraph: builder: %p, graphId: %d", builder, id);
_builder = builder;
builder->subGraph(id);
}
navi::GraphId GraphTransform::newSubGraph(plan::GraphRoot &root, navi::GraphBuilder *builder) {
const auto &bizName = root.getBizName();
_builder = builder;
auto graphId = _builder->newSubGraph(bizName);
builder->errorHandleStrategy(_config.resultAllowSoftFailure ? EHS_ERROR_AS_EOF
: EHS_ERROR_AS_FATAL);
SQL_LOG(
TRACE3, "new subGraph: %s, builder: %p, graphId: %d", bizName.c_str(), builder, graphId);
root.setGraphId(graphId);
root.setBuilder(_builder);
if (root.getInlineMode()) {
_builder->inlineMode(true);
}
const auto &curDist = root.getCurDist();
if (!curDist.empty()) {
_builder->subGraphAttr("table_distribution", curDist);
}
return graphId;
}
navi::GraphId GraphTransform::newDelayDpSubGraph(plan::GraphRoot &root) {
auto p = new navi::GraphDef();
root.ownGraphDef(p);
auto builder = new GraphBuilder(p);
_builders.push_back(builder);
return newSubGraph(root, builder);
}
void GraphTransform::addDelayDpInfo(plan::ExchangeNode &node) {
auto curRoot = node.root->getRoot();
if (curRoot->canDelayDp()) {
auto &info = getDelayDpInfo(*curRoot);
info.addOutputExchange(&node);
SQL_LOG(TRACE3, "delay dp info %p add output node %ld", &info, node.op->id);
}
auto outputRoot = node.root->getOutput();
if (!outputRoot) {
return;
}
outputRoot = outputRoot->getRoot();
if (outputRoot->canDelayDp()) {
auto &info = getDelayDpInfo(*outputRoot);
info.addInputExchange(&node);
SQL_LOG(TRACE3, "delay dp info %p add input node %ld", &info, node.op->id);
}
}
plan::DelayDpInfo &GraphTransform::getDelayDpInfo(plan::GraphRoot &root) {
auto p = &root;
auto ret = _delayDpInfoMap.try_emplace(p, p);
auto &info = ret.first->second;
if (ret.second) {
_delayDpInfos.push_back(&info);
}
return info;
}
bool GraphTransform::buildDelayDpGraph() {
size_t newNodeCount = 0;
auto rootRoot = _subRoots[0]->getRoot();
for (auto info : _delayDpInfos) {
SQL_LOG(TRACE3, "build delay dp %p begin", info);
if (!info->prepareSubGraph()) {
return false;
}
switchToSubGraph(_rootGraphId, _rootBuilder);
auto newNodeName = DELAY_DP_OP + "_" + StringUtil::toString(newNodeCount);
++newNodeCount;
auto &opAttrs = info->getWrapperOpAttrs();
if (_config.debug) {
opAttrs[DELAY_DP_DEBUG_ATTR] = "1";
} else {
opAttrs[DELAY_DP_DEBUG_ATTR] = "0";
}
auto newNode = _builder->node(newNodeName).kernel(DELAY_DP_OP).binaryAttrsFromMap(opAttrs);
auto p = createPlanNode(DELAY_DP_OP);
if (p == nullptr) {
SQL_LOG(WARN, "allocate plan node failed");
return false;
}
p->status = plan::NodeStatus::FINISH;
p->root = rootRoot;
p->nodeNames = {newNodeName};
auto buildInput = newNode.in(DEFAULT_INPUT_PORT);
auto buildOutput = newNode.out(DEFAULT_OUTPUT_PORT);
auto rootCreator = [this]() -> plan::GraphRoot * { return this->addGraphRoot(); };
if (!info->replaceSubGraph(*p, buildInput, buildOutput, rootCreator)) {
return false;
}
SQL_LOG(TRACE3, "build delay dp graph end");
}
return true;
}
void GraphTransform::addExchangeBorder() {
_builder = _rootBuilder;
for (auto p : _exchangeNodes) {
auto &node = *p;
SQL_LOG(TRACE3, "link exchange border, nodeId:%ld", node.op->id);
auto graphId = node.root->getRoot()->getGraphId();
const auto &buildOutput = node.input2buildOutput.begin()->second;
for (const auto &pair : node.output2buildInputs) {
auto remoteGraphId = pair.first->root->getRoot()->getGraphId();
bool sameGraph = (graphId == remoteGraphId);
for (const auto &buildInput : pair.second) {
addExchangeBorder(buildOutput, node, buildInput, sameGraph);
}
}
}
}
void GraphTransform::addExchangeBorder(navi::P buildOutput,
plan::ExchangeNode &exchangeNode,
navi::P buildInput,
bool sameGraph) {
if (sameGraph) {
if (exchangeNode.root->getRoot()->getGraphId() == _rootGraphId) {
buildInput.from(buildOutput).require(true);
} else {
_builder->subGraph(_rootGraphId);
auto identityName = addMergedNode();
auto innerBuildOutput = _builder->node(identityName).out(DEFAULT_OUTPUT_PORT);
buildInput.from(innerBuildOutput).require(true).merge("sql.TableMergeKernel");
_builder->subGraphAttr("table_distribution", ROOT_GRAPH_TABLE_DISTRIBUTION);
auto innerBuildInput = _builder->node(identityName).in(DEFAULT_INPUT_PORT).autoNext();
addExchangeBorder(buildOutput, exchangeNode, innerBuildInput, false);
}
} else {
auto buildEdge = buildInput.from(buildOutput).require(true);
buildEdge.merge("sql.TableMergeKernel");
buildEdge.split("sql.TableSplitKernel")
.attr("table_distribution", exchangeNode.root->getRemoteDist());
}
}
void GraphTransform::addTargetWatermark(plan::ScanNode &node) {
static const bool disableWatermark = GraphTransformEnv::get().disableWatermark;
static const bool useQrsTimestamp = GraphTransformEnv::get().useQrsTimestamp;
const auto &leaderPreferLevel = _config.leaderPreferLevel;
int64_t watermark = 0;
if (disableWatermark || !autil::StringUtil::fromString(leaderPreferLevel, watermark)
|| watermark == 0) {
// disabled, pass
} else {
switch (watermark) {
case 1:
node.op->patchInt64Attrs["target_watermark"]
= useQrsTimestamp ? TimeUtility::currentTime() : 0;
node.op->patchInt64Attrs["target_watermark_type"] = WatermarkType::WM_TYPE_SYSTEM_TS;
break;
default:
node.op->patchInt64Attrs["target_watermark"] = watermark;
node.op->patchInt64Attrs["target_watermark_type"] = WatermarkType::WM_TYPE_MANUAL;
}
}
}
void GraphTransform::buildEdge(const std::string &outputNode, const navi::P &buildInput) {
_builder->node(outputNode).out(DEFAULT_OUTPUT_PORT).to(buildInput).require(true);
}
} // namespace sql