def addorUpdateBatch()

in core/src/main/scala/org/apache/spark/eventhubs/PartitionsStatusTracker.scala [51:85]


  /**
   * Registers the batch `batchId` and its per-partition offset ranges in the tracker.
   *
   * - If `batchId` is already tracked, an error is logged and the existing entry is
   *   then overwritten with a freshly built one.
   * - Otherwise the batch with id `batchId - PartitionsStatusTracker.TrackingBatchCount`
   *   is removed via `removeBatch`, provided that id is non-negative.
   *
   * A partition whose range has `fromSeqNo == untilSeqNo` is treated as a zero-size
   * batch partition: it is flagged as such in its `PartitionStatus` and no
   * partition-seqNo to batchId mapping is added for it.
   *
   * @param batchId      local batch id under which the status is stored
   * @param offsetRanges offset ranges of the batch, one entry per partition
   */
  def addorUpdateBatch(batchId: Long, offsetRanges: Array[OffsetRange]): Unit = {
    if (batchesStatusList.contains(batchId)) {
      // Batches are not supposed to be updated. Log an error if a batch is being updated.
      // NOTE(review): the entry is still overwritten below, and partition-seqNo mappings
      // added for the previous version of this batch are not cleared in this method —
      // confirm that this is intended.
      logError(
        s"Batch with local batch id: $batchId already exists in the partition status tracker. " +
          "Batches are not supposed to be updated in the partition status tracker.")
    } else {
      // Remove the oldest batch from batchesStatusList to release space for the new batch.
      val batchIdToRemove = batchId - PartitionsStatusTracker.TrackingBatchCount
      // Render the id as a String (or "None") so the interpolated value is not widened to Any.
      val removedLabel = if (batchIdToRemove >= 0) batchIdToRemove.toString else "None"
      logDebug(s"Remove the batch $removedLabel from the tracker.")
      if (batchIdToRemove >= 0) {
        removeBatch(batchIdToRemove)
      }
    }

    // Find partitions with a zero size batch. No performance metric msg will be received for
    // those partitions.
    val isZeroSizeBatchPartition: Map[NameAndPartition, Boolean] =
      offsetRanges.map(range => (range.nameAndPartition, range.fromSeqNo == range.untilSeqNo))(
        breakOut)

    // Create the batch status tracker and add it to the map (replaces any existing entry).
    batchesStatusList(batchId) = new BatchStatus(batchId, offsetRanges.map(range => {
      val np = range.nameAndPartition
      (np, new PartitionStatus(np, range.fromSeqNo, isZeroSizeBatchPartition(np)))
    })(breakOut))

    // Add the mapping from partition-startSeqNo pair to the batchId, ignoring partitions with
    // a zero batch size.
    offsetRanges
      .filter(r => !isZeroSizeBatchPartition(r.nameAndPartition))
      .foreach(range => {
        val key = partitionSeqNoKey(range.nameAndPartition, range.fromSeqNo)
        addPartitionSeqNoToBatchIdMapping(key, batchId)
      })
  }