public void testCrossClusterSearch()

in x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityRestIT.java [314:614]


    public void testCrossClusterSearch() throws Exception {
        configureRemoteCluster();
        final String crossClusterAccessApiKeyId = (String) API_KEY_MAP_REF.get().get("id");

        // Fulfilling cluster
        {
            // Spread the shards to all nodes
            final Request createIndexRequest = new Request("PUT", "shared-metrics");
            createIndexRequest.setJsonEntity("""
                {
                  "settings": {
                    "number_of_shards": 3,
                    "number_of_replicas": 0
                  }
                }""");
            assertOK(performRequestAgainstFulfillingCluster(createIndexRequest));

            // Index some documents, so we can attempt to search them from the querying cluster
            final Request bulkRequest = new Request("POST", "/_bulk?refresh=true");
            bulkRequest.setJsonEntity(Strings.format("""
                { "index": { "_index": "index1" } }
                { "foo": "bar" }
                { "index": { "_index": "index2" } }
                { "bar": "foo" }
                { "index": { "_index": "prefixed_index" } }
                { "baz": "fee" }
                { "index": { "_index": "shared-metrics" } }
                { "name": "metric1" }
                { "index": { "_index": "shared-metrics" } }
                { "name": "metric2" }
                { "index": { "_index": "shared-metrics" } }
                { "name": "metric3" }
                { "index": { "_index": "shared-metrics" } }
                { "name": "metric4" }
                """));
            assertOK(performRequestAgainstFulfillingCluster(bulkRequest));
        }

        // Query cluster
        {
            // Index some documents, to use them in a mixed-cluster search
            final var indexDocRequest = new Request("POST", "/local_index/_doc?refresh=true");
            indexDocRequest.setJsonEntity("{\"local_foo\": \"local_bar\"}");
            assertOK(client().performRequest(indexDocRequest));

            // Create user role with privileges for remote and local indices
            final var putRoleRequest = new Request("PUT", "/_security/role/" + REMOTE_SEARCH_ROLE);
            putRoleRequest.setJsonEntity("""
                {
                  "description": "Role with privileges for remote and local indices.",
                  "indices": [
                    {
                      "names": ["local_index"],
                      "privileges": ["read"]
                    }
                  ],
                  "remote_indices": [
                    {
                      "names": ["index1", "not_found_index", "prefixed_index"],
                      "privileges": ["read", "read_cross_cluster"],
                      "clusters": ["my_remote_cluster"]
                    }
                  ]
                }""");
            assertOK(adminClient().performRequest(putRoleRequest));
            final var putUserRequest = new Request("PUT", "/_security/user/" + REMOTE_SEARCH_USER);
            putUserRequest.setJsonEntity("""
                {
                  "password": "x-pack-test-password",
                  "roles" : ["remote_search"]
                }""");
            assertOK(adminClient().performRequest(putUserRequest));

            // Check that we can search the fulfilling cluster from the querying cluster
            final boolean alsoSearchLocally = randomBoolean();
            final var searchRequest = new Request(
                "GET",
                String.format(
                    Locale.ROOT,
                    "/%s%s:%s/_search?ccs_minimize_roundtrips=%s",
                    alsoSearchLocally ? "local_index," : "",
                    randomFrom("my_remote_cluster", "*", "my_remote_*"),
                    randomFrom("index1", "*"),
                    randomBoolean()
                )
            );
            final Response response = performRequestWithRemoteSearchUser(searchRequest);
            assertOK(response);
            final SearchResponse searchResponse = SearchResponseUtils.parseSearchResponse(responseAsParser(response));
            try {
                final List<String> actualIndices = Arrays.stream(searchResponse.getHits().getHits())
                    .map(SearchHit::getIndex)
                    .collect(Collectors.toList());
                if (alsoSearchLocally) {
                    assertThat(actualIndices, containsInAnyOrder("index1", "local_index"));
                } else {
                    assertThat(actualIndices, containsInAnyOrder("index1"));
                }
            } finally {
                searchResponse.decRef();
            }

            // Check remote metric users can search metric documents from all FC nodes
            final var metricSearchRequest = new Request(
                "GET",
                String.format(Locale.ROOT, "/my_remote_cluster:*/_search?ccs_minimize_roundtrips=%s", randomBoolean())
            );
            final SearchResponse metricSearchResponse = SearchResponseUtils.parseSearchResponse(
                responseAsParser(performRequestWithRemoteMetricUser(metricSearchRequest))
            );
            try {
                assertThat(metricSearchResponse.getHits().getTotalHits().value(), equalTo(4L));
                assertThat(
                    Arrays.stream(metricSearchResponse.getHits().getHits()).map(SearchHit::getIndex).collect(Collectors.toSet()),
                    containsInAnyOrder("shared-metrics")
                );
            } finally {
                metricSearchResponse.decRef();
            }

            // Check that access is denied because of user privileges
            final ResponseException exception = expectThrows(
                ResponseException.class,
                () -> performRequestWithRemoteSearchUser(new Request("GET", "/my_remote_cluster:index2/_search"))
            );
            assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(403));
            assertThat(
                exception.getMessage(),
                containsString(
                    "action [indices:data/read/search] towards remote cluster is unauthorized for user [remote_search_user] "
                        + "with assigned roles [remote_search] authenticated by API key id ["
                        + crossClusterAccessApiKeyId
                        + "] of user [test_user] on indices [index2]"
                )
            );

            // Check that access is denied because of API key privileges
            final ResponseException exception2 = expectThrows(
                ResponseException.class,
                () -> performRequestWithRemoteSearchUser(new Request("GET", "/my_remote_cluster:prefixed_index/_search"))
            );
            assertThat(exception2.getResponse().getStatusLine().getStatusCode(), equalTo(403));
            assertThat(
                exception2.getMessage(),
                containsString(
                    "action [indices:data/read/search] towards remote cluster is unauthorized for user [remote_search_user] "
                        + "with assigned roles [remote_search] authenticated by API key id ["
                        + crossClusterAccessApiKeyId
                        + "] of user [test_user] on indices [prefixed_index]"
                )
            );

            // Check access is denied when user has no remote indices privileges
            final var putLocalSearchRoleRequest = new Request("PUT", "/_security/role/local_search");
            putLocalSearchRoleRequest.setJsonEntity(Strings.format("""
                {
                  "description": "Role with privileges for searching local only indices.",
                  "indices": [
                    {
                      "names": ["local_index"],
                      "privileges": ["read"]
                    }
                  ]%s
                }""", randomBoolean() ? "" : """
                ,
                "remote_indices": [
                   {
                     "names": ["*"],
                     "privileges": ["read", "read_cross_cluster"],
                     "clusters": ["other_remote_*"]
                   }
                 ]"""));
            assertOK(adminClient().performRequest(putLocalSearchRoleRequest));
            final var putlocalSearchUserRequest = new Request("PUT", "/_security/user/local_search_user");
            putlocalSearchUserRequest.setJsonEntity("""
                {
                  "password": "x-pack-test-password",
                  "roles" : ["local_search"]
                }""");
            assertOK(adminClient().performRequest(putlocalSearchUserRequest));
            final ResponseException exception3 = expectThrows(
                ResponseException.class,
                () -> performRequestWithLocalSearchUser(
                    new Request("GET", "/" + randomFrom("my_remote_cluster:*", "*:*", "*,*:*", "my_*:*,local_index") + "/_search")
                )
            );
            assertThat(exception3.getResponse().getStatusLine().getStatusCode(), equalTo(403));
            assertThat(
                exception3.getMessage(),
                containsString(
                    "action [indices:data/read/search] towards remote cluster [my_remote_cluster]"
                        + " is unauthorized for user [local_search_user] with effective roles [local_search]"
                        + " because no remote indices privileges apply for the target cluster"
                )
            );

            // Check that authentication fails if we use a non-existent API key (when skip_unavailable=false)
            boolean skipUnavailable = randomBoolean();
            updateClusterSettings(
                randomBoolean()
                    ? Settings.builder()
                        .put("cluster.remote.invalid_remote.seeds", fulfillingCluster.getRemoteClusterServerEndpoint(0))
                        .put("cluster.remote.invalid_remote.skip_unavailable", Boolean.toString(skipUnavailable))
                        .build()
                    : Settings.builder()
                        .put("cluster.remote.invalid_remote.mode", "proxy")
                        .put("cluster.remote.invalid_remote.skip_unavailable", Boolean.toString(skipUnavailable))
                        .put("cluster.remote.invalid_remote.proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0))
                        .build()
            );
            if (skipUnavailable) {
                /*
                  when skip_unavailable=true, response should be something like:
                  {"took":1,"timed_out":false,"num_reduce_phases":0,"_shards":{"total":0,"successful":0,"skipped":0,"failed":0},
                   "_clusters":{"total":1,"successful":0,"skipped":1,"running":0,"partial":0,"failed":0,
                                "details":{"invalid_remote":{"status":"skipped","indices":"index1","timed_out":false,
                                "failures":[{"shard":-1,"index":null,"reason":{"type":"connect_transport_exception",
                                             "reason":"Unable to connect to [invalid_remote]"}}]}}},
                    "hits":{"total":{"value":0,"relation":"eq"},"max_score":null,"hits":[]}}
                 */
                Response invalidRemoteResponse = performRequestWithRemoteSearchUser(new Request("GET", "/invalid_remote:index1/_search"));
                assertThat(invalidRemoteResponse.getStatusLine().getStatusCode(), equalTo(200));
                String responseJson = EntityUtils.toString(invalidRemoteResponse.getEntity());
                assertThat(responseJson, containsString("\"status\":\"skipped\""));
                assertThat(responseJson, containsString("connect_transport_exception"));
            } else {
                final ResponseException exception4 = expectThrows(
                    ResponseException.class,
                    () -> performRequestWithRemoteSearchUser(new Request("GET", "/invalid_remote:index1/_search"))
                );
                assertThat(exception4.getResponse().getStatusLine().getStatusCode(), equalTo(401));
                assertThat(exception4.getMessage(), containsString("unable to find apikey"));
            }

            // check that REST API key is not supported by cross cluster access (when skip_unavailable=false)
            skipUnavailable = randomBoolean();
            updateClusterSettings(
                randomBoolean()
                    ? Settings.builder()
                        .put("cluster.remote.wrong_api_key_type.seeds", fulfillingCluster.getRemoteClusterServerEndpoint(0))
                        .put("cluster.remote.wrong_api_key_type.skip_unavailable", Boolean.toString(skipUnavailable))
                        .build()
                    : Settings.builder()
                        .put("cluster.remote.wrong_api_key_type.mode", "proxy")
                        .put("cluster.remote.wrong_api_key_type.skip_unavailable", Boolean.toString(skipUnavailable))
                        .put("cluster.remote.wrong_api_key_type.proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0))
                        .build()
            );
            if (skipUnavailable) {
                Response invalidRemoteResponse = performRequestWithRemoteSearchUser(new Request("GET", "/wrong_api_key_type:*/_search"));
                assertThat(invalidRemoteResponse.getStatusLine().getStatusCode(), equalTo(200));
                String responseJson = EntityUtils.toString(invalidRemoteResponse.getEntity());
                assertThat(responseJson, containsString("\"status\":\"skipped\""));
                assertThat(responseJson, containsString("connect_transport_exception"));
            } else {
                final ResponseException exception5 = expectThrows(
                    ResponseException.class,
                    () -> performRequestWithRemoteSearchUser(new Request("GET", "/wrong_api_key_type:*/_search"))
                );
                assertThat(exception5.getResponse().getStatusLine().getStatusCode(), equalTo(401));
                assertThat(
                    exception5.getMessage(),
                    containsString(
                        "authentication expected API key type of [cross_cluster], but API key ["
                            + REST_API_KEY_MAP_REF.get().get("id")
                            + "] has type [rest]"
                    )
                );
            }

            // Check invalid cross-cluster API key length is rejected (and gets security error when skip_unavailable=false)
            skipUnavailable = randomBoolean();
            updateClusterSettings(
                randomBoolean()
                    ? Settings.builder()
                        .put("cluster.remote.invalid_secret_length.seeds", fulfillingCluster.getRemoteClusterServerEndpoint(0))
                        .put("cluster.remote.invalid_secret_length.skip_unavailable", Boolean.toString(skipUnavailable))
                        .build()
                    : Settings.builder()
                        .put("cluster.remote.invalid_secret_length.mode", "proxy")
                        .put("cluster.remote.invalid_secret_length.skip_unavailable", Boolean.toString(skipUnavailable))
                        .put("cluster.remote.invalid_secret_length.proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0))
                        .build()
            );
            if (skipUnavailable) {
                Response invalidRemoteResponse = performRequestWithRemoteSearchUser(new Request("GET", "/invalid_secret_length:*/_search"));
                assertThat(invalidRemoteResponse.getStatusLine().getStatusCode(), equalTo(200));
                String responseJson = EntityUtils.toString(invalidRemoteResponse.getEntity());
                assertThat(responseJson, containsString("\"status\":\"skipped\""));
                assertThat(responseJson, containsString("connect_transport_exception"));
            } else {
                final ResponseException exception6 = expectThrows(
                    ResponseException.class,
                    () -> performRequestWithRemoteSearchUser(new Request("GET", "/invalid_secret_length:*/_search"))
                );
                assertThat(exception6.getResponse().getStatusLine().getStatusCode(), equalTo(401));
                assertThat(exception6.getMessage(), containsString("invalid cross-cluster API key value"));
            }
        }
        assertNoRcs1DeprecationWarnings();
    }