in src/main/kotlin/org/opensearch/replication/action/status/TransportReplicationStatusAction.kt [46:108]
override fun doExecute(task: Task, request: ShardInfoRequest, listener: ActionListener<ReplicationStatusResponse>) {
launch(threadPool.coroutineContext()) {
listener.completeWith {
try {
val metadata = replicationMetadataManager.getIndexReplicationMetadata(request!!.indices()[0])
var status = if (metadata.overallState.isNullOrEmpty()) "STOPPED" else metadata.overallState
var reason = metadata.reason
if (!status.equals("RUNNING")) {
var replicationStatusResponse= ReplicationStatusResponse(status)
replicationStatusResponse.connectionAlias = metadata.connectionName
replicationStatusResponse.followerIndexName = metadata.followerContext.resource
replicationStatusResponse.leaderIndexName = metadata.leaderContext.resource
replicationStatusResponse.status = status
replicationStatusResponse.reason = reason
return@completeWith replicationStatusResponse
}
var followerResponse = client.suspendExecute(ShardsInfoAction.INSTANCE,
ShardInfoRequest(metadata.followerContext.resource),true)
val remoteClient = client.getRemoteClusterClient(metadata.connectionName)
var leaderResponse = remoteClient.suspendExecute(ShardsInfoAction.INSTANCE,
ShardInfoRequest(metadata.leaderContext.resource),true)
if (followerResponse.shardInfoResponse.size > 0) {
status = followerResponse.shardInfoResponse.get(0).status
}
if (!status.equals("BOOTSTRAPPING")) {
var shardResponses = followerResponse.shardInfoResponse
leaderResponse.shardInfoResponse.listIterator().forEach {
val leaderShardName = it.shardId.toString()
if (it.isReplayDetailsInitialized()) {
val remoteCheckPoint = it.replayDetails.remoteCheckpoint
shardResponses.listIterator().forEach {
if (it.isReplayDetailsInitialized()) {
if (leaderShardName.equals(it.shardId.toString()
.replace(metadata.followerContext.resource, metadata.leaderContext.resource))) {
it.replayDetails.remoteCheckpoint = remoteCheckPoint
}
}
}
followerResponse.shardInfoResponse = shardResponses
}
}
}
followerResponse.connectionAlias = metadata.connectionName
followerResponse.followerIndexName = metadata.followerContext.resource
followerResponse.leaderIndexName = metadata.leaderContext.resource
followerResponse.status = status
followerResponse.reason = reason
populateAggregatedResponse(followerResponse)
if (!request.verbose) {
followerResponse.isVerbose = false
}
followerResponse
} catch (e : ResourceNotFoundException) {
log.error("got ResourceNotFoundException while querying for status ",e)
ReplicationStatusResponse("REPLICATION NOT IN PROGRESS")
} catch(e : Exception) {
log.error("got Exception while querying for status ",e)
throw org.opensearch.replication.ReplicationException("failed to fetch replication status")
}
}
}
}