in core/src/main/scala/org/apache/spark/eventhubs/PartitionsStatusTracker.scala [51:85]
def addorUpdateBatch(batchId: Long, offsetRanges: Array[OffsetRange]): Unit = {
  if (batchesStatusList.contains(batchId)) {
    // Batches are not supposed to be updated. Log an error if a batch is being updated.
    logError(
      s"Batch with local batch id: $batchId already exists in the partition status tracker. " +
        s"Batches are not supposed to be updated in the partition status tracker.")
  } else {
    // Remove the oldest batch from batchesStatusList to release space for the new batch.
    val batchIdToRemove = batchId - PartitionsStatusTracker.TrackingBatchCount
    if (batchIdToRemove >= 0) {
      logDebug(s"Remove the batch $batchIdToRemove from the tracker.")
      removeBatch(batchIdToRemove)
    }
  }
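  // Only the most recent PartitionsStatusTracker.TrackingBatchCount batches stay tracked.
  // For example (illustrative value only), if TrackingBatchCount were 3, adding batch 5 would
  // evict batch 2, leaving batches 3, 4 and 5 in batchesStatusList.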
  // Find partitions with a zero-size batch. No performance metric message will be received for
  // those partitions.
  val isZeroSizeBatchPartition: Map[NameAndPartition, Boolean] =
    offsetRanges.map(range => (range.nameAndPartition, range.fromSeqNo == range.untilSeqNo))(
      breakOut)
  // Create the batch status tracker for this batch and add it to the map.
  batchesStatusList(batchId) = new BatchStatus(batchId, offsetRanges.map(range => {
    val np = range.nameAndPartition
    (np, new PartitionStatus(np, range.fromSeqNo, isZeroSizeBatchPartition(np)))
  })(breakOut))
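  // Note: breakOut (from scala.collection) lets the two mappings above build their target Map
  // directly while mapping, instead of materializing an intermediate collection first.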
  // Add the mapping from each (partition, startSeqNo) pair to the batchId, ignoring partitions
  // with a zero-size batch.
  offsetRanges
    .filter(r => !isZeroSizeBatchPartition(r.nameAndPartition))
    .foreach(range => {
      val key = partitionSeqNoKey(range.nameAndPartition, range.fromSeqNo)
      addPartitionSeqNoToBatchIdMapping(key, batchId)
    })
}
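// Illustrative usage (a sketch, not part of the tracker itself): the names `tracker`,
// `localBatchId` and `offsetRangesForThisBatch` below are assumptions for the example; the real
// offset ranges come from the planned micro-batch.
//
//   tracker.addorUpdateBatch(localBatchId, offsetRangesForThisBatch)
//
// Any range with fromSeqNo == untilSeqNo is treated as zero-size: it still gets a PartitionStatus
// entry in the BatchStatus, but no (partition, startSeqNo) -> batchId mapping is added for it,
// since no performance metric message is expected for that partition in this batch.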