public static Enumerable asofJoin()

in linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java [878:1048]


  /**
   * Implements an ASOF join: for every element of {@code outer} picks at most
   * one "best" element of {@code inner} that has an equal key and satisfies
   * {@code matchComparator}.
   *
   * <p>Both inputs are fully consumed (and their enumerators closed) when this
   * method is called; the returned enumerable only iterates over the
   * in-memory index that was built. Keys are stored in a {@link HashMap}, so
   * they are compared using {@code hashCode}/{@code equals}, and groups of
   * outer elements are emitted in the map's iteration order. Within a group,
   * outer elements keep their arrival order.
   *
   * @param outer Left input
   * @param inner Right input
   * @param outerKeySelector Extracts the key of a left element; a null result
   *     means the element cannot match any right element
   * @param innerKeySelector Extracts the key of a right element; right
   *     elements whose key is null are ignored
   * @param resultSelector Builds a result from a left element and its best
   *     right element (null if there is none)
   * @param matchComparator Returns whether a right element is a candidate
   *     for a given left element
   * @param timestampComparator Compares two candidate right elements; a new
   *     candidate {@code r} replaces the current best {@code b} when
   *     {@code compare(b, r) < 0}
   * @param emitNullsOnRight If true, left elements without a match (including
   *     those with a null key) are emitted with a null right element; if
   *     false, they are dropped
   * @return Enumerable over the joined results
   */
  public static <TResult, TSource, TInner, TKey> Enumerable<TResult> asofJoin(
      Enumerable<TSource> outer, Enumerable<TInner> inner,
      Function1<TSource, TKey> outerKeySelector,
      Function1<TInner, TKey> innerKeySelector,
      Function2<TSource, @Nullable TInner, TResult> resultSelector,
      Predicate2<TSource, TInner> matchComparator,
      Comparator<TInner> timestampComparator,
      boolean emitNullsOnRight) {

    // The basic algorithm is simple:
    // - scan and index left collection by key
    // - for each left record keep the best right record, initialized to 'null'
    // - scan the right collection and for each record
    //   - match it against all left collection records with the same key
    //   - if the timestamp is closer, update the right record
    // - emit all items in the index
    //
    // 'leftIndex' and 'rightIndex' always have the same key set, and for each
    // key the two lists have the same length: rightIndex.get(k).get(i) is the
    // best match found so far for leftIndex.get(k).get(i).
    Map<TKey, List<TSource>> leftIndex = new HashMap<>();
    // For each left element the corresponding best right element
    Map<TKey, List<@Nullable TInner>> rightIndex = new HashMap<>();
    // Outer elements that have null keys.  Will remain empty if !emitNullsOnRight.
    List<TSource> outerWithNullKeys = new ArrayList<>();
    try (Enumerator<TSource> os = outer.enumerator()) {
      while (os.moveNext()) {
        TSource l = os.current();
        TKey key = outerKeySelector.apply(l);
        if (key == null) {
          // key contains null fields (result of key selector is null)
          if (emitNullsOnRight) {
            outerWithNullKeys.add(l);
          }
          continue;
        }
        // Single lookup per map; both lists are created together so that
        // the two indexes stay aligned.
        List<TSource> left = leftIndex.get(key);
        List<@Nullable TInner> right = rightIndex.get(key);
        if (left == null || right == null) {
          left = new ArrayList<>();
          right = new ArrayList<>();
          leftIndex.put(key, left);
          rightIndex.put(key, right);
        }
        left.add(l);
        right.add(null);
      }
    }
    // Scan right collection
    try (Enumerator<TInner> is = inner.enumerator()) {
      while (is.moveNext()) {
        TInner r = is.current();
        TKey key = innerKeySelector.apply(r);
        if (key == null) {
          // key contains null fields (result of key selector is null)
          continue;
        }
        List<TSource> left = leftIndex.get(key);
        if (left == null) {
          // No left element has this key; the right element is irrelevant
          continue;
        }
        assert !left.isEmpty();
        List<@Nullable TInner> best = requireNonNull(rightIndex.get(key));
        assert left.size() == best.size();
        for (int i = 0; i < left.size(); i++) {
          TSource leftElement = left.get(i);
          if (!matchComparator.apply(leftElement, r)) {
            continue;
          }
          @Nullable TInner bestElement = best.get(i);
          // The first candidate always wins; later candidates win only if
          // the comparator ranks them after the current best.
          if (bestElement == null
              || timestampComparator.compare(bestElement, r) < 0) {
            best.set(i, r);
          }
        }
      }
    }

    return new AbstractEnumerable<TResult>() {
      @Override public Enumerator<TResult> enumerator() {
        return new Enumerator<TResult>() {
          final Enumerator<Map.Entry<TKey, List<TSource>>> enumerator =
              new Linq4j.IterableEnumerator<>(leftIndex.entrySet());

          boolean emittingNullKeys = false;  // True when we emit the records with null keys
          @Nullable Enumerator<TSource> left = null;  // Iterates over values with same key
          @Nullable Enumerator<@Nullable TInner> right = null;
          final Enumerator<TSource> leftNullKeys =    // not used for inner ASOF joins
              new Linq4j.IterableEnumerator<>(outerWithNullKeys);

          // This is a small state machine
          // if (emittingNullKeys) {
          //    we are iterating over 'outerWithNullKeys' using 'leftNullKeys'
          // } else {
          //    we are iterating over the 'leftIndex' using 'enumerator'
          //    for each value of the key we iterate advancing
          //        concurrently using 'left' and 'right'
          //    when finished set emittingNullKeys = true
          // }

          @Override public TResult current() {
            if (emittingNullKeys) {
              TSource l = leftNullKeys.current();
              return resultSelector.apply(l, null);
            }

            TSource l = requireNonNull(left, "left").current();
            @Nullable TInner r = requireNonNull(right, "right").current();
            return resultSelector.apply(l, r);
          }

          @Override public boolean moveNext() {
            while (true) {
              if (emittingNullKeys) {
                return leftNullKeys.moveNext();
              }
              boolean hasNext = false;
              if (left != null) {
                // Advance left, right in lock-step
                hasNext = left.moveNext();
                boolean rightHasNext =
                    requireNonNull(right, "right").moveNext();
                assert hasNext == rightHasNext;
              }
              if (hasNext) {
                if (!emitNullsOnRight
                    && requireNonNull(right, "right").current() == null) {
                  // Inner join: skip left elements that found no match
                  continue;
                }
                return true;
              }
              // Current group is exhausted (or not started): advance to
              // the next key
              if (enumerator.moveNext()) {
                Map.Entry<TKey, List<TSource>> current = enumerator.current();
                TKey key = current.getKey();
                List<TSource> value = current.getValue();
                left = new Linq4j.IterableEnumerator<>(value);
                List<@Nullable TInner> rightList =
                    requireNonNull(rightIndex.get(key));
                right = new Linq4j.IterableEnumerator<>(rightList);
              } else {
                // Done with the data, start emitting records with null keys
                emittingNullKeys = true;
              }
            }
          }

          @Override public void reset() {
            // Restore the complete initial state, including the null-key
            // phase; otherwise a second pass would skip all keyed groups.
            enumerator.reset();
            leftNullKeys.reset();
            emittingNullKeys = false;
            left = null;
            right = null;
          }

          @Override public void close() {
            enumerator.close();
            leftNullKeys.close();
            left = null;
            right = null;
          }
        };
      }
    };
  }