in processing/src/main/java/org/apache/druid/extendedset/intset/ImmutableConciseSet.java [342:489]
private static ImmutableConciseSet doUnion(Iterator<ImmutableConciseSet> sets)
{
IntList retVal = new IntList();
// Use PriorityQueue, because sometimes as much as 20k of bitsets are unified, and the asymptotic complexity of
// keeping bitsets in a sorted array (n^2), as in doIntersection(), becomes more important factor than PriorityQueue
// inefficiency.
// Need to specify initial capacity because JDK 7 doesn't have Comparator-only constructor of PriorityQueue
PriorityQueue<WordIterator> theQ = new PriorityQueue<>(11, UNION_COMPARATOR);
// populate priority queue
while (sets.hasNext()) {
ImmutableConciseSet set = sets.next();
if (set != null && !set.isEmpty()) {
WordIterator itr = set.newWordIterator();
itr.word = itr.next();
theQ.add(itr);
}
}
int currIndex = 0;
List<WordIterator> changedIterators = new ArrayList<>();
while (!theQ.isEmpty()) {
// grab the top element from the priority queue
WordIterator itr = theQ.poll();
int word = itr.getWord();
// if the next word in the queue starts at a different point than where we ended off we need to create a zero gap
// to fill the space
if (currIndex < itr.startIndex) {
addAndCompact(retVal, itr.startIndex - currIndex - 1);
currIndex = itr.startIndex;
}
if (ConciseSetUtils.isLiteral(word)) {
// advance all other literals
while (!theQ.isEmpty() && theQ.peek().startIndex == itr.startIndex) {
WordIterator i = theQ.poll();
int w = i.getWord();
// if we still have zero fills with flipped bits, OR them here
if (ConciseSetUtils.isLiteral(w)) {
word |= w;
} else {
int flipBitLiteral = ConciseSetUtils.getLiteralFromZeroSeqFlipBit(w);
if (flipBitLiteral != ConciseSetUtils.ALL_ZEROS_LITERAL) {
word |= flipBitLiteral;
i.advanceTo(itr.wordsWalked);
}
}
if (i.hasNext()) {
i.word = i.next();
changedIterators.add(i);
}
}
// advance the set with the current literal forward and push result back to priority queue
addAndCompact(retVal, word);
currIndex++;
if (itr.hasNext()) {
itr.word = itr.next();
changedIterators.add(itr);
}
} else if (ConciseSetUtils.isZeroSequence(word)) {
int flipBitLiteral;
while (!theQ.isEmpty() && theQ.peek().startIndex == itr.startIndex) {
WordIterator i = theQ.poll();
int w = i.getWord();
flipBitLiteral = ConciseSetUtils.getLiteralFromZeroSeqFlipBit(w);
if (flipBitLiteral != ConciseSetUtils.ALL_ZEROS_LITERAL) {
i.word = flipBitLiteral;
changedIterators.add(i);
} else if (i.hasNext()) {
i.word = i.next();
changedIterators.add(i);
}
}
// check if a literal needs to be created from the flipped bits of this sequence
flipBitLiteral = ConciseSetUtils.getLiteralFromZeroSeqFlipBit(word);
if (flipBitLiteral != ConciseSetUtils.ALL_ZEROS_LITERAL) {
itr.word = flipBitLiteral;
changedIterators.add(itr);
} else if (itr.hasNext()) {
itr.word = itr.next();
changedIterators.add(itr);
}
} else {
assert ConciseSetUtils.isOneSequence(word);
// extract a literal from the flip bits of the one sequence
int flipBitLiteral = ConciseSetUtils.getLiteralFromOneSeqFlipBit(word);
// advance everything past the longest ones sequence
while (!theQ.isEmpty() && theQ.peek().startIndex < itr.wordsWalked) {
WordIterator i = theQ.poll();
int w = i.getWord();
if (i.startIndex == itr.startIndex) {
// if a literal was created from a flip bit, OR it with other literals or literals from flip bits in the same
// position
if (ConciseSetUtils.isLiteral(w)) {
flipBitLiteral |= w;
} else if (ConciseSetUtils.isZeroSequence(w)) {
flipBitLiteral |= ConciseSetUtils.getLiteralFromZeroSeqFlipBit(w);
} else {
assert ConciseSetUtils.isOneSequence(w);
flipBitLiteral |= ConciseSetUtils.getLiteralFromOneSeqFlipBit(w);
}
}
i.advanceTo(itr.wordsWalked);
if (i.hasNext()) {
i.word = i.next();
changedIterators.add(i);
}
}
// advance longest one literal forward and push result back to priority queue
// if a flip bit is still needed, put it in the correct position
int newWord = word & 0xC1FFFFFF;
if (flipBitLiteral != ConciseSetUtils.ALL_ONES_LITERAL) {
flipBitLiteral ^= ConciseSetUtils.ALL_ONES_LITERAL;
int position = Integer.numberOfTrailingZeros(flipBitLiteral) + 1;
newWord |= (position << 25);
}
addAndCompact(retVal, newWord);
currIndex = itr.wordsWalked;
if (itr.hasNext()) {
itr.word = itr.next();
changedIterators.add(itr);
}
}
theQ.addAll(changedIterators);
changedIterators.clear();
}
if (retVal.isEmpty()) {
return new ImmutableConciseSet();
}
return new ImmutableConciseSet(IntBuffer.wrap(retVal.toArray()));
}