override fun asyncShardOperation()

in src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt [71:144]


    /**
     * Serves a get-changes request for [shardId] and completes [listener] with the resulting
     * [GetChangesResponse].
     *
     * The work runs in a coroutine launched on the leader replication executor:
     * 1. If the shard's last synced global checkpoint is behind `request.fromSeqNo`, long-poll for the
     *    global checkpoint to advance (bounded by `WAIT_FOR_NEW_OPS_TIMEOUT`). If the checkpoint is still
     *    not synced afterwards, an [OpenSearchTimeoutException] is thrown so the caller can retry.
     * 2. Read operations in `[request.fromSeqNo, min(lastSyncedGlobalCheckpoint, request.toSeqNo)]` from
     *    the translog when translog pruning by retention lease is enabled for the shard; on any exception
     *    from the translog read (or when it is disabled) read them from a changes snapshot instead.
     * 3. Record latency, operation count, translog size and bytes read on the shard's metric entry.
     *
     * @param request  the range of sequence numbers to fetch
     * @param shardId  the shard whose operations are requested
     * @param listener completed via `completeWith` with the response built by the block
     *                 (presumably also receives any exception thrown inside it — confirm against `completeWith`)
     */
    override fun asyncShardOperation(request: GetChangesRequest, shardId: ShardId, listener: ActionListener<GetChangesResponse>) {
        // NOTE(review): GlobalScope is not tied to any lifecycle; consider a scope owned by this action/node.
        GlobalScope.launch(threadPool.coroutineContext(REPLICATION_EXECUTOR_NAME_LEADER)) {
            // TODO: Figure out if we need to acquire a primary permit here
            listener.completeWith {
                val requestStartNanos = System.nanoTime()

                // Single lookup-or-create: avoids allocating a throw-away metric on every request and the
                // get/put/re-read sequence. It is atomic when the backing map is a concurrent or
                // synchronized map — NOTE(review): confirm the type of remoteStatsService.stats.
                val indexMetric = remoteStatsService.stats.computeIfAbsent(shardId) { RemoteShardMetric() }
                indexMetric.lastFetchTime.set(requestStartNanos)

                val indexShard = indicesService.indexServiceSafe(shardId.index).getShard(shardId.id)
                if (indexShard.lastSyncedGlobalCheckpoint < request.fromSeqNo) {
                    // There are no new operations to sync. Do a long poll and wait for GlobalCheckpoint to advance.
                    // If the checkpoint doesn't advance by the timeout this is expected to throw a timeout
                    // exception, which the caller should catch and start a new poll.
                    val gcp = indexShard.waitForGlobalCheckpoint(request.fromSeqNo, WAIT_FOR_NEW_OPS_TIMEOUT)

                    // At this point indexShard.lastKnownGlobalCheckpoint has advanced but it may not yet have been
                    // synced to the translog, which means we can't return those changes. Return to the caller to retry.
                    // TODO: Figure out a better way to wait for the global checkpoint to be synced to the translog
                    if (indexShard.lastSyncedGlobalCheckpoint < request.fromSeqNo) {
                        assert(gcp > indexShard.lastSyncedGlobalCheckpoint) { "Checkpoint didn't advance at all" }
                        throw OpenSearchTimeoutException("global checkpoint not synced. Retry after a few miliseconds...")
                    }
                }

                // Latency is measured from here (excluding the long poll above); it is restarted if we
                // fall back to the snapshot path so a failed translog attempt is not charged to it.
                var fetchStartNanos = System.nanoTime()
                // At this point lastSyncedGlobalCheckpoint is at least fromSeqNo
                val toSeqNo = min(indexShard.lastSyncedGlobalCheckpoint, request.toSeqNo)

                var ops: List<Translog.Operation> = listOf()
                var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId)
                if (fetchFromTranslog) {
                    try {
                        ops = translogService.getHistoryOfOperations(indexShard, request.fromSeqNo, toSeqNo)
                    } catch (e: Exception) {
                        // Best effort: any failure here just routes the request to the snapshot path below.
                        log.debug("Fetching changes from translog for ${request.shardId} " +
                                "- from:${request.fromSeqNo}, to:$toSeqNo failed with exception - ${e.stackTraceToString()}")
                        fetchFromTranslog = false
                    }
                }

                // Translog fetch is disabled or not found
                if (!fetchFromTranslog) {
                    log.debug("Fetching changes from lucene for ${request.shardId} - from:${request.fromSeqNo}, to:$toSeqNo")
                    fetchStartNanos = System.nanoTime()
                    // NOTE(review): the trailing `true` presumably requires the full range to be present — confirm.
                    ops = indexShard.newChangesSnapshot("odr", request.fromSeqNo, toSeqNo, true).use { snapshot ->
                        // Drain the snapshot into a typed buffer; next() returns null once exhausted.
                        val snapshotOps = ArrayList<Translog.Operation>(snapshot.totalOperations())
                        var op = snapshot.next()
                        while (op != null) {
                            snapshotOps.add(op)
                            op = snapshot.next()
                        }
                        snapshotOps
                    }
                }

                val tookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fetchStartNanos)
                val opCount = ops.size.toLong()
                if (fetchFromTranslog) {
                    indexMetric.latencyTlog.addAndGet(tookInMillis)
                    indexMetric.opsTlog.addAndGet(opCount)
                } else {
                    indexMetric.latencyLucene.addAndGet(tookInMillis)
                    indexMetric.opsLucene.addAndGet(opCount)
                }
                indexMetric.tlogSize.set(indexShard.translogStats().translogSizeInBytes)
                indexMetric.ops.addAndGet(opCount)

                ops.forEach { op -> indexMetric.bytesRead.addAndGet(op.estimateSize()) }

                GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, indexShard.lastSyncedGlobalCheckpoint)
            }
        }
    }