public override suspend fun execute()

in src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt [163:256]


    public override suspend fun execute(scope: CoroutineScope, initialState: PersistentTaskState?) {
        checkNotNull(initialState) { "Missing initial state" }
        followingTaskState = FollowingState(emptyMap())
        currentTaskState = initialState as IndexReplicationState
        while (scope.isActive) {
            try {
                val newState = when (currentTaskState.state) {
                    ReplicationState.INIT -> {
                        addListenerToInterruptTask()
                        if (isResumed()) {
                            log.debug("Resuming tasks now.")
                            InitFollowState
                        } else {
                            setupAndStartRestore()
                        }
                    }
                    ReplicationState.RESTORING -> {
                        log.info("In restoring state for $followerIndexName")
                        waitForRestore()
                    }
                    ReplicationState.INIT_FOLLOW -> {
                        log.info("Starting shard tasks")
                        addIndexBlockForReplication()
                        startShardFollowTasks(emptyMap())
                    }
                    ReplicationState.FOLLOWING -> {
                        if (currentTaskState is FollowingState) {
                            followingTaskState = (currentTaskState as FollowingState)
                            shouldCallEvalMonitoring = false
                            MonitoringState
                        } else {
                            throw ReplicationException("Wrong state type: ${currentTaskState::class}")
                        }
                    }
                    ReplicationState.MONITORING -> {
                        var state = evalMonitoringState()
                        if (metadataPoller == null) {
                            metadataPoller = scope.launch {
                                pollForMetadata(this)
                            }
                        }

                        if (state !is MonitoringState) {
                            // Tasks need to be started
                            state
                        } else {
                            state = pollShardTaskStatus((followingTaskState as FollowingState).shardReplicationTasks)
                            followingTaskState = startMissingShardTasks((followingTaskState as FollowingState).shardReplicationTasks)
                            when (state) {
                                is MonitoringState -> {
                                    updateMetadata()
                                }
                                is FailedState -> {
                                    // Try pausing first if we get Failed state. This returns failed state if pause failed
                                    pauseReplication(state)
                                }
                                else -> {
                                    state
                                }
                            }
                        }
                    }
                    ReplicationState.FAILED -> {
                        assert(currentTaskState is FailedState)
                        failReplication(currentTaskState as FailedState)
                        currentTaskState
                    }
                    ReplicationState.COMPLETED -> {
                        markAsCompleted()
                        CompletedState
                    }
                }
                if (newState != currentTaskState) {
                    currentTaskState = updateState(newState)
                }
                if (isCompleted) break
            } catch(e: ReplicationException) {
                log.error("Exiting index replication task", e)
                throw e
            } catch(e: OpenSearchException) {
                val status = e.status().status
                // Index replication task shouldn't exit before shard replication tasks
                // As long as shard replication tasks doesn't encounter any errors, Index task
                // should continue to poll and Any failure encoutered from shard task should
                // invoke state transition and exit
                if(status < 500 && status != RestStatus.TOO_MANY_REQUESTS.status) {
                    log.error("Exiting index replication task", e)
                    throw e
                }
                log.debug("Encountered transient error while running index replication task", e)
                delay(SLEEP_TIME_BETWEEN_POLL_MS)
            }
        }
    }