in x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java [108:423]
/**
 * Mounts a searchable snapshot of an index whose documents either fall outside the queried time range or carry no
 * timestamp data, pins it to a dedicated data node, and checks how both the search API and the search-shards API
 * report its shards at two points:
 * <ol>
 *   <li>while the mount is still blocked, when the index metadata exposes no timestamp / event.ingested range and
 *       no shard may be reported as skipped;</li>
 *   <li>after the mount completed and the node holding the shards was stopped, when the ranges are present in the
 *       index metadata and the (now unassigned) shards are expected to be reported as skipped.</li>
 * </ol>
 * A regular index with documents inside the queried range is randomly included in the requests.
 */
public void testSearchableSnapshotShardsAreSkippedBySearchRequestWithoutQueryingAnyNodeWhenTheyAreOutsideOfTheQueryRange()
    throws Exception {
    internalCluster().startMasterOnlyNode();
    internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
    final String dataNodeHoldingRegularIndex = internalCluster().startDataOnlyNode();
    final String dataNodeHoldingSearchableSnapshot = internalCluster().startDataOnlyNode();
    final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNodeHoldingSearchableSnapshot);

    final String indexOutsideSearchRange = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
    final int indexOutsideSearchRangeShardCount = randomIntBetween(1, 3);
    createIndexWithTimestampAndEventIngested(indexOutsideSearchRange, indexOutsideSearchRangeShardCount, Settings.EMPTY);

    // The regular index is required to be allocated on its own data node
    final String indexWithinSearchRange = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
    final int indexWithinSearchRangeShardCount = randomIntBetween(1, 3);
    createIndexWithTimestampAndEventIngested(
        indexWithinSearchRange,
        indexWithinSearchRangeShardCount,
        Settings.builder()
            .put(INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), dataNodeHoldingRegularIndex)
            .build()
    );

    final int totalShards = indexOutsideSearchRangeShardCount + indexWithinSearchRangeShardCount;

    // Either add data outside of the range, or documents that don't have timestamp data
    final boolean indexDataWithTimestamp = randomBoolean();
    // Add enough documents to have non-metadata segment files in all shards,
    // otherwise the mount operation might go through as the read won't be blocked
    final int numberOfDocsInIndexOutsideSearchRange = between(350, 1000);
    if (indexDataWithTimestamp) {
        indexDocumentsWithTimestampAndEventIngestedDates(
            indexOutsideSearchRange,
            numberOfDocsInIndexOutsideSearchRange,
            TIMESTAMP_TEMPLATE_OUTSIDE_RANGE
        );
    } else {
        indexRandomDocs(indexOutsideSearchRange, numberOfDocsInIndexOutsideSearchRange);
    }

    // Index enough documents to ensure that all shards have at least some documents
    final int numDocsWithinRange = between(100, 1000);
    indexDocumentsWithTimestampAndEventIngestedDates(indexWithinSearchRange, numDocsWithinRange, TIMESTAMP_TEMPLATE_WITHIN_RANGE);

    // Snapshot the out-of-range index and delete the original so that only the mounted copy remains
    final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
    createRepository(repositoryName, "mock");
    final SnapshotId snapshotId = createSnapshot(repositoryName, "snapshot-1", List.of(indexOutsideSearchRange)).snapshotId();
    assertAcked(indicesAdmin().prepareDelete(indexOutsideSearchRange));

    final String searchableSnapshotIndexOutsideSearchRange = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);

    // Block the repository for the node holding the searchable snapshot shards
    // to delay its restore
    blockDataNode(repositoryName, dataNodeHoldingSearchableSnapshot);

    // Force the searchable snapshot to be allocated in a particular node
    final Settings restoredIndexSettings = Settings.builder()
        .put(INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), dataNodeHoldingSearchableSnapshot)
        .build();
    final MountSearchableSnapshotRequest mountRequest = new MountSearchableSnapshotRequest(
        TEST_REQUEST_TIMEOUT,
        searchableSnapshotIndexOutsideSearchRange,
        repositoryName,
        snapshotId.getName(),
        indexOutsideSearchRange,
        restoredIndexSettings,
        Strings.EMPTY_ARRAY,
        false,
        randomFrom(MountSearchableSnapshotRequest.Storage.values())
    );
    client().execute(MountSearchableSnapshotAction.INSTANCE, mountRequest).actionGet();

    // While the restore is blocked neither range nor the timestamp field type info is available
    final IndexMetadata indexMetadata = getIndexMetadata(searchableSnapshotIndexOutsideSearchRange);
    assertThat(indexMetadata.getTimestampRange(), equalTo(IndexLongFieldRange.NO_SHARDS));
    assertThat(indexMetadata.getEventIngestedRange(), equalTo(IndexLongFieldRange.NO_SHARDS));
    DateFieldRangeInfo timestampFieldTypeInfo = indicesService.getTimestampFieldTypeInfo(indexMetadata.getIndex());
    assertThat(timestampFieldTypeInfo, nullValue());

    final boolean includeIndexCoveringSearchRangeInSearchRequest = randomBoolean();
    final List<String> indicesToSearch = new ArrayList<>();
    if (includeIndexCoveringSearchRangeInSearchRequest) {
        indicesToSearch.add(indexWithinSearchRange);
    }
    indicesToSearch.add(searchableSnapshotIndexOutsideSearchRange);

    final String timeField = randomFrom(IndexMetadata.EVENT_INGESTED_FIELD_NAME, DataStream.TIMESTAMP_FIELD_NAME);
    final RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery(timeField)
        .from("2020-11-28T00:00:00.000000000Z", true)
        .to("2020-11-29T00:00:00.000000000Z");
    final SearchRequest request = new SearchRequest().indices(indicesToSearch.toArray(new String[0]))
        .source(new SearchSourceBuilder().query(rangeQuery));

    // ---- Phase 1: the searchable snapshot restore is still blocked ----
    if (includeIndexCoveringSearchRangeInSearchRequest) {
        assertResponse(client().search(request), searchResponse -> {
            // All the regular index searches succeeded
            assertThat(searchResponse.getSuccessfulShards(), equalTo(indexWithinSearchRangeShardCount));
            // All the searchable snapshots shard search failed
            assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount));
            assertThat(searchResponse.getSkippedShards(), equalTo(0));
            assertThat(searchResponse.getTotalShards(), equalTo(totalShards));
        });
    } else {
        // All shards failed, since all shards are unassigned and the IndexMetadata min/max timestamp
        // is not available yet
        expectThrows(SearchPhaseExecutionException.class, () -> client().search(request).actionGet());
    }

    // test with SearchShardsAPI
    {
        // Partial results are only allowed when the regular index takes part in the request
        final SearchShardsRequest searchShardsRequest = newSearchShardsRequestForRangeQuery(
            indicesToSearch,
            rangeQuery,
            includeIndexCoveringSearchRangeInSearchRequest
        );
        if (includeIndexCoveringSearchRangeInSearchRequest) {
            final SearchShardsResponse searchShardsResponse = client().execute(TransportSearchShardsAction.TYPE, searchShardsRequest)
                .actionGet();
            // Nothing can be skipped yet: every shard of both indices is reported as not skipped
            assertSkippedAndNotSkippedGroupCounts(searchShardsResponse, 0, totalShards);
        } else {
            final SearchShardsResponse searchShardsResponse = executeSearchShardsOrNullOnSearchPhaseFailure(searchShardsRequest);
            if (searchShardsResponse != null) {
                for (SearchShardsGroup group : searchShardsResponse.getGroups()) {
                    assertFalse("no shard should be marked as skipped", group.skipped());
                }
            }
        }
    }

    // Allow the searchable snapshots to be finally mounted
    unblockNode(repositoryName, dataNodeHoldingSearchableSnapshot);
    waitUntilRecoveryIsDone(searchableSnapshotIndexOutsideSearchRange);
    ensureGreen(searchableSnapshotIndexOutsideSearchRange);

    final IndexMetadata updatedIndexMetadata = getIndexMetadata(searchableSnapshotIndexOutsideSearchRange);
    // check that @timestamp and 'event.ingested' are now in cluster state
    final IndexLongFieldRange updatedTimestampMillisRange = updatedIndexMetadata.getTimestampRange();
    assertThat(updatedTimestampMillisRange.isComplete(), equalTo(true));
    final IndexLongFieldRange updatedEventIngestedRange = updatedIndexMetadata.getEventIngestedRange();
    assertThat(updatedEventIngestedRange.isComplete(), equalTo(true));

    timestampFieldTypeInfo = indicesService.getTimestampFieldTypeInfo(updatedIndexMetadata.getIndex());
    final DateFieldMapper.DateFieldType timestampDataFieldType = timestampFieldTypeInfo.timestampFieldType();
    assertThat(timestampDataFieldType, notNullValue());
    final DateFieldMapper.DateFieldType eventIngestedDataFieldType = timestampFieldTypeInfo.eventIngestedFieldType();
    assertThat(eventIngestedDataFieldType, notNullValue());

    final DateFieldMapper.Resolution timestampResolution = timestampDataFieldType.resolution();
    final DateFieldMapper.Resolution eventIngestedResolution = eventIngestedDataFieldType.resolution();
    if (indexDataWithTimestamp) {
        // Documents with timestamps were indexed: both ranges must lie within [2020-11-26, 2020-11-27]
        final Instant expectedLowerBound = Instant.parse("2020-11-26T00:00:00Z");
        final Instant expectedUpperBound = Instant.parse("2020-11-27T00:00:00Z");

        assertThat(updatedTimestampMillisRange, not(sameInstance(IndexLongFieldRange.EMPTY)));
        assertThat(updatedTimestampMillisRange.getMin(), greaterThanOrEqualTo(timestampResolution.convert(expectedLowerBound)));
        assertThat(updatedTimestampMillisRange.getMax(), lessThanOrEqualTo(timestampResolution.convert(expectedUpperBound)));

        assertThat(updatedEventIngestedRange, not(sameInstance(IndexLongFieldRange.EMPTY)));
        assertThat(updatedEventIngestedRange.getMin(), greaterThanOrEqualTo(eventIngestedResolution.convert(expectedLowerBound)));
        assertThat(updatedEventIngestedRange.getMax(), lessThanOrEqualTo(eventIngestedResolution.convert(expectedUpperBound)));
    } else {
        assertThat(updatedTimestampMillisRange, sameInstance(IndexLongFieldRange.EMPTY));
        assertThat(updatedEventIngestedRange, sameInstance(IndexLongFieldRange.EMPTY));
    }

    // Stop the node holding the searchable snapshots, and since we defined
    // the index allocation criteria to require the searchable snapshot
    // index to be allocated in that node, the shards should remain unassigned
    internalCluster().stopNode(dataNodeHoldingSearchableSnapshot);
    waitUntilAllShardsAreUnassigned(updatedIndexMetadata.getIndex());

    // ---- Phase 2: ranges are known, searchable snapshot shards are unassigned ----
    if (includeIndexCoveringSearchRangeInSearchRequest) {
        assertResponse(client().search(request), newSearchResponse -> {
            assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount));
            assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards));
            assertThat(newSearchResponse.getFailedShards(), equalTo(0));
            assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards));
            assertThat(newSearchResponse.getHits().getTotalHits().value(), equalTo((long) numDocsWithinRange));
        });

        // test with SearchShardsAPI
        final SearchShardsResponse searchShardsResponse = client().execute(
            TransportSearchShardsAction.TYPE,
            newSearchShardsRequestForRangeQuery(indicesToSearch, rangeQuery, true)
        ).actionGet();
        assertSkippedAndNotSkippedGroupCounts(
            searchShardsResponse,
            indexOutsideSearchRangeShardCount,
            totalShards - indexOutsideSearchRangeShardCount
        );
    } else if (indexOutsideSearchRangeShardCount == 1) {
        // NOTE(review): with a single searchable snapshot shard the search is expected to fail instead of
        // reporting the shard as skipped; the reason is not visible in this method - confirm before changing.
        expectThrows(SearchPhaseExecutionException.class, () -> client().search(request).actionGet());

        // test with SearchShardsAPI
        final SearchShardsResponse searchShardsResponse = executeSearchShardsOrNullOnSearchPhaseFailure(
            newSearchShardsRequestForRangeQuery(indicesToSearch, rangeQuery, false)
        );
        if (searchShardsResponse != null) {
            for (SearchShardsGroup group : searchShardsResponse.getGroups()) {
                assertTrue("the shard is skipped because index value is outside the query time range", group.skipped());
            }
        }
    } else {
        assertResponse(client().search(request), newSearchResponse -> {
            assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount));
            assertThat(newSearchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount));
            assertThat(newSearchResponse.getFailedShards(), equalTo(0));
            assertThat(newSearchResponse.getTotalShards(), equalTo(indexOutsideSearchRangeShardCount));
        });

        // test with SearchShardsAPI
        final SearchShardsResponse searchShardsResponse = client().execute(
            TransportSearchShardsAction.TYPE,
            newSearchShardsRequestForRangeQuery(indicesToSearch, rangeQuery, true)
        ).actionGet();
        // Only the searchable snapshot index was requested, so every group is skipped and none is left over
        assertSkippedAndNotSkippedGroupCounts(searchShardsResponse, indexOutsideSearchRangeShardCount, 0);
    }
}

/**
 * Builds a search-shards request over {@code indices} filtered by {@code rangeQuery}, using the default search
 * indices options; all other optional constructor arguments are passed as {@code null}.
 *
 * @param indices                   names of the indices to resolve shards for
 * @param rangeQuery                the time-range query handed to the request
 * @param allowPartialSearchResults value forwarded to the request's allow-partial-results argument
 * @return a new request instance backed by a fresh copy of the index names
 */
private SearchShardsRequest newSearchShardsRequestForRangeQuery(
    List<String> indices,
    RangeQueryBuilder rangeQuery,
    boolean allowPartialSearchResults
) {
    return new SearchShardsRequest(
        indices.toArray(new String[0]),
        SearchRequest.DEFAULT_INDICES_OPTIONS,
        rangeQuery,
        null,
        null,
        allowPartialSearchResults,
        null
    );
}

/**
 * Executes a search-shards request for the cases where a {@link SearchPhaseExecutionException} is an acceptable outcome.
 *
 * @param searchShardsRequest the request to execute
 * @return the response, or {@code null} when the request failed with a {@link SearchPhaseExecutionException}
 */
private SearchShardsResponse executeSearchShardsOrNullOnSearchPhaseFailure(SearchShardsRequest searchShardsRequest) {
    try {
        return client().execute(TransportSearchShardsAction.TYPE, searchShardsRequest).actionGet();
    } catch (SearchPhaseExecutionException ignored) {
        // Deliberately tolerated: callers treat a failed request as "nothing to verify"
        return null;
    }
}

/**
 * Asserts the total number of shard groups in {@code response} as well as how many of them are flagged as skipped
 * and how many are not.
 *
 * @param response           the search-shards response to inspect
 * @param expectedSkipped    expected number of groups whose {@code skipped()} flag is {@code true}
 * @param expectedNotSkipped expected number of groups whose {@code skipped()} flag is {@code false}
 */
private void assertSkippedAndNotSkippedGroupCounts(SearchShardsResponse response, int expectedSkipped, int expectedNotSkipped) {
    assertThat(response.getGroups().size(), equalTo(expectedSkipped + expectedNotSkipped));
    final long skipped = response.getGroups().stream().filter(SearchShardsGroup::skipped).count();
    final long notSkipped = response.getGroups().stream().filter(group -> group.skipped() == false).count();
    assertThat(skipped, equalTo((long) expectedSkipped));
    assertThat(notSkipped, equalTo((long) expectedNotSkipped));
}