in velox/experimental/codegen/CodegenCompiledExpressionTransform.h [504:630]
/// Rewrites a projection node so that every projection expression that has
/// generated code is evaluated through a single compiled expression.
///
/// When both compileFilter_ and mergeFilter_ are set and the child is a
/// FilterNode whose predicate has generated code, the predicate is folded into
/// the same compiled expression and the returned projection reads directly
/// from the filter's own source.
///
/// @param projection The projection node being visited; must have exactly one
/// source.
/// @param children The already transformed children; only the first one is
/// used.
/// @return A copy of `projection` whose generated columns are replaced by the
/// expressions returned from buildCompiledCallExpr. If the combined expression
/// takes no input columns, nothing is compiled and the projection is copied
/// unchanged on top of the first child.
std::shared_ptr<core::PlanNode> visitProjection(
    const core::ProjectNode& projection,
    const Children& children) {
  std::shared_ptr<const core::PlanNode> source = *ranges::begin(children);

  // Check for a filter directly below the projection that can be merged.
  std::optional<GeneratedExpressionStruct> filterExpr = std::nullopt;
  auto filterNode = std::dynamic_pointer_cast<const core::FilterNode>(source);
  if (compileFilter_ && mergeFilter_ && filterNode) {
    if (auto filterCode = getGeneratedCode(filterNode->filter())) {
      // The filter becomes part of the compiled expression, so the new
      // projection has to read from the filter's source instead.
      source = filterNode->sources()[0];
      filterExpr.emplace(filterCode.value());
    }
  }

  // Collect all the columns with generated code, remembering their position
  // in the original projection list.
  const auto& oldProjections = projection.projections();
  std::vector<std::pair<size_t, GeneratedExpressionStruct>> generatedColumns;
  for (size_t outputColumn = 0; outputColumn < oldProjections.size();
       ++outputColumn) {
    // TODO: We should exclude here direct access columns.
    if (auto generatedCode = getGeneratedCode(oldProjections[outputColumn])) {
      generatedColumns.push_back({outputColumn, generatedCode.value()});
    }
  }

  VELOX_CHECK_EQ(projection.sources().size(), 1);

  const auto& outputType = *projection.outputType().get();
  std::shared_ptr<RowType> concatInputType;
  std::shared_ptr<RowType> concatOutputType;
  const std::string genCode = generateConcatExpr(
      "ProjectExpr",
      outputType,
      filterExpr,
      generatedColumns,
      concatInputType,
      concatOutputType);

  if (concatInputType->size() == 0) {
    // The whole expression doesn't take input, don't compile it.
    return utils::adapter::ProjectCopy::copyWith(
        projection,
        std::placeholders::_1,
        std::placeholders::_1,
        std::placeholders::_1,
        *ranges::begin(children));
  }

  // Gather the headers required by every piece of generated code that ends up
  // in the compiled file. The merged filter contributes code too, so its
  // headers must be included along with those of the projected columns.
  std::unordered_set<std::string> includeSet;
  const auto addHeaders = [&includeSet](const GeneratedExpressionStruct& expr) {
    // Bind the result once so begin() and end() refer to the same container
    // regardless of whether headers() returns by value or by reference.
    const auto& headers = expr.headers();
    includeSet.insert(headers.begin(), headers.end());
  };
  if (filterExpr) {
    addHeaders(*filterExpr);
  }
  for (const auto& column : generatedColumns) {
    addHeaders(column.second);
  }

  std::stringstream includes;
  for (const auto& includePath : includeSet) {
    includes << fmt::format("#include {}\n", includePath);
  }

  // The default-null flags are looked up for the filter node when a filter was
  // merged, and for the projection node otherwise.
  const auto& nullFlagsNodeId =
      filterExpr ? filterNode->id() : projection.id();
  const bool defaultNull = this->isDefaultNull(nullFlagsNodeId);
  const bool defaultNullStrict = this->isDefaultNullStrict(nullFlagsNodeId);

  const std::string fileString = fmt::vformat(
      fileFormat(),
      fmt::make_format_args(
          fmt::arg("includes", includes.str()),
          fmt::arg("GeneratedCode", genCode),
          fmt::arg("GeneratedCodeClass", "ProjectExpr"),
          fmt::arg("isDefaultNull", defaultNull ? "true" : "false"),
          fmt::arg(
              "isDefaultNullStrict", defaultNullStrict ? "true" : "false")));

  auto compiledObject = codeManager_.compiler().compileString({}, fileString);
  auto dynamicObject = codeManager_.compiler().link({}, {compiledObject});

  // Extract the row input expression from the current projection.
  const auto inputType = projection.sources()[0]->outputType();
  std::vector<std::shared_ptr<const ITypedExpr>> newExpressions =
      buildCompiledCallExpr(
          dynamicObject, concatOutputType, concatInputType, inputType);

  // Start from the original projection list and overwrite the selected
  // columns: newExpressions[i] replaces the projection found at
  // generatedColumns[i].first. at() turns a size mismatch between the two
  // vectors into an exception instead of an out-of-bounds access.
  std::vector<std::shared_ptr<const ITypedExpr>> newProjections(
      oldProjections.begin(), oldProjections.end());
  for (size_t newExpressionIdx = 0; newExpressionIdx < newExpressions.size();
       ++newExpressionIdx) {
    const size_t oldColumnIndex = generatedColumns.at(newExpressionIdx).first;
    newProjections.at(oldColumnIndex) = newExpressions[newExpressionIdx];
  }

  // Build new projection node with newly generated expressions.
  return utils::adapter::ProjectCopy::copyWith(
      projection,
      std::placeholders::_1,
      std::placeholders::_1,
      std::move(newProjections),
      source);
}