in linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java [1851:2006]
public static <TSource, TInner, TResult> Enumerable<TResult> correlateBatchJoin(
    final JoinType joinType,
    final Enumerable<TSource> outer,
    final Function1<List<TSource>, Enumerable<TInner>> inner,
    final Function2<TSource, TInner, TResult> resultSelector,
    final Predicate2<TSource, TInner> predicate,
    final int batchSize) {
  // Batched correlated join.
  //
  // Up to batchSize rows are buffered from "outer"; "inner" is then invoked
  // once for the whole batch. The resulting inner enumerator is consumed
  // while the first outer row of the batch (i == 0) is processed, and every
  // inner row read is cached in innerValues. The remaining outer rows of the
  // batch (i > 0) replay that cache instead of re-evaluating "inner".
  // "predicate" decides whether an (outer, inner) pair matches and
  // "resultSelector" builds the row returned by current().
  //
  // Behaviour per join type, as implemented in moveNext():
  //   INNER: one result per matching pair; a batch with no inner rows is
  //          skipped entirely.
  //   SEMI:  at most one result per outer row (the first match); a batch
  //          with no inner rows is skipped entirely.
  //   LEFT:  one result per matching pair, plus (outer, null) for an outer
  //          row that matched nothing.
  //   ANTI:  (outer, null) only for outer rows that matched nothing.
  //
  // NOTE(review): if batchSize is 0 no outer row is ever buffered and the
  // result is empty; a negative batchSize is rejected by the ArrayList
  // constructor when enumerator() is called.
  return new AbstractEnumerable<TResult>() {
    @Override public Enumerator<TResult> enumerator() {
      return new Enumerator<TResult>() {
        final Enumerator<TSource> outerEnumerator = outer.enumerator();
        // Outer rows of the current batch.
        final List<TSource> outerValues = new ArrayList<>(batchSize);
        // Inner rows of the current batch, filled while i == 0.
        final List<TInner> innerValues = new ArrayList<>();
        @Nullable TSource outerValue;
        @Nullable TInner innerValue;
        @Nullable Enumerator<TInner> innerEnumerator;
        // Whether innerEnumerator is positioned on a row not yet consumed.
        boolean innerEnumHasNext = false;
        // Whether the current outer row has matched at least one inner row.
        boolean atLeastOneResult = false;
        int i = -1; // outer position; -1 means that no batch has been loaded
        int j = -1; // inner position within innerValues (used when i > 0)

        @SuppressWarnings("argument.type.incompatible")
        @Override public TResult current() {
          return resultSelector.apply(outerValue, innerValue);
        }

        @Override public boolean moveNext() {
          while (true) {
            // Fetch a new batch
            if (i == outerValues.size() || i == -1) {
              if (!fetchBatch()) {
                return false;
              }
              // If no inner values skip the whole batch
              // in case of SEMI and INNER join
              if (!innerEnumHasNext
                  && (joinType == JoinType.SEMI || joinType == JoinType.INNER)) {
                i = outerValues.size();
                continue;
              }
            }
            if (innerHasNext()) {
              outerValue = outerValues.get(i); // get current outer value
              nextInnerValue();
              // Compare current block row to current inner value
              if (predicate.apply(castNonNull(outerValue), castNonNull(innerValue))) {
                atLeastOneResult = true;
                // Skip the rest of inner values in case of
                // ANTI and SEMI when a match is found
                if (joinType == JoinType.ANTI || joinType == JoinType.SEMI) {
                  // Two ways of skipping inner values,
                  // enumerator way and ArrayList way
                  if (i == 0) {
                    // The enumerator must still be drained into the cache
                    // because later outer rows of the batch replay it.
                    final Enumerator<TInner> enumerator =
                        requireNonNull(this.innerEnumerator);
                    while (innerEnumHasNext) {
                      innerValues.add(enumerator.current());
                      innerEnumHasNext = enumerator.moveNext();
                    }
                  } else {
                    j = innerValues.size();
                  }
                  if (joinType == JoinType.ANTI) {
                    // A matched outer row produces no ANTI result
                    continue;
                  }
                }
                return true;
              }
            } else { // End of inner
              if (!atLeastOneResult
                  && (joinType == JoinType.LEFT
                  || joinType == JoinType.ANTI)) {
                outerValue = outerValues.get(i); // get current outer value
                innerValue = null;
                nextOuterValue();
                return true;
              }
              nextOuterValue();
            }
          }
        }

        /** Loads the next batch of outer rows and opens the inner enumerator
         * for it; returns false if the outer enumerator yields no more rows. */
        private boolean fetchBatch() {
          i = 0;
          j = 0;
          outerValues.clear();
          innerValues.clear();
          while (outerValues.size() < batchSize && outerEnumerator.moveNext()) {
            outerValues.add(outerEnumerator.current());
          }
          if (outerValues.isEmpty()) {
            return false;
          }
          Enumerable<TInner> batchInner = inner.apply(new AbstractList<TSource>() {
            // If the last batch isn't complete fill it with the first value
            // No harm since it's a disjunction
            @Override public TSource get(final int index) {
              return index < outerValues.size() ? outerValues.get(index) : outerValues.get(0);
            }

            // Always reports the full batch size, even for a short last batch
            @Override public int size() {
              return batchSize;
            }
          });
          if (batchInner == null) {
            batchInner = Linq4j.emptyEnumerable();
          }
          // Release the previous batch's enumerator before replacing it
          closeInner();
          final Enumerator<TInner> enumerator = batchInner.enumerator();
          innerEnumerator = enumerator;
          innerEnumHasNext = enumerator.moveNext();
          return true;
        }

        /** Advances to the next outer row of the batch and rewinds the
         * inner cache position. */
        private void nextOuterValue() {
          i++; // next outerValue
          j = 0; // rewind innerValues
          atLeastOneResult = false;
        }

        /** Reads the next inner row: from the enumerator (caching it) for the
         * first outer row of the batch, from the cache otherwise. */
        private void nextInnerValue() {
          if (i == 0) {
            final Enumerator<TInner> enumerator = requireNonNull(this.innerEnumerator);
            innerValue = enumerator.current();
            innerValues.add(innerValue);
            innerEnumHasNext = enumerator.moveNext(); // next enumerator inner value
          } else {
            innerValue = innerValues.get(j++); // next ArrayList inner value
          }
        }

        /** Returns whether the current outer row has more inner rows to
         * examine. */
        private boolean innerHasNext() {
          return i == 0 ? innerEnumHasNext : j < innerValues.size();
        }

        @Override public void reset() {
          // Reset the outer enumerator first: if it throws, the state of
          // this enumerator is left untouched.
          outerEnumerator.reset();
          // Release the inner enumerator of the abandoned batch
          closeInner();
          innerEnumHasNext = false;
          innerValue = null;
          outerValue = null;
          outerValues.clear();
          innerValues.clear();
          atLeastOneResult = false;
          i = -1; // forces a batch fetch on the next moveNext()
          j = -1;
        }

        /** Closes the inner enumerator of the current batch, if any. */
        private void closeInner() {
          if (innerEnumerator != null) {
            innerEnumerator.close();
            innerEnumerator = null;
          }
        }

        @Override public void close() {
          outerEnumerator.close();
          closeInner();
          outerValue = null;
          innerValue = null;
        }
      };
    }
  };
}