in core/src/main/scala/org/apache/spark/eventhubs/PartitionsStatusTracker.scala [94:111]
/**
 * Removes the batch registered under `batchId` from the batch status tracker.
 *
 * For each partition of the batch whose status is not flagged as `emptyBatch`, the
 * partition/request-sequence-number key is built with `partitionSeqNoKey` and handed to
 * `removePartitionSeqNoToBatchIdMapping`. The batch entry itself is removed from
 * `batchesStatusList` only after those mappings have been processed.
 *
 * If no batch is registered under `batchId`, an info message is logged and nothing
 * is modified.
 *
 * @param batchId local id of the batch to remove
 */
private def removeBatch(batchId: Long): Unit =
  // Single lookup: avoids the contains-then-apply double access and the early `return`.
  batchesStatusList.get(batchId) match {
    case None =>
      logInfo(
        s"Batch with local batchId = $batchId doesn't exist in the batch status tracker, so it can't be removed.")
    case Some(batchStatus) =>
      // Remove the mapping from partition-seqNo pair to the batchId
      // (partitions with an empty batch are skipped).
      batchStatus.paritionsStatusList
        .filter { case (_, status) => !status.emptyBatch }
        .values
        .foreach { status =>
          val key = partitionSeqNoKey(status.nAndP, status.requestSeqNo)
          removePartitionSeqNoToBatchIdMapping(key)
        }
      // Finally remove the batch status entry itself from the tracker map.
      batchesStatusList.remove(batchId)
  }