in src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt [71:144]
/**
 * Serves a [GetChangesRequest] for the shard [shardId] asynchronously and completes [listener] with the
 * operations from `request.fromSeqNo` up to `min(lastSyncedGlobalCheckpoint, request.toSeqNo)`.
 *
 * The work is launched on the [REPLICATION_EXECUTOR_NAME_LEADER] executor. When nothing at or beyond
 * `fromSeqNo` has been synced yet, it long-polls the global checkpoint (bounded by [WAIT_FOR_NEW_OPS_TIMEOUT]).
 * If the checkpoint advanced but is still not synced, an [OpenSearchTimeoutException] is thrown inside
 * `completeWith` so the caller can retry (presumably surfaced to [listener] as a failure — confirm against
 * `completeWith`).
 *
 * Operations are read from the translog when [isTranslogPruningByRetentionLeaseEnabled] returns true,
 * falling back to a Lucene changes snapshot when that is disabled or the translog read throws. Fetch
 * latency, operation counts, bytes read and translog size are recorded in the shard's [RemoteShardMetric].
 */
override fun asyncShardOperation(request: GetChangesRequest, shardId: ShardId, listener: ActionListener<GetChangesResponse>) {
    GlobalScope.launch(threadPool.coroutineContext(REPLICATION_EXECUTOR_NAME_LEADER)) {
        // TODO: Figure out if we need to acquire a primary permit here
        listener.completeWith {
            var relativeStartNanos = System.nanoTime()
            // Look up or register the shard's metric in a single call. A separate put-then-get with `!!`
            // lets concurrent requests for the same shard replace each other's metric object (losing counts)
            // and would throw if the entry were removed between the put and the get.
            val indexMetric = remoteStatsService.stats.computeIfAbsent(shardId) { RemoteShardMetric() }
            indexMetric.lastFetchTime.set(relativeStartNanos)
            val indexShard = indicesService.indexServiceSafe(shardId.index).getShard(shardId.id)
            if (indexShard.lastSyncedGlobalCheckpoint < request.fromSeqNo) {
                // There are no new operations to sync. Do a long poll and wait for the global checkpoint to
                // advance. If it doesn't advance within the timeout, waitForGlobalCheckpoint is expected to
                // throw a timeout exception, which the caller should catch before starting a new poll.
                val gcp = indexShard.waitForGlobalCheckpoint(request.fromSeqNo, WAIT_FOR_NEW_OPS_TIMEOUT)
                // At this point the known global checkpoint has advanced, but it may not yet have been synced
                // to the translog, which means we can't return those changes. Return to the caller to retry.
                // TODO: Figure out a better way to wait for the global checkpoint to be synced to the translog
                // Read the synced checkpoint once so the check and the assertion observe the same value.
                val syncedCheckpoint = indexShard.lastSyncedGlobalCheckpoint
                if (syncedCheckpoint < request.fromSeqNo) {
                    assert(gcp > syncedCheckpoint) { "Checkpoint didn't advance at all" }
                    throw OpenSearchTimeoutException("global checkpoint not synced. Retry after a few miliseconds...")
                }
            }
            relativeStartNanos = System.nanoTime()
            // At this point lastSyncedGlobalCheckpoint is at least fromSeqNo
            val toSeqNo = min(indexShard.lastSyncedGlobalCheckpoint, request.toSeqNo)

            // null means the translog was not used: either translog fetching is disabled or the read failed.
            val translogOps: List<Translog.Operation>? =
                if (isTranslogPruningByRetentionLeaseEnabled(shardId)) {
                    try {
                        translogService.getHistoryOfOperations(indexShard, request.fromSeqNo, toSeqNo)
                    } catch (e: Exception) {
                        log.debug("Fetching changes from translog for ${request.shardId} " +
                                "- from:${request.fromSeqNo}, to:$toSeqNo failed with exception - ${e.stackTraceToString()}")
                        null
                    }
                } else {
                    null
                }
            val fetchFromTranslog = translogOps != null
            val ops: List<Translog.Operation> = translogOps ?: run {
                // Translog fetch is disabled or failed: read from Lucene, timing only this read.
                log.debug("Fetching changes from lucene for ${request.shardId} - from:${request.fromSeqNo}, to:$toSeqNo")
                relativeStartNanos = System.nanoTime()
                indexShard.newChangesSnapshot("odr", request.fromSeqNo, toSeqNo, true).use { snapshot ->
                    // snapshot.next() returns null once exhausted, which terminates the sequence; the collection
                    // is materialized eagerly while the snapshot is still open.
                    generateSequence { snapshot.next() }
                        .toCollection(ArrayList<Translog.Operation>(snapshot.totalOperations()))
                }
            }

            val tookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - relativeStartNanos)
            if (fetchFromTranslog) {
                indexMetric.latencyTlog.addAndGet(tookInMillis)
                indexMetric.opsTlog.addAndGet(ops.size.toLong())
            } else {
                indexMetric.latencyLucene.addAndGet(tookInMillis)
                indexMetric.opsLucene.addAndGet(ops.size.toLong())
            }
            indexMetric.tlogSize.set(indexShard.translogStats().translogSizeInBytes)
            indexMetric.ops.addAndGet(ops.size.toLong())
            // One atomic update for the whole batch instead of one per operation.
            indexMetric.bytesRead.addAndGet(ops.sumOf { it.estimateSize() })
            GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, indexShard.lastSyncedGlobalCheckpoint)
        }
    }
}