in linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java [878:1048]
public static <TResult, TSource, TInner, TKey> Enumerable<TResult> asofJoin(
    Enumerable<TSource> outer, Enumerable<TInner> inner,
    Function1<TSource, TKey> outerKeySelector,
    Function1<TInner, TKey> innerKeySelector,
    Function2<TSource, @Nullable TInner, TResult> resultSelector,
    Predicate2<TSource, TInner> matchComparator,
    Comparator<TInner> timestampComparator,
    boolean emitNullsOnRight) {
  // ASOF join: every element of 'outer' produces at most one result row,
  // pairing it with the "best" element of 'inner', where a candidate inner
  // element must
  //   - have the same non-null key ('outerKeySelector' vs
  //     'innerKeySelector'), and
  //   - be accepted by 'matchComparator' for that (outer, inner) pair.
  // Among the candidates, the greatest one according to
  // 'timestampComparator' wins. A candidate replaces the current best only
  // when it compares strictly greater, so on ties the first candidate seen
  // in 'inner' is kept.
  //
  // An outer element with no qualifying inner element, or whose key is
  // null, is passed to 'resultSelector' with a null inner element when
  // 'emitNullsOnRight' is true; otherwise it is dropped.
  //
  // Both inputs are consumed eagerly, when this method is called. The
  // returned Enumerable only replays the precomputed result: rows grouped by
  // key (in HashMap iteration order), followed by the outer rows with null
  // keys.
  //
  // The basic algorithm:
  // - scan and index the left collection by key
  // - for each left record keep the best right record, initialized to null
  // - scan the right collection and match each record against all left
  //   records with the same key, updating the best right record when the
  //   new one is closer
  // - emit all items in the index, then the left records with null keys

  // Left elements with a non-null key, grouped by key.
  final Map<TKey, List<TSource>> leftIndex = new HashMap<>();
  // Parallel to leftIndex: for each left element, the best right element
  // found so far (null if none). Both maps always have the same key set and
  // equally sized lists.
  final Map<TKey, List<@Nullable TInner>> rightIndex = new HashMap<>();
  // Outer elements that have null keys. Will remain empty if !emitNullsOnRight.
  final List<TSource> outerWithNullKeys = new ArrayList<>();
  try (Enumerator<TSource> os = outer.enumerator()) {
    while (os.moveNext()) {
      final TSource l = os.current();
      final TKey key = outerKeySelector.apply(l);
      if (key == null) {
        // Key contains null fields (result of key selector is null). Such a
        // row can never match, so it is only kept when nulls are emitted.
        if (emitNullsOnRight) {
          outerWithNullKeys.add(l);
        }
        continue;
      }
      leftIndex.computeIfAbsent(key, k -> new ArrayList<>()).add(l);
      rightIndex.computeIfAbsent(key, k -> new ArrayList<>()).add(null);
    }
  }

  // Scan right collection
  try (Enumerator<TInner> is = inner.enumerator()) {
    while (is.moveNext()) {
      final TInner r = is.current();
      final TKey key = innerKeySelector.apply(r);
      if (key == null) {
        // key contains null fields (result of key selector is null)
        continue;
      }
      final List<TSource> left = leftIndex.get(key);
      if (left == null) {
        // No left element has this key
        continue;
      }
      assert !left.isEmpty();
      final List<@Nullable TInner> best = requireNonNull(rightIndex.get(key));
      assert left.size() == best.size();
      for (int i = 0; i < left.size(); i++) {
        if (!matchComparator.apply(left.get(i), r)) {
          continue;
        }
        @Nullable TInner bestElement = best.get(i);
        // Replace only when r is strictly greater, so ties keep the first
        // matching right element.
        if (bestElement == null
            || timestampComparator.compare(bestElement, r) < 0) {
          best.set(i, r);
        }
      }
    }
  }

  return new AbstractEnumerable<TResult>() {
    @Override public Enumerator<TResult> enumerator() {
      return new Enumerator<TResult>() {
        // Iterates over the groups of leftIndex.
        final Enumerator<Map.Entry<TKey, List<TSource>>> enumerator =
            new Linq4j.IterableEnumerator<>(leftIndex.entrySet());
        // True when we emit the records with null keys
        boolean emittingNullKeys = false;
        // Iterates over the left values of the current key
        @Nullable Enumerator<TSource> left = null;
        // Iterates over the best right values of the current key, in lock
        // step with 'left'
        @Nullable Enumerator<@Nullable TInner> right = null;
        // Iterates over outerWithNullKeys; that list is empty for inner ASOF
        // joins, so this produces nothing in that case.
        final Enumerator<TSource> leftNullKeys =
            new Linq4j.IterableEnumerator<>(outerWithNullKeys);
        // This is a small state machine
        // if (emittingNullKeys) {
        //   we are iterating over 'outerWithNullKeys' using 'leftNullKeys'
        // } else {
        //   we are iterating over the 'leftIndex' using 'enumerator'
        //   for each value of the key we iterate advancing
        //   concurrently using 'left' and 'right'
        //   when finished set emittingNullKeys = true
        // }

        @Override public TResult current() {
          if (emittingNullKeys) {
            // Null-key outer rows never have a right-hand match.
            TSource l = leftNullKeys.current();
            return resultSelector.apply(l, null);
          }
          TSource l = requireNonNull(left, "left").current();
          @Nullable TInner r = requireNonNull(right, "right").current();
          return resultSelector.apply(l, r);
        }

        @Override public boolean moveNext() {
          while (!emittingNullKeys) {
            if (left != null) {
              // Advance left and right in lock step
              final boolean leftHasNext = left.moveNext();
              final boolean rightHasNext =
                  requireNonNull(right, "right").moveNext();
              assert leftHasNext == rightHasNext;
              if (leftHasNext) {
                // For inner ASOF joins, skip left rows with no match
                if (emitNullsOnRight
                    || requireNonNull(right, "right").current() != null) {
                  return true;
                }
                continue;
              }
            }
            // Current key exhausted (or not started): advance to next key
            if (enumerator.moveNext()) {
              final Map.Entry<TKey, List<TSource>> current =
                  enumerator.current();
              left = new Linq4j.IterableEnumerator<>(current.getValue());
              final List<@Nullable TInner> rightList =
                  requireNonNull(rightIndex.get(current.getKey()));
              right = new Linq4j.IterableEnumerator<>(rightList);
            } else {
              // Done with the data, start emitting records with null keys
              emittingNullKeys = true;
            }
          }
          return leftNullKeys.moveNext();
        }

        @Override public void reset() {
          // Return to the initial state, including the null-key phase;
          // otherwise a reset after a full pass would yield nothing.
          enumerator.reset();
          leftNullKeys.reset();
          emittingNullKeys = false;
          left = null;
          right = null;
        }

        @Override public void close() {
          enumerator.close();
          leftNullKeys.close();
          left = null;
          right = null;
        }
      };
    }
  };
}