in x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityRestIT.java [314:614]
/**
 * Exercises cross-cluster search from the querying cluster against the fulfilling cluster.
 * <p>
 * The test seeds documents on both clusters, creates the roles and users it needs on the querying cluster,
 * and then checks, in order:
 * <ul>
 * <li>the remote search user can search {@code index1} on the remote, optionally together with {@code local_index};</li>
 * <li>the remote metric user sees exactly the four {@code shared-metrics} documents;</li>
 * <li>searches of {@code index2} and {@code prefixed_index} are rejected with 403;</li>
 * <li>a user whose role grants no remote indices privileges for {@code my_remote_cluster} is rejected with 403;</li>
 * <li>searches against the {@code invalid_remote}, {@code wrong_api_key_type} and {@code invalid_secret_length}
 * remotes are either skipped (skip_unavailable=true) or fail with 401 (skip_unavailable=false).</li>
 * </ul>
 * Several inputs are randomized; the order of the random draws is kept stable so a failure reproduces from its seed.
 *
 * @throws Exception if any request fails unexpectedly
 */
public void testCrossClusterSearch() throws Exception {
    configureRemoteCluster();
    final String crossClusterAccessApiKeyId = (String) API_KEY_MAP_REF.get().get("id");

    // Fulfilling cluster
    {
        // Spread the shards to all nodes
        final Request createIndexRequest = new Request("PUT", "shared-metrics");
        createIndexRequest.setJsonEntity("""
            {
            "settings": {
            "number_of_shards": 3,
            "number_of_replicas": 0
            }
            }""");
        assertOK(performRequestAgainstFulfillingCluster(createIndexRequest));

        // Index some documents, so we can attempt to search them from the querying cluster.
        // The payload has no format specifiers, so it is passed as a plain text block.
        final Request bulkRequest = new Request("POST", "/_bulk?refresh=true");
        bulkRequest.setJsonEntity("""
            { "index": { "_index": "index1" } }
            { "foo": "bar" }
            { "index": { "_index": "index2" } }
            { "bar": "foo" }
            { "index": { "_index": "prefixed_index" } }
            { "baz": "fee" }
            { "index": { "_index": "shared-metrics" } }
            { "name": "metric1" }
            { "index": { "_index": "shared-metrics" } }
            { "name": "metric2" }
            { "index": { "_index": "shared-metrics" } }
            { "name": "metric3" }
            { "index": { "_index": "shared-metrics" } }
            { "name": "metric4" }
            """);
        assertOK(performRequestAgainstFulfillingCluster(bulkRequest));
    }

    // Query cluster
    {
        // Index some documents, to use them in a mixed-cluster search
        final var indexDocRequest = new Request("POST", "/local_index/_doc?refresh=true");
        indexDocRequest.setJsonEntity("{\"local_foo\": \"local_bar\"}");
        assertOK(client().performRequest(indexDocRequest));

        // Create user role with privileges for remote and local indices
        final var putRoleRequest = new Request("PUT", "/_security/role/" + REMOTE_SEARCH_ROLE);
        putRoleRequest.setJsonEntity("""
            {
            "description": "Role with privileges for remote and local indices.",
            "indices": [
            {
            "names": ["local_index"],
            "privileges": ["read"]
            }
            ],
            "remote_indices": [
            {
            "names": ["index1", "not_found_index", "prefixed_index"],
            "privileges": ["read", "read_cross_cluster"],
            "clusters": ["my_remote_cluster"]
            }
            ]
            }""");
        assertOK(adminClient().performRequest(putRoleRequest));

        final var putUserRequest = new Request("PUT", "/_security/user/" + REMOTE_SEARCH_USER);
        putUserRequest.setJsonEntity("""
            {
            "password": "x-pack-test-password",
            "roles" : ["remote_search"]
            }""");
        assertOK(adminClient().performRequest(putUserRequest));

        // Check that we can search the fulfilling cluster from the querying cluster
        final boolean alsoSearchLocally = randomBoolean();
        final var searchRequest = new Request(
            "GET",
            String.format(
                Locale.ROOT,
                "/%s%s:%s/_search?ccs_minimize_roundtrips=%s",
                alsoSearchLocally ? "local_index," : "",
                randomFrom("my_remote_cluster", "*", "my_remote_*"),
                randomFrom("index1", "*"),
                randomBoolean()
            )
        );
        final Response response = performRequestWithRemoteSearchUser(searchRequest);
        assertOK(response);
        final SearchResponse searchResponse = SearchResponseUtils.parseSearchResponse(responseAsParser(response));
        try {
            final List<String> actualIndices = Arrays.stream(searchResponse.getHits().getHits())
                .map(SearchHit::getIndex)
                .collect(Collectors.toList());
            if (alsoSearchLocally) {
                assertThat(actualIndices, containsInAnyOrder("index1", "local_index"));
            } else {
                assertThat(actualIndices, containsInAnyOrder("index1"));
            }
        } finally {
            // The parsed response is ref-counted and must be released even when an assertion fails
            searchResponse.decRef();
        }

        // Check remote metric users can search metric documents from all FC nodes
        final var metricSearchRequest = new Request(
            "GET",
            String.format(Locale.ROOT, "/my_remote_cluster:*/_search?ccs_minimize_roundtrips=%s", randomBoolean())
        );
        final SearchResponse metricSearchResponse = SearchResponseUtils.parseSearchResponse(
            responseAsParser(performRequestWithRemoteMetricUser(metricSearchRequest))
        );
        try {
            assertThat(metricSearchResponse.getHits().getTotalHits().value(), equalTo(4L));
            assertThat(
                Arrays.stream(metricSearchResponse.getHits().getHits()).map(SearchHit::getIndex).collect(Collectors.toSet()),
                containsInAnyOrder("shared-metrics")
            );
        } finally {
            metricSearchResponse.decRef();
        }

        // Check that access is denied because of user privileges
        final ResponseException exception = expectThrows(
            ResponseException.class,
            () -> performRequestWithRemoteSearchUser(new Request("GET", "/my_remote_cluster:index2/_search"))
        );
        assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(403));
        assertThat(
            exception.getMessage(),
            containsString(
                "action [indices:data/read/search] towards remote cluster is unauthorized for user [remote_search_user] "
                    + "with assigned roles [remote_search] authenticated by API key id ["
                    + crossClusterAccessApiKeyId
                    + "] of user [test_user] on indices [index2]"
            )
        );

        // Check that access is denied because of API key privileges
        final ResponseException exception2 = expectThrows(
            ResponseException.class,
            () -> performRequestWithRemoteSearchUser(new Request("GET", "/my_remote_cluster:prefixed_index/_search"))
        );
        assertThat(exception2.getResponse().getStatusLine().getStatusCode(), equalTo(403));
        assertThat(
            exception2.getMessage(),
            containsString(
                "action [indices:data/read/search] towards remote cluster is unauthorized for user [remote_search_user] "
                    + "with assigned roles [remote_search] authenticated by API key id ["
                    + crossClusterAccessApiKeyId
                    + "] of user [test_user] on indices [prefixed_index]"
            )
        );

        // Check access is denied when user has no remote indices privileges.
        // The role randomly either omits remote_indices entirely or only grants them for non-matching clusters.
        final var putLocalSearchRoleRequest = new Request("PUT", "/_security/role/local_search");
        final String remoteIndicesClause = randomBoolean() ? "" : """
            ,
            "remote_indices": [
            {
            "names": ["*"],
            "privileges": ["read", "read_cross_cluster"],
            "clusters": ["other_remote_*"]
            }
            ]""";
        putLocalSearchRoleRequest.setJsonEntity(Strings.format("""
            {
            "description": "Role with privileges for searching local only indices.",
            "indices": [
            {
            "names": ["local_index"],
            "privileges": ["read"]
            }
            ]%s
            }""", remoteIndicesClause));
        assertOK(adminClient().performRequest(putLocalSearchRoleRequest));

        final var putlocalSearchUserRequest = new Request("PUT", "/_security/user/local_search_user");
        putlocalSearchUserRequest.setJsonEntity("""
            {
            "password": "x-pack-test-password",
            "roles" : ["local_search"]
            }""");
        assertOK(adminClient().performRequest(putlocalSearchUserRequest));

        final ResponseException exception3 = expectThrows(
            ResponseException.class,
            () -> performRequestWithLocalSearchUser(
                new Request("GET", "/" + randomFrom("my_remote_cluster:*", "*:*", "*,*:*", "my_*:*,local_index") + "/_search")
            )
        );
        assertThat(exception3.getResponse().getStatusLine().getStatusCode(), equalTo(403));
        assertThat(
            exception3.getMessage(),
            containsString(
                "action [indices:data/read/search] towards remote cluster [my_remote_cluster]"
                    + " is unauthorized for user [local_search_user] with effective roles [local_search]"
                    + " because no remote indices privileges apply for the target cluster"
            )
        );

        // Check that authentication fails if we use a non-existent API key (when skip_unavailable=false)
        assertSearchAgainstMisconfiguredRemote("invalid_remote", "index1", () -> "unable to find apikey");

        // Check that REST API key is not supported by cross cluster access (when skip_unavailable=false)
        assertSearchAgainstMisconfiguredRemote(
            "wrong_api_key_type",
            "*",
            () -> "authentication expected API key type of [cross_cluster], but API key ["
                + REST_API_KEY_MAP_REF.get().get("id")
                + "] has type [rest]"
        );

        // Check invalid cross-cluster API key length is rejected (and gets security error when skip_unavailable=false)
        assertSearchAgainstMisconfiguredRemote("invalid_secret_length", "*", () -> "invalid cross-cluster API key value");
    }
    assertNoRcs1DeprecationWarnings();
}

/**
 * Registers the remote {@code clusterAlias} on the querying cluster, pointing at the fulfilling cluster's remote
 * cluster server endpoint, and searches it as the remote search user.
 * <p>
 * Both {@code skip_unavailable} and the connection mode (sniff via {@code seeds}, or {@code proxy}) are randomized.
 * With {@code skip_unavailable=true} the search must return 200 and report the remote as skipped; otherwise it must
 * fail with 401 and a message containing the supplied fragment.
 *
 * @param clusterAlias             remote cluster alias to register and search
 * @param indexExpression          index expression to search on that remote
 * @param expectedAuthErrorMessage supplies the text expected in the 401 error message; evaluated only when
 *                                 {@code skip_unavailable=false}. The type is fully qualified to avoid adding an import.
 * @throws Exception if updating the settings or performing the search fails unexpectedly
 */
private void assertSearchAgainstMisconfiguredRemote(
    String clusterAlias,
    String indexExpression,
    java.util.function.Supplier<String> expectedAuthErrorMessage
) throws Exception {
    // Draw order matters for seed reproducibility: skip_unavailable first, then the connection mode
    final boolean skipUnavailable = randomBoolean();
    final boolean sniffMode = randomBoolean();

    final String settingPrefix = "cluster.remote." + clusterAlias + ".";
    final String remoteEndpoint = fulfillingCluster.getRemoteClusterServerEndpoint(0);
    final Settings.Builder remoteSettings = Settings.builder()
        .put(settingPrefix + "skip_unavailable", Boolean.toString(skipUnavailable));
    if (sniffMode) {
        remoteSettings.put(settingPrefix + "seeds", remoteEndpoint);
    } else {
        remoteSettings.put(settingPrefix + "mode", "proxy").put(settingPrefix + "proxy_address", remoteEndpoint);
    }
    updateClusterSettings(remoteSettings.build());

    final String searchPath = "/" + clusterAlias + ":" + indexExpression + "/_search";
    if (skipUnavailable) {
        /*
          when skip_unavailable=true, response should be something like:
          {"took":1,"timed_out":false,"num_reduce_phases":0,"_shards":{"total":0,"successful":0,"skipped":0,"failed":0},
           "_clusters":{"total":1,"successful":0,"skipped":1,"running":0,"partial":0,"failed":0,
                        "details":{"invalid_remote":{"status":"skipped","indices":"index1","timed_out":false,
                        "failures":[{"shard":-1,"index":null,"reason":{"type":"connect_transport_exception",
                                     "reason":"Unable to connect to [invalid_remote]"}}]}}},
           "hits":{"total":{"value":0,"relation":"eq"},"max_score":null,"hits":[]}}
         */
        final Response skippedRemoteResponse = performRequestWithRemoteSearchUser(new Request("GET", searchPath));
        assertThat(skippedRemoteResponse.getStatusLine().getStatusCode(), equalTo(200));
        final String responseJson = EntityUtils.toString(skippedRemoteResponse.getEntity());
        assertThat(responseJson, containsString("\"status\":\"skipped\""));
        assertThat(responseJson, containsString("connect_transport_exception"));
    } else {
        final ResponseException authFailure = expectThrows(
            ResponseException.class,
            () -> performRequestWithRemoteSearchUser(new Request("GET", searchPath))
        );
        assertThat(authFailure.getResponse().getStatusLine().getStatusCode(), equalTo(401));
        assertThat(authFailure.getMessage(), containsString(expectedAuthErrorMessage.get()));
    }
}