in atlas-core/src/main/scala/com/netflix/atlas/core/index/BatchUpdateTagIndex.scala [81:132]
/**
  * Rebuild the index from the non-expired items of the current index plus the pending
  * updates, keeping only items whose tags match `filter`. The whole operation is timed
  * with `rebuildTimer`.
  *
  * @param filter
  *     Query evaluated against the tags of each item. Only items that match are passed
  *     on to the rebuilt index. Defaults to `Query.True`.
  * @return
  *     Non-expired items from the current index whose tags do not match `filter`. These
  *     are left out of the rebuilt index.
  */
def rebuildIndex(filter: Query = Query.True): List[T] = {
  val timerId = rebuildTimer.start()
  try {
    // Capture the queue size once; it is used both as the limit for the drain and as
    // the initial capacity of the collections below.
    val pendingCount = pendingUpdates.size
    val drained = new java.util.ArrayList[T](pendingCount)
    pendingUpdates.drainTo(drained, pendingCount)

    // Key the matching updates by id so that each id shows up at most once. If the same
    // id was queued more than once, the entry drained last replaces the earlier ones.
    val fresh = new java.util.HashMap[ItemId, T](pendingCount)
    val pending = drained.iterator()
    while (pending.hasNext) {
      val item = pending.next()
      if (filter.matches(item.tags))
        fresh.put(item.id, item)
    }

    // Everything in the current index that has not expired. The filter is not applied
    // yet, that happens in the partition step below.
    val live = currentIndex.get
      .findItems(TagQuery(None))
      .filterNot(_.isExpired)

    // Ids that are already indexed do not count as new, drop them from the updates
    live.foreach(item => fresh.remove(item.id))

    // Separate the indexed items to carry forward from the ones being filtered out
    val (kept, dropped) = live.partition(item => filter.matches(item.tags))

    // Merge the carried forward items with the new ones. The items from the index are
    // relied on to already be ordered by id, so only the new items get sorted here.
    val indexed = kept.toArray
    val added = fresh.values.toArray(new Array[T](0))
    java.util.Arrays.sort(added, RoaringTagIndex.IdComparator)
    val merged = new Array[T](indexed.length + added.length)
    ArrayHelper.merge[T](
      RoaringTagIndex.IdComparator.asInstanceOf[Comparator[T]],
      // NOTE(review): presumably invoked for entries that compare as equal, in which
      // case the entry from the first array is retained -- confirm against ArrayHelper
      (first, _) => first,
      indexed,
      indexed.length,
      added,
      added.length,
      merged
    )

    // Build the index from the merged array
    rebuildIndex(merged)

    // Hand back the items that were excluded by the filter
    dropped
  } finally {
    rebuildTimer.stop(timerId)
  }
}