in mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java [406:472]
/**
 * Creates a {@link PartitionReader} that scrolls over the shard (and optional slice)
 * described by the given partition definition.
 * <p>
 * The method may mutate {@code settings}: when no node is pinned yet and the partition
 * advertises locations, a local node is pinned; when client-only routing is enabled, the
 * node currently used by the REST client is pinned.
 * <p>
 * The {@link RestRepository} opened here is handed over to the returned reader. If anything
 * fails before the reader is constructed, the repository is closed before the failure is
 * propagated so that it does not leak.
 *
 * @param settings  job settings used to configure the REST client and the search request
 * @param partition definition of the partition (index, shard, optional slice, serialized mapping)
 * @param log       logger used for diagnostics
 * @return a reader bound to the scroll reader, REST repository and search request built here
 */
public static PartitionReader createReader(Settings settings, PartitionDefinition partition, Log log) {
    // Pinning happens before the REST repository is created below; presumably so the client
    // picks up the pinned node - the ordering is kept exactly as in the original code.
    if (!SettingsUtils.hasPinnedNode(settings) && partition.getLocations().length > 0) {
        String pinAddress = checkLocality(partition.getLocations(), log);
        if (pinAddress != null) {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Partition reader instance [%s] assigned to [%s]", partition, pinAddress));
            }
            SettingsUtils.pinNode(settings, pinAddress);
        }
    }

    ClusterInfo clusterInfo = InitializationUtils.discoverClusterInfo(settings, log);
    ValueReader reader = ObjectUtils.instantiate(settings.getSerializerValueReaderClassName(), settings);

    // initialize REST client
    RestRepository repository = new RestRepository(settings);

    // Flipped to true only once ownership of the repository has passed to the PartitionReader.
    boolean readerCreated = false;
    try {
        Mapping fieldMapping = null;
        if (StringUtils.hasText(partition.getSerializedMapping())) {
            fieldMapping = IOUtils.deserializeFromJsonString(partition.getSerializedMapping(), Mapping.class);
        }
        else {
            log.warn(String.format("No mapping found for [%s] - either no index exists or the partition configuration has been corrupted", partition));
        }

        ScrollReader scrollReader = new ScrollReader(ScrollReaderConfigBuilder.builder(reader, fieldMapping, settings));

        // take into account client node routing
        if (settings.getNodesClientOnly()) {
            String clientNode = repository.getRestClient().getCurrentNode();
            if (log.isDebugEnabled()) {
                log.debug(String.format("Client-node routing detected; partition reader instance [%s] assigned to [%s]",
                        partition, clientNode));
            }
            SettingsUtils.pinNode(settings, clientNode);
        }

        // the document version is only requested when metadata reading is enabled as well
        boolean includeVersion = settings.getReadMetadata() && settings.getReadMetadataVersion();
        Resource read = new Resource(settings, true);
        SearchRequestBuilder requestBuilder =
                new SearchRequestBuilder(clusterInfo.getMajorVersion(), includeVersion)
                        .resource(read)
                        // Overwrite the index name from the resource to be that of the concrete index in the partition definition
                        .indices(partition.getIndex())
                        .query(QueryUtils.parseQuery(settings))
                        .scroll(settings.getScrollKeepAlive())
                        .size(settings.getScrollSize())
                        .limit(settings.getScrollLimit())
                        .fields(SettingsUtils.determineSourceFields(settings))
                        .filters(QueryUtils.parseFilters(settings))
                        .shard(Integer.toString(partition.getShardId()))
                        .readMetadata(settings.getReadMetadata())
                        .local(true)
                        .preference(settings.getShardPreference())
                        .excludeSource(settings.getExcludeSource());

        // a slice is only applied when the partition is actually split into more than one slice
        if (partition.getSlice() != null && partition.getSlice().max > 1) {
            requestBuilder.slice(partition.getSlice().id, partition.getSlice().max);
        }

        // When the concrete index of the partition was not explicitly named in the read resource,
        // look up the aliases of that index and, if any exist, apply their metadata to the request.
        String[] indices = read.index().split(",");
        if (QueryUtils.isExplicitlyRequested(partition.getIndex(), indices) == false) {
            IndicesAliases indicesAliases =
                    new GetAliasesRequestBuilder(repository.getRestClient())
                            .indices(partition.getIndex())
                            .execute().getIndices();
            Map<String, IndicesAliases.Alias> aliases = indicesAliases.getAliases(partition.getIndex());
            if (aliases != null && aliases.size() > 0) {
                requestBuilder = applyAliasMetadata(clusterInfo.getMajorVersion(), aliases, requestBuilder, partition.getIndex(), indices);
            }
        }

        PartitionReader partitionReader = new PartitionReader(scrollReader, repository, requestBuilder);
        readerCreated = true;
        return partitionReader;
    } finally {
        if (!readerCreated) {
            // Best-effort cleanup: nobody else holds the repository at this point, so release it here.
            // A failure while closing is only logged so that it cannot mask the original exception.
            try {
                repository.close();
            } catch (Exception closeFailure) {
                log.warn(String.format("Cannot close REST repository after failing to create reader for partition [%s]", partition), closeFailure);
            }
        }
    }
}