private static long mergeSortedBatches()

in baremaps-data/src/main/java/org/apache/baremaps/data/algorithm/ExternalMergeSort.java [73:140]


  /**
   * Merges the given batches into {@code output} using a priority queue keyed on the head element
   * of each batch.
   *
   * <p>
   * Each non-empty batch is wrapped in a {@link DataStack}; the stack whose head compares lowest is
   * polled, its head is popped and appended to {@code output}, and the stack is re-queued until it
   * is exhausted, at which point it is closed. The batches are presumed to be individually sorted
   * according to {@code comparator}; this method does not verify it.
   *
   * <p>
   * When {@code distinct} is {@code true}, a value that compares equal (comparator result
   * {@code 0}) to the previously written value is not written again. Such skipped values are
   * still included in the returned count.
   *
   * <p>
   * Every batch is cleared once the merge has completed.
   *
   * @param batches the batches to merge; may be empty
   * @param output the list receiving the merged values
   * @param comparator the ordering used to pick the next value and to detect duplicates
   * @param distinct whether consecutive values that compare equal are written only once
   * @param <T> the type of the merged values
   * @return the number of values popped from the batches, including skipped duplicates
   */
  private static <T> long mergeSortedBatches(
      List<DataList<T>> batches,
      DataList<T> output,
      Comparator<T> comparator,
      boolean distinct) {

    // PriorityQueue rejects an initial capacity below 1, so an empty list of batches must still
    // request a capacity of at least 1 instead of failing with an IllegalArgumentException.
    PriorityQueue<DataStack<T>> queue =
        new PriorityQueue<>(
            Math.max(1, batches.size()),
            (i, j) -> comparator.compare(i.peek(), j.peek()));

    for (DataList<T> input : batches) {
      if (input.isEmpty()) {
        continue;
      }
      DataStack<T> stack = new DataStack<>(input);
      if (!stack.empty()) {
        queue.add(stack);
      }
    }

    long counter = 0;
    T last = null;
    // Tracks whether anything has been written yet, so the first value is never compared against
    // the null placeholder held by 'last'.
    boolean written = false;
    while (!queue.isEmpty()) {
      DataStack<T> stack = queue.poll();
      T value = stack.pop();
      // In distinct mode, skip values equal to the last one written.
      if (!distinct || !written || comparator.compare(value, last) != 0) {
        output.addIndexed(value);
        last = value;
        written = true;
      }
      // Counts every popped value, whether or not it was written.
      ++counter;
      if (stack.empty()) {
        stack.close();
      } else {
        queue.add(stack); // add it back so its next head takes part in the ordering
      }
    }

    for (DataList<T> batch : batches) {
      batch.clear();
    }

    return counter;
  }