public static PartitionReader createReader()

in mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java [406:472]


    public static PartitionReader createReader(Settings settings, PartitionDefinition partition, Log log) {
        if (!SettingsUtils.hasPinnedNode(settings) && partition.getLocations().length > 0) {
            String pinAddress = checkLocality(partition.getLocations(), log);
            if (pinAddress != null) {
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Partition reader instance [%s] assigned to [%s]", partition, pinAddress));
                }
                SettingsUtils.pinNode(settings, pinAddress);
            }
        }
        ClusterInfo clusterInfo = InitializationUtils.discoverClusterInfo(settings, log);
        ValueReader reader = ObjectUtils.instantiate(settings.getSerializerValueReaderClassName(), settings);
        // initialize REST client
        RestRepository repository = new RestRepository(settings);
        Mapping fieldMapping = null;
        if (StringUtils.hasText(partition.getSerializedMapping())) {
            fieldMapping = IOUtils.deserializeFromJsonString(partition.getSerializedMapping(), Mapping.class);
        }
        else {
            log.warn(String.format("No mapping found for [%s] - either no index exists or the partition configuration has been corrupted", partition));
        }

        ScrollReader scrollReader = new ScrollReader(ScrollReaderConfigBuilder.builder(reader, fieldMapping, settings));
        if (settings.getNodesClientOnly()) {
            String clientNode = repository.getRestClient().getCurrentNode();
            if (log.isDebugEnabled()) {
                log.debug(String.format("Client-node routing detected; partition reader instance [%s] assigned to [%s]",
                        partition, clientNode));
            }
            SettingsUtils.pinNode(settings, clientNode);
        }

        // take into account client node routing
        boolean includeVersion = settings.getReadMetadata() && settings.getReadMetadataVersion();
        Resource read = new Resource(settings, true);
        SearchRequestBuilder requestBuilder =
                new SearchRequestBuilder(clusterInfo.getMajorVersion(), includeVersion)
                        .resource(read)
                        // Overwrite the index name from the resource to be that of the concrete index in the partition definition
                        .indices(partition.getIndex())
                        .query(QueryUtils.parseQuery(settings))
                        .scroll(settings.getScrollKeepAlive())
                        .size(settings.getScrollSize())
                        .limit(settings.getScrollLimit())
                        .fields(SettingsUtils.determineSourceFields(settings))
                        .filters(QueryUtils.parseFilters(settings))
                        .shard(Integer.toString(partition.getShardId()))
                        .readMetadata(settings.getReadMetadata())
                        .local(true)
                        .preference(settings.getShardPreference())
                        .excludeSource(settings.getExcludeSource());
        if (partition.getSlice() != null && partition.getSlice().max > 1) {
            requestBuilder.slice(partition.getSlice().id, partition.getSlice().max);
        }
        String[] indices = read.index().split(",");
        if (QueryUtils.isExplicitlyRequested(partition.getIndex(), indices) == false) {
            IndicesAliases indicesAliases =
                    new GetAliasesRequestBuilder(repository.getRestClient())
                            .indices(partition.getIndex())
                            .execute().getIndices();
            Map<String, IndicesAliases.Alias> aliases = indicesAliases.getAliases(partition.getIndex());
            if (aliases != null && aliases.size() > 0) {
                requestBuilder = applyAliasMetadata(clusterInfo.getMajorVersion(), aliases, requestBuilder, partition.getIndex(), indices);
            }
        }
        return new PartitionReader(scrollReader, repository, requestBuilder);
    }