in mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java [376:471]
/**
 * Deletes the contents of the configured write resource.
 * <p>
 * On ES 1.x a single REST delete is issued against {@code index/type}. On any other version a
 * delete-by-query ({@code _query?q=*}) is attempted first; an {@link EsHadoopInvalidRequest} from that
 * call is logged and ignored. Afterwards the resource is scrolled without source and, for every
 * returned hit, a bulk {@code delete} action (including the hit's routing value when it has one) is
 * handed to {@code writeProcessedToIndex}. If at least one hit was seen, pending writes are flushed and
 * the write resource is refreshed.
 * <p>
 * The scroll is always closed when the method exits, and its stats are aggregated into this
 * repository's stats beforehand.
 */
public void delete() {
    EsMajorVersion version = client.clusterInfo.getMajorVersion();

    if (version.on(EsMajorVersion.V_1_X)) {
        // ES 1.x - delete as usual
        // Delete just the mapping
        client.delete(resources.getResourceWrite().index() + "/" + resources.getResourceWrite().type());
        return;
    }

    // try first a blind delete by query (since the plugin might be installed)
    try {
        if (resources.getResourceWrite().isTyped()) {
            client.delete(resources.getResourceWrite().index() + "/" + resources.getResourceWrite().type() + "/_query?q=*");
        } else {
            client.delete(resources.getResourceWrite().index() + "/_query?q=*");
        }
    } catch (EsHadoopInvalidRequest ehir) {
        // deliberately best-effort: the manual scroll-and-delete below runs regardless
        log.info("Skipping delete by query as the plugin is not installed...");
    }

    // in ES 2.0 and higher this means scrolling and deleting the docs by hand...
    // do a scroll-scan without source

    // as this is a delete, there's not much value in making this configurable so we just go for some sane/safe defaults
    // 10m scroll timeout
    // 500 results per batch
    int batchSize = 500;

    StringBuilder sb = new StringBuilder(resources.getResourceWrite().index());
    if (resources.getResourceWrite().isTyped()) {
        sb.append('/').append(resources.getResourceWrite().type());
    }
    sb.append("/_search?scroll=10m&_source=false&size=");
    sb.append(batchSize);
    // 5.x and later ask for _doc ordering, earlier versions use the scan search type
    sb.append(version.onOrAfter(EsMajorVersion.V_5_X) ? "&sort=_doc" : "&search_type=scan");
    String scanQuery = sb.toString();

    // metadata is read under "_metadata" so that the routing of each hit can be looked up below
    ScrollReader scrollReader = new ScrollReader(
            ScrollReaderConfigBuilder.builder(new JdkValueReader(), settings)
                    .setReadMetadata(true)
                    .setMetadataName("_metadata")
                    .setReturnRawJson(false)
                    .setIgnoreUnmappedFields(false)
                    .setIncludeFields(Collections.<String>emptyList())
                    .setExcludeFields(Collections.<String>emptyList())
                    .setIncludeArrayFields(Collections.<String>emptyList())
                    .setErrorHandlerLoader(new AbortOnlyHandlerLoader()) // Only abort since this is internal
    );

    // start iterating
    ScrollQuery sq = scanAll(scanQuery, null, scrollReader);
    try {
        BytesArray entry = new BytesArray(0);

        // delete each retrieved batch, keep routing in mind:
        String baseFormat = "{\"delete\":{\"_id\":\"%s\"}}\n";
        // the routing key of the bulk action loses its underscore prefix from 7.x on
        String routedFormat = version.onOrAfter(EsMajorVersion.V_7_X)
                ? "{\"delete\":{\"_id\":\"%s\", \"routing\":\"%s\"}}\n"
                : "{\"delete\":{\"_id\":\"%s\", \"_routing\":\"%s\"}}\n";

        boolean hasData = false;
        while (sq.hasNext()) {
            hasData = true;
            entry.reset();

            // kv[0] is used as the document id, kv[1] as the hit's value map
            Object[] kv = sq.next();
            @SuppressWarnings("unchecked")
            Map<String, Object> value = (Map<String, Object>) kv[1];
            @SuppressWarnings("unchecked")
            Map<String, Object> metadata = (Map<String, Object>) value.get("_metadata");
            String routing = (String) metadata.get("_routing");

            String encodedId = StringUtils.jsonEncoding((String) kv[0]);
            String action;
            if (StringUtils.hasText(routing)) {
                action = String.format(routedFormat, encodedId, StringUtils.jsonEncoding(routing));
            } else {
                action = String.format(baseFormat, encodedId);
            }
            entry.add(StringUtils.toUTF(action));
            writeProcessedToIndex(entry);
        }

        if (hasData) {
            flush();
            // once done force a refresh
            client.refresh(resources.getResourceWrite());
        }
    } finally {
        // always release the scroll, even if aggregating its stats fails
        try {
            stats.aggregate(sq.stats());
        } finally {
            sq.close();
        }
    }
}