public void delete()

in mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java [376:471]


    /**
     * Deletes the content of the write resource.
     * <p>
     * On ES 1.x a single DELETE request is issued against {@code index/type}. On any other version a blind
     * delete-by-query request is attempted first (an {@code EsHadoopInvalidRequest} raised by it is logged and
     * otherwise ignored); afterwards the resource is scrolled without {@code _source} and a bulk {@code delete}
     * action is written for every returned document, carrying the document routing when it has one. If the scroll
     * returned at least one document, the pending data is flushed and the write resource is refreshed.
     * <p>
     * The scroll statistics are aggregated into this repository's stats and the scroll is closed whether or not
     * the deletion succeeded.
     */
    public void delete() {
        if (client.clusterInfo.getMajorVersion().on(EsMajorVersion.V_1_X)) {
            // ES 1.x - delete as usual
            // Delete just the mapping
            client.delete(resources.getResourceWrite().index() + "/" + resources.getResourceWrite().type());
            return;
        }

        // "index" or "index/type" - shared by the delete-by-query attempt and the scroll request below
        String target = resources.getResourceWrite().index();
        if (resources.getResourceWrite().isTyped()) {
            target = target + "/" + resources.getResourceWrite().type();
        }

        // try first a blind delete by query (since the plugin might be installed)
        try {
            client.delete(target + "/_query?q=*");
        } catch (EsHadoopInvalidRequest ehir) {
            log.info("Skipping delete by query as the plugin is not installed...");
        }

        // in ES 2.0 and higher this means scrolling and deleting the docs by hand...
        // do a scroll-scan without source

        // as this is a delete, there's not much value in making this configurable so we just go for some sane/safe defaults
        // 10m scroll timeout
        // size of 500
        int batchSize = 500;
        StringBuilder sb = new StringBuilder(target);
        sb.append("/_search?scroll=10m&_source=false&size=").append(batchSize);
        if (client.clusterInfo.getMajorVersion().onOrAfter(EsMajorVersion.V_5_X)) {
            sb.append("&sort=_doc");
        }
        else {
            sb.append("&search_type=scan");
        }
        String scanQuery = sb.toString();

        ScrollReader scrollReader = new ScrollReader(
                ScrollReaderConfigBuilder.builder(new JdkValueReader(), settings)
                        .setReadMetadata(true)
                        .setMetadataName("_metadata")
                        .setReturnRawJson(false)
                        .setIgnoreUnmappedFields(false)
                        .setIncludeFields(Collections.<String>emptyList())
                        .setExcludeFields(Collections.<String>emptyList())
                        .setIncludeArrayFields(Collections.<String>emptyList())
                        .setErrorHandlerLoader(new AbortOnlyHandlerLoader()) // Only abort since this is internal
        );

        // start iterating
        ScrollQuery sq = scanAll(scanQuery, null, scrollReader);
        try {
            BytesArray entry = new BytesArray(0);

            // delete each retrieved batch, keep routing in mind:
            // the routing key of the action line is "routing" on 7.x and later, "_routing" before that
            String baseFormat = "{\"delete\":{\"_id\":\"%s\"}}\n";
            String routedFormat = client.clusterInfo.getMajorVersion().onOrAfter(EsMajorVersion.V_7_X)
                    ? "{\"delete\":{\"_id\":\"%s\", \"routing\":\"%s\"}}\n"
                    : "{\"delete\":{\"_id\":\"%s\", \"_routing\":\"%s\"}}\n";

            boolean hasData = false;
            while (sq.hasNext()) {
                hasData = true;
                entry.reset();
                // kv[0] is used as the document id, kv[1] as the document map holding the "_metadata" entry
                Object[] kv = sq.next();
                @SuppressWarnings("unchecked")
                Map<String, Object> value = (Map<String, Object>) kv[1];
                @SuppressWarnings("unchecked")
                Map<String, Object> metadata = (Map<String, Object>) value.get("_metadata");
                String routing = (String) metadata.get("_routing");
                String encodedId = StringUtils.jsonEncoding((String) kv[0]);

                String action;
                if (StringUtils.hasText(routing)) {
                    action = String.format(routedFormat, encodedId, StringUtils.jsonEncoding(routing));
                } else {
                    action = String.format(baseFormat, encodedId);
                }
                entry.add(StringUtils.toUTF(action));
                writeProcessedToIndex(entry);
            }

            if (hasData) {
                flush();
                // once done force a refresh
                client.refresh(resources.getResourceWrite());
            }
        } finally {
            try {
                stats.aggregate(sq.stats());
            } finally {
                // close the scroll even if collecting its statistics failed
                sq.close();
            }
        }
    }