in baremaps-core/src/main/java/org/apache/baremaps/database/algorithm/ExternalMergeSort.java [66:133]
/**
 * Merges the given batches into {@code output} with a k-way merge driven by a priority queue.
 *
 * <p>
 * Each non-empty batch is wrapped in a {@code DataStack}; the queue always exposes the stack whose
 * head element is smallest according to {@code comparator}. The batches are expected to already be
 * sorted with that same comparator (the merge itself does not re-sort them).
 *
 * <p>
 * Once the merge is complete, every batch in {@code batches} is cleared.
 *
 * @param batches the input batches to merge; may be empty
 * @param output the list receiving the merged elements through {@code addIndexed}
 * @param comparator the ordering used to pick the next element and, when {@code distinct} is set,
 *        to detect duplicates (two elements are duplicates when it returns {@code 0})
 * @param distinct when {@code true}, an element comparing equal to the last written element is
 *        not written to {@code output}
 * @param <T> the element type
 * @return the number of elements consumed from the batches; skipped duplicates are included in
 *         this count
 * @throws IOException if an I/O error occurs while reading or closing a batch
 */
private static <T> long mergeSortedBatches(
    List<DataList<T>> batches,
    DataList<T> output,
    Comparator<T> comparator,
    boolean distinct) throws IOException {
  // PriorityQueue rejects an initial capacity below 1, so clamp it for an empty batch list.
  PriorityQueue<DataStack<T>> queue = new PriorityQueue<>(
      Math.max(1, batches.size()),
      (i, j) -> comparator.compare(i.peek(), j.peek()));
  for (DataList<T> input : batches) {
    if (input.sizeAsLong() == 0) {
      continue;
    }
    DataStack<T> stack = new DataStack<>(input);
    if (!stack.empty()) {
      queue.add(stack);
    }
  }

  long counter = 0;
  // Tracks the last element written; 'hasLast' distinguishes "nothing written yet" from a
  // written element, so the very first element is always emitted without a comparison.
  T last = null;
  boolean hasLast = false;
  while (!queue.isEmpty()) {
    DataStack<T> stack = queue.poll();
    T value = stack.pop();
    // In distinct mode, skip elements that compare equal to the previously written one.
    if (!distinct || !hasLast || comparator.compare(value, last) != 0) {
      output.addIndexed(value);
      last = value;
      hasLast = true;
    }
    // Every consumed element is counted, including skipped duplicates.
    ++counter;
    if (stack.empty()) {
      stack.close();
    } else {
      // The head of this stack changed, so re-insert it to restore the queue ordering.
      queue.add(stack);
    }
  }

  for (DataList<T> batch : batches) {
    batch.clear();
  }
  return counter;
}