in src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationChangesTracker.kt [83:115]
/**
 * Records the outcome of one batch fetch and adjusts the tracker's bookkeeping.
 *
 * Successful fetch ([success] is `true`):
 * - If fewer operations arrived than were requested ([toSeqNoReceived] < [toSeqNoRequested]):
 *   - when `seqNoAlreadyRequested` still equals [toSeqNoRequested] (i.e. no later batch has been
 *     requested since), it is rolled back to [toSeqNoReceived];
 *   - otherwise the unreceived tail `toSeqNoReceived + 1 .. toSeqNoRequested` is added to
 *     `missingBatches`.
 * - `observedSeqNoAtLeader` is raised to [seqNoAtLeader] if that is larger than the stored value.
 *
 * Failed fetch ([success] is `false`):
 * - when `seqNoAlreadyRequested` still equals [toSeqNoRequested], it is rolled back to
 *   `fromSeqNoRequested - 1`;
 * - otherwise the whole range `fromSeqNoRequested .. toSeqNoRequested` is added to `missingBatches`.
 *
 * The "still equals, then roll back" step is done with a single `compareAndSet` so that a
 * concurrent change to `seqNoAlreadyRequested` between the comparison and the write cannot be
 * overwritten; if the value has moved on, the range is recorded as missing instead.
 * NOTE(review): this assumes `seqNoAlreadyRequested` is a `java.util.concurrent.atomic.AtomicLong`
 * (its declaration is outside this excerpt) — confirm.
 *
 * @param success whether the fetch completed successfully
 * @param fromSeqNoRequested first sequence number of the requested range
 * @param toSeqNoRequested last sequence number of the requested range
 * @param toSeqNoReceived last sequence number actually received; only used when [success] is `true`
 * @param seqNoAtLeader sequence number reported for the leader; only used when [success] is `true`
 */
fun updateBatchFetched(success: Boolean, fromSeqNoRequested: Long, toSeqNoRequested: Long, toSeqNoReceived: Long, seqNoAtLeader: Long) {
    if (!success) {
        // Nothing was received. If this was the last batch requested, atomically rewind the
        // request pointer to just before it; otherwise later batches may already have been
        // requested, so the whole range has to be tracked as missing.
        if (!seqNoAlreadyRequested.compareAndSet(toSeqNoRequested, fromSeqNoRequested - 1)) {
            logDebug("Adding batch to missing $fromSeqNoRequested-$toSeqNoRequested")
            missingBatches.add(Pair(fromSeqNoRequested, toSeqNoRequested))
        }
        return
    }

    // We shouldn't ever be getting more operations than requested.
    assert(toSeqNoRequested >= toSeqNoReceived) { "${Thread.currentThread().getName()} Got more operations in the batch than requested" }
    logDebug("Updating the batch fetched. ${fromSeqNoRequested}-${toSeqNoReceived}/${toSeqNoRequested}, seqNoAtLeader:$seqNoAtLeader")

    // Partial batch: either rewind the request pointer (last batch requested) in one atomic step,
    // or remember the unreceived tail as a missing batch.
    val receivedPartialBatch = toSeqNoRequested > toSeqNoReceived
    if (receivedPartialBatch && !seqNoAlreadyRequested.compareAndSet(toSeqNoRequested, toSeqNoReceived)) {
        logDebug("Didn't get the complete batch. Adding the missing operations ${toSeqNoReceived + 1}-${toSeqNoRequested}")
        missingBatches.add(Pair(toSeqNoReceived + 1, toSeqNoRequested))
    }

    // Update the sequence number observed at leader; the stored value never decreases.
    observedSeqNoAtLeader.updateAndGet { current -> maxOf(current, seqNoAtLeader) }
    logDebug("observedSeqNoAtLeader: ${observedSeqNoAtLeader.get()}")
}