in velox/exec/MergeJoin.cpp [403:594]
// Advances the merge join by one step and returns the next output batch, or
// nullptr when no batch can be returned yet.
//
// The function is resumable: it is expected to be called repeatedly, and all
// progress is kept in members (input_/index_ for the left side,
// rightInput_/rightIndex_ for the right side, leftMatch_/rightMatch_ for the
// run of equal keys currently being processed, output_/outputSize_ for the
// batch being assembled). Each call walks through these phases in order:
//   1. Resume emitting a match whose output did not fit in the previous batch.
//   2. Extend a match that ran to the end of a batch on either side.
//   3. Emit a match that has just become complete.
//   4. Handle the case where one or both sides have no current batch.
//   5. Scan both sides for the next run of equal keys.
//
// A nullptr return covers both "need more input" and "nothing to emit"; the
// side that needs a new batch has its input_/rightInput_ reset to nullptr
// before returning.
RowVectorPtr MergeJoin::doGetOutput() {
// Check if we ran out of space in the output vector in the middle of the
// match.
// A set cursor on the left match marks a match that was only partially
// emitted; the right match must carry a cursor as well.
if (leftMatch_ && leftMatch_->cursor) {
VELOX_CHECK(rightMatch_ && rightMatch_->cursor);
// Not all rows from the last match fit in the output. Continue producing
// results from the current match.
// A true result from addToOutput() hands the accumulated output_ to the
// caller (presumably because the batch is full -- confirm in addToOutput).
if (addToOutput()) {
return std::move(output_);
}
}
// There is no output-in-progress match, but there could be incomplete
// match.
// Each side is handled the same way: with a batch available, search it for
// the end of the match; with the side exhausted, the match is complete by
// definition; otherwise wait for the next batch.
if (leftMatch_) {
VELOX_CHECK(rightMatch_);
if (input_) {
// Look for continuation of a match on the left and/or right sides.
if (!findEndOfMatch(leftMatch_.value(), input_, leftKeys_)) {
// Continue looking for the end of the match.
// Release the current left batch so that the next one is examined on a
// later call.
input_ = nullptr;
return nullptr;
}
// If the current batch is the last one recorded in the match, continue the
// scan right after the matching rows. Otherwise index_ is left untouched
// (presumably because the batch contributed no rows to the match -- confirm
// in findEndOfMatch).
if (leftMatch_->inputs.back() == input_) {
index_ = leftMatch_->endIndex;
}
} else if (noMoreInput_) {
// No further left batches will arrive, so the match cannot grow.
leftMatch_->complete = true;
} else {
// Need more input.
return nullptr;
}
if (rightInput_) {
if (!findEndOfMatch(rightMatch_.value(), rightInput_, rightKeys_)) {
// Continue looking for the end of the match.
// Release the current right batch so that the next one is examined on a
// later call.
rightInput_ = nullptr;
return nullptr;
}
// Same resume logic as for the left side.
if (rightMatch_->inputs.back() == rightInput_) {
rightIndex_ = rightMatch_->endIndex;
}
} else if (noMoreRightInput_) {
// No further right batches will arrive, so the match cannot grow.
rightMatch_->complete = true;
} else {
// Need more input.
return nullptr;
}
}
// There is no output-in-progress match, but there can be a complete match
// ready for output.
// Reaching this point with a match set means the section above marked both
// sides complete; the checks below enforce that.
if (leftMatch_) {
VELOX_CHECK(leftMatch_->complete);
VELOX_CHECK(rightMatch_ && rightMatch_->complete);
if (addToOutput()) {
return std::move(output_);
}
}
// At least one side has no current batch, so keys cannot be compared.
if (!input_ || !rightInput_) {
if (isLeftJoin(joinType_)) {
// The right side is exhausted but left rows remain: none of them can find a
// match, so each remaining left row is emitted through
// addOutputRowForLeftJoin().
if (input_ && noMoreRightInput_) {
prepareOutput();
while (true) {
// The capacity check comes before the row is added and before index_ is
// advanced, so a row that does not fit is retried on the next call.
if (outputSize_ == outputBatchSize_) {
return std::move(output_);
}
addOutputRowForLeftJoin();
++index_;
if (index_ == input_->size()) {
// Ran out of rows on the left side.
// Rows added so far stay in output_ and are returned by a later call.
input_ = nullptr;
return nullptr;
}
}
}
// The left side is exhausted: flush the partially filled batch, trimmed to
// the number of rows actually written.
if (noMoreInput_ && output_) {
output_->resize(outputSize_);
return std::move(output_);
}
} else {
// Non-left join: once either side is exhausted, flush the partially filled
// batch if there is one; otherwise drop the current left batch.
if (noMoreInput_ || noMoreRightInput_) {
if (output_) {
output_->resize(outputSize_);
return std::move(output_);
}
input_ = nullptr;
}
}
return nullptr;
}
// Look for a new match starting with index_ row on the left and rightIndex_
// row on the right.
// compareResult < 0 means the left row must advance, > 0 means the right row
// must advance, and 0 means the keys are equal.
auto compareResult = compare();
for (;;) {
// Catch up input_ with rightInput_.
while (compareResult < 0) {
// For a left join the skipped left row has no match and is still emitted.
// The capacity check precedes both the emit and the ++index_ below, so
// returning a full batch here does not lose the current row.
if (isLeftJoin(joinType_)) {
prepareOutput();
if (outputSize_ == outputBatchSize_) {
return std::move(output_);
}
addOutputRowForLeftJoin();
}
++index_;
if (index_ == input_->size()) {
// Ran out of rows on the left side.
input_ = nullptr;
return nullptr;
}
compareResult = compare();
}
// Catch up rightInput_ with input_.
// Unmatched right rows are skipped without producing any output.
while (compareResult > 0) {
++rightIndex_;
if (rightIndex_ == rightInput_->size()) {
// Ran out of rows on the right side.
rightInput_ = nullptr;
return nullptr;
}
compareResult = compare();
}
if (compareResult == 0) {
// Found a match. Identify all rows on the left and right that have the
// matching keys.
// endIndex ends up one past the last left row for which compareLeft()
// reports equality.
vector_size_t endIndex = index_ + 1;
while (endIndex < input_->size() && compareLeft(endIndex) == 0) {
++endIndex;
}
if (endIndex == input_->size()) {
// Matches continue in subsequent input. Load all lazies.
// NOTE(review): only the left batch is passed to loadColumns here; the
// right batch is retained in rightMatch_ in the same way when its match
// reaches the end of the batch -- confirm right-side vectors are never lazy.
loadColumns(input_, *operatorCtx_->execCtx());
}
// The fourth initializer is true only when a non-matching row was seen
// inside this batch; it is read back below as Match::complete. The final
// std::nullopt is presumably the cursor (no partial output yet) -- confirm
// against the Match declaration.
leftMatch_ = Match{
{input_}, index_, endIndex, endIndex < input_->size(), std::nullopt};
// Same scan on the right side.
vector_size_t endRightIndex = rightIndex_ + 1;
while (endRightIndex < rightInput_->size() &&
compareRight(endRightIndex) == 0) {
++endRightIndex;
}
rightMatch_ = Match{
{rightInput_},
rightIndex_,
endRightIndex,
endRightIndex < rightInput_->size(),
std::nullopt};
// If the match reaches the end of a batch on either side, it may continue in
// the next batch. Release the affected batch(es) and return; the
// incomplete-match section at the top of this function takes over on the
// next call.
if (!leftMatch_->complete || !rightMatch_->complete) {
if (!leftMatch_->complete) {
// Need to continue looking for the end of match.
input_ = nullptr;
}
if (!rightMatch_->complete) {
// Need to continue looking for the end of match.
rightInput_ = nullptr;
}
return nullptr;
}
// Both ends of the match are inside the current batches: move past the
// matching rows and emit the match.
index_ = endIndex;
rightIndex_ = endRightIndex;
if (addToOutput()) {
return std::move(output_);
}
// The match was complete on both sides, so index_ and rightIndex_ still
// point at valid rows and the scan can continue.
compareResult = compare();
}
}
// The loop above only exits through return statements.
VELOX_UNREACHABLE();
}