fun getMetadata()

in src/main/kotlin/org/opensearch/replication/metadata/store/ReplicationMetadataStore.kt [152:187]


    /**
     * Reads a single replication metadata document from [REPLICATION_CONFIG_SYSTEM_INDEX].
     *
     * The document id is derived from the request's metadata type, connection name and
     * resource name. Before the read, the call waits for the index to report at least
     * yellow health.
     *
     * @param getMetadataReq identifies the metadata entry (type, connection name, resource name).
     * @param fetch_from_primary when `true`, the get request is given the preference returned by
     *        `getPreferenceOnPrimaryNode()`.
     * @param timeout upper bound handed to `actionGet` for both the cluster-health call and the
     *        get call (presumably milliseconds - TODO confirm against callers).
     * @return the parsed metadata together with the sequence number and primary term of the
     *         stored document.
     * @throws ResourceNotFoundException if the config store does not exist or the document has
     *         no source.
     * @throws IllegalStateException if [fetch_from_primary] is set and no preference could be
     *         resolved.
     */
    fun getMetadata(getMetadataReq: GetReplicationMetadataRequest,
                    fetch_from_primary: Boolean,
                    timeout: Long): GetReplicationMetadataResponse {
        val id = getId(getMetadataReq.metadataType, getMetadataReq.connectionName, getMetadataReq.resourceName)

        if (!configStoreExists()) {
            throw ResourceNotFoundException("Metadata for $id doesn't exist")
        }

        // Block until the system index reports at least yellow health (or the timeout elapses).
        val clusterHealthReq = ClusterHealthRequest(REPLICATION_CONFIG_SYSTEM_INDEX).waitForYellowStatus()
        val clusterHealthRes = client.admin().cluster().health(clusterHealthReq).actionGet(timeout)
        // NOTE(review): Kotlin's assert is only evaluated when JVM assertions are enabled (-ea),
        // so this check is a no-op otherwise. Kept as-is to avoid changing the thrown exception type.
        assert(clusterHealthRes.status <= ClusterHealthStatus.YELLOW) { "Replication metadata store is unhealthy" }

        val getReq = GetRequest(REPLICATION_CONFIG_SYSTEM_INDEX, id)
        getReq.realtime(true)
        getReq.refresh(true)
        if (fetch_from_primary) {
            val preference = getPreferenceOnPrimaryNode() ?: throw IllegalStateException("Primary shard to fetch id[$id] in index[$REPLICATION_CONFIG_SYSTEM_INDEX] doesn't exist")
            getReq.preference(preference)
        }

        // The thread context is stashed for the duration of the read and restored in `finally`
        // (presumably so the system-index read does not carry the caller's context - verify).
        var storedContext: ThreadContext.StoredContext? = null
        try {
            storedContext = client.threadPool().threadContext.stashContext()
            val getRes = client.get(getReq).actionGet(timeout)
            val source = getRes.sourceAsBytesRef
                    ?: throw ResourceNotFoundException("Metadata for $id doesn't exist")
            // The parser is Closeable; `use` guarantees it is released even if parsing throws.
            return XContentHelper.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE,
                    source, XContentType.JSON).use { parser ->
                GetReplicationMetadataResponse(ReplicationMetadata.fromXContent(parser), getRes.seqNo, getRes.primaryTerm)
            }
        } finally {
            storedContext?.close()
        }

    }