in src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt [163:256]
/**
 * Runs the index replication state machine for this task.
 *
 * Loops while [scope] is active. Each iteration dispatches on `currentTaskState.state`,
 * computes the next state from the matching branch and, when that differs from the
 * current state, passes it to `updateState` and adopts the returned value. The loop
 * exits when `isCompleted` is true, when [scope] stops being active, or when an
 * exception is rethrown.
 *
 * Error handling:
 * - [ReplicationException] is logged and rethrown.
 * - [OpenSearchException] whose status is below 500 and is not TOO_MANY_REQUESTS is
 *   logged and rethrown. Any other [OpenSearchException] is treated as transient:
 *   it is logged at debug level and the loop retries after `SLEEP_TIME_BETWEEN_POLL_MS`.
 *
 * @param scope coroutine scope whose `isActive` flag gates the loop; also used to launch
 *   the metadata poller on the first pass through the MONITORING state.
 * @param initialState persisted state to start from; must be non-null and is cast to
 *   [IndexReplicationState].
 * @throws IllegalStateException if [initialState] is null.
 */
public override suspend fun execute(scope: CoroutineScope, initialState: PersistentTaskState?) {
checkNotNull(initialState) { "Missing initial state" }
// Start with an empty shard-task map; the FOLLOWING branch replaces it with the real state.
followingTaskState = FollowingState(emptyMap())
currentTaskState = initialState as IndexReplicationState
while (scope.isActive) {
try {
// The value of this `when` is the state the task should move to next.
val newState = when (currentTaskState.state) {
ReplicationState.INIT -> {
addListenerToInterruptTask()
// A resumed task goes straight to INIT_FOLLOW; otherwise the next state is
// whatever setupAndStartRestore() returns.
if (isResumed()) {
log.debug("Resuming tasks now.")
InitFollowState
} else {
setupAndStartRestore()
}
}
ReplicationState.RESTORING -> {
log.info("In restoring state for $followerIndexName")
waitForRestore()
}
ReplicationState.INIT_FOLLOW -> {
log.info("Starting shard tasks")
addIndexBlockForReplication()
startShardFollowTasks(emptyMap())
}
ReplicationState.FOLLOWING -> {
// The explicit cast is required: currentTaskState is a mutable property, so the
// `is` check alone does not smart-cast it.
if (currentTaskState is FollowingState) {
followingTaskState = (currentTaskState as FollowingState)
shouldCallEvalMonitoring = false
MonitoringState
} else {
throw ReplicationException("Wrong state type: ${currentTaskState::class}")
}
}
ReplicationState.MONITORING -> {
var state = evalMonitoringState()
// Launch the metadata poller only once; later iterations see a non-null handle.
if (metadataPoller == null) {
metadataPoller = scope.launch {
pollForMetadata(this)
}
}
if (state !is MonitoringState) {
// Tasks need to be started
state
} else {
state = pollShardTaskStatus((followingTaskState as FollowingState).shardReplicationTasks)
// Missing shard tasks are (re)started regardless of the polled status above.
followingTaskState = startMissingShardTasks((followingTaskState as FollowingState).shardReplicationTasks)
when (state) {
is MonitoringState -> {
updateMetadata()
}
is FailedState -> {
// Try pausing first if we get Failed state. This returns failed state if pause failed
pauseReplication(state)
}
else -> {
state
}
}
}
}
ReplicationState.FAILED -> {
// Kotlin's assert is only evaluated when JVM assertions are enabled (-ea);
// the cast on the next line enforces the type either way.
assert(currentTaskState is FailedState)
failReplication(currentTaskState as FailedState)
// Yield the unchanged state so the comparison below skips updateState.
currentTaskState
}
ReplicationState.COMPLETED -> {
markAsCompleted()
CompletedState
}
}
// Only call updateState on an actual transition.
if (newState != currentTaskState) {
currentTaskState = updateState(newState)
}
if (isCompleted) break
} catch(e: ReplicationException) {
log.error("Exiting index replication task", e)
throw e
} catch(e: OpenSearchException) {
val status = e.status().status
// The index replication task shouldn't exit before the shard replication tasks.
// As long as the shard replication tasks don't encounter any errors, the index task
// should continue to poll; any failure encountered from a shard task should
// invoke a state transition and exit.
// Statuses below 500 (other than TOO_MANY_REQUESTS) are treated as non-retryable.
if(status < 500 && status != RestStatus.TOO_MANY_REQUESTS.status) {
log.error("Exiting index replication task", e)
throw e
}
log.debug("Encountered transient error while running index replication task", e)
// Back off before retrying the same state on the next loop iteration.
delay(SLEEP_TIME_BETWEEN_POLL_MS)
}
}
}