in core/src/main/scala/org/apache/spark/eventhubs/PartitionsStatusTracker.scala [150:171]
/**
 * Records a receive-performance measurement for a partition in the batch that
 * issued the request.
 *
 * The owning batch is resolved from the (partition, request sequence number) pair
 * via `getBatchIdForPartitionSeqNoPair`. When that lookup yields `BatchNotFound`
 * the update is logged and dropped; otherwise the measurement is forwarded to the
 * matching entry in `batchesStatusList`.
 *
 * @param nAndP               partition the measurement belongs to
 * @param requestSeqNo        sequence number of the request, used together with
 *                            `nAndP` to resolve the owning batch
 * @param batchSize           forwarded unchanged to the batch's own
 *                            `updatePartitionPerformance`
 * @param receiveTimeInMillis forwarded unchanged to the batch's own
 *                            `updatePartitionPerformance`
 * @throws IllegalStateException if a batch id was resolved for the pair but
 *                               `batchesStatusList` has no entry for it
 */
def updatePartitionPerformance(nAndP: NameAndPartition,
                               requestSeqNo: SequenceNumber,
                               batchSize: Int,
                               receiveTimeInMillis: Long): Unit = {
  val owningBatchId = getBatchIdForPartitionSeqNoPair(nAndP, requestSeqNo)

  if (owningBatchId == BatchNotFound) {
    // No mapping for this pair: treated as a message for an old batch and skipped.
    logInfo(
      s"Can't find the corresponding batchId for the partition-requestSeqNo pair ($nAndP, $requestSeqNo) " +
        s"in the partition status tracker. Assume the message is for an old batch, so ignore it.")
  } else if (batchesStatusList.contains(owningBatchId)) {
    // Hand the measurement to the batch that owns this partition request.
    batchesStatusList(owningBatchId)
      .updatePartitionPerformance(nAndP, batchSize, receiveTimeInMillis)
  } else {
    // A pair-to-batch mapping exists but the batch itself is missing, so the
    // tracker's add/remove bookkeeping is inconsistent.
    throw new IllegalStateException(
      s"Batch with local batch id = $owningBatchId doesn't exist in the partition status tracker, while mapping " +
        s"from a partition-seqNo to this batchId exists in the partition status tracker.")
  }
}