private static long mergeSortedBatches()

in baremaps-core/src/main/java/org/apache/baremaps/database/algorithm/ExternalMergeSort.java [66:133]


  /**
   * Merges several batches into a single output list using a k-way merge driven by a priority
   * queue.
   *
   * <p>Each non-empty batch is wrapped in a {@code DataStack}; the queue orders those stacks by
   * the element each one currently exposes through {@code peek()}, so the stack polled from the
   * queue always holds the smallest remaining element. The batches are expected to be already
   * sorted according to {@code comparator}; this is not verified here.
   *
   * <p>Once the merge is complete, {@code clear()} is called on every batch.
   *
   * @param batches the batches to merge; may be empty
   * @param output the list receiving the merged elements through {@code addIndexed}
   * @param comparator the ordering used to pick the next element and to detect duplicates
   * @param distinct when {@code true}, an element that compares equal to the previously written
   *        element is not written to {@code output}
   * @param <T> the element type
   * @return the number of elements read from the batches; in distinct mode this count includes
   *         the duplicates that were skipped
   * @throws IOException declared by the signature; presumably raised by the underlying data
   *         structures (not visible from this method)
   */
  private static <T> long mergeSortedBatches(
      List<DataList<T>> batches,
      DataList<T> output,
      Comparator<T> comparator,
      boolean distinct) throws IOException {

    // PriorityQueue rejects an initial capacity below 1, so clamp it to keep an empty list of
    // batches from failing with an IllegalArgumentException.
    PriorityQueue<DataStack<T>> queue = new PriorityQueue<>(
        Math.max(1, batches.size()),
        (i, j) -> comparator.compare(i.peek(), j.peek()));

    for (DataList<T> input : batches) {
      if (input.sizeAsLong() == 0) {
        continue;
      }
      DataStack<T> stack = new DataStack<>(input);
      if (!stack.empty()) {
        queue.add(stack);
      }
    }

    long counter = 0;
    T last = null;
    // The very first element is always written; there is nothing to compare it against yet.
    boolean first = true;
    while (!queue.isEmpty()) {
      DataStack<T> stack = queue.poll();
      T value = stack.pop();

      // In distinct mode, skip values equal to the last one written. The short-circuit order
      // ensures the comparator is only consulted in distinct mode and never for the first element.
      if (!distinct || first || comparator.compare(value, last) != 0) {
        output.addIndexed(value);
        last = value;
        first = false;
      }
      ++counter;

      if (stack.empty()) {
        stack.close();
      } else {
        // Re-insert so the queue re-orders the stack by its new head element.
        queue.add(stack);
      }
    }

    for (DataList<T> batch : batches) {
      batch.clear();
    }

    return counter;
  }