in src/main/kotlin/org/opensearch/replication/metadata/store/ReplicationMetadataStore.kt [152:187]
/**
 * Fetches a replication metadata document from the replication config system index.
 *
 * The document id is derived from the metadata type, connection name and resource name carried by
 * [getMetadataReq]. Before reading, the method waits for the system index to reach at least yellow
 * health, then issues a realtime get with refresh enabled under a stashed thread context.
 *
 * @param getMetadataReq identifies the metadata document to load.
 * @param fetch_from_primary when true, the get request uses the preference returned by
 *        `getPreferenceOnPrimaryNode()`.
 * @param timeout value handed to `actionGet` for both the cluster-health call and the get call
 *        (presumably milliseconds - confirm against callers).
 * @return the parsed metadata together with the sequence number and primary term of the stored document.
 * @throws ResourceNotFoundException if the config store does not exist or the document has no source.
 * @throws IllegalStateException if [fetch_from_primary] is true and no primary-node preference is available.
 */
fun getMetadata(getMetadataReq: GetReplicationMetadataRequest,
                fetch_from_primary: Boolean,
                timeout: Long): GetReplicationMetadataResponse {
    val id = getId(getMetadataReq.metadataType, getMetadataReq.connectionName, getMetadataReq.resourceName)
    if (!configStoreExists()) {
        throw ResourceNotFoundException("Metadata for $id doesn't exist")
    }

    val clusterHealthReq = ClusterHealthRequest(REPLICATION_CONFIG_SYSTEM_INDEX).waitForYellowStatus()
    val clusterHealthRes = client.admin().cluster().health(clusterHealthReq).actionGet(timeout)
    // NOTE(review): Kotlin's assert is only evaluated when JVM assertions are enabled (-ea), so this
    // health check is a no-op otherwise. Kept as-is to preserve the exception behavior callers see.
    assert(clusterHealthRes.status <= ClusterHealthStatus.YELLOW) { "Replication metadata store is unhealthy" }

    val getReq = GetRequest(REPLICATION_CONFIG_SYSTEM_INDEX, id)
    getReq.realtime(true)
    getReq.refresh(true)
    if (fetch_from_primary) {
        val preference = getPreferenceOnPrimaryNode()
            ?: throw IllegalStateException("Primary shard to fetch id[$id] in index[$REPLICATION_CONFIG_SYSTEM_INDEX] doesn't exist")
        getReq.preference(preference)
    }

    var storedContext: ThreadContext.StoredContext? = null
    try {
        // Stash the current thread context for the duration of the get; it is restored in `finally`.
        storedContext = client.threadPool().threadContext.stashContext()
        val getRes = client.get(getReq).actionGet(timeout)
        val source = getRes.sourceAsBytesRef
            ?: throw ResourceNotFoundException("Metadata for $id doesn't exist")
        // The parser is Closeable; `use` guarantees it is released even if parsing throws.
        return XContentHelper.createParser(
            namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, source, XContentType.JSON
        ).use { parser ->
            GetReplicationMetadataResponse(ReplicationMetadata.fromXContent(parser), getRes.seqNo, getRes.primaryTerm)
        }
    } finally {
        storedContext?.close()
    }
}