def rebuildIndex()

in atlas-core/src/main/scala/com/netflix/atlas/core/index/BatchUpdateTagIndex.scala [81:132]


  def rebuildIndex(filter: Query = Query.True): List[T] = {
    val timerId = rebuildTimer.start()
    try {
      // Drain the update queue and create map of items for deduping, we put new items in the
      // map first so that an older item, if present, will be preferred
      val size = pendingUpdates.size
      val updates = new java.util.ArrayList[T](size)
      pendingUpdates.drainTo(updates, size)
      val items = new java.util.HashMap[ItemId, T](size)
      updates.forEach { i =>
        if (filter.matches(i.tags))
          items.put(i.id, i)
      }

      // Get set of all items in the current index that are not expired and match the filter
      val currentItems = currentIndex.get
        .findItems(TagQuery(None))
        .filter(i => !i.isExpired)

      // Remove items that are already indexed from the set of new items
      currentItems.foreach { i =>
        items.remove(i.id)
      }

      // Find set of items matching and not matching the filter
      val (matches, nonMatches) = currentItems.partition(item => filter.matches(item.tags))

      // Merge previous with new updates. Previous matches are coming from index and
      // will already be sorted by the id.
      val a1 = matches.toArray
      val a2 = items.values.toArray(new Array[T](0))
      java.util.Arrays.sort(a2, RoaringTagIndex.IdComparator)
      val dst = new Array[T](a1.length + a2.length)
      ArrayHelper.merge[T](
        RoaringTagIndex.IdComparator.asInstanceOf[Comparator[T]],
        (a, _) => a,
        a1,
        a1.length,
        a2,
        a2.length,
        dst
      )

      // Create array of items and build the index
      rebuildIndex(dst)

      // Return set of items that have been filtered out
      nonMatches
    } finally {
      rebuildTimer.stop(timerId)
    }
  }