in crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java [165:253]
/**
 * Handles a delete command and, when mirroring is enabled, forwards it to the mirror.
 *
 * <p>One of three paths is taken:
 * <ul>
 *   <li>Mirroring is on, DBQ expansion is not {@code NONE}, and the command is a delete-by-query
 *       other than {@code *:*}: the query is paged through with a cursor and every page is handed
 *       to {@code processDBQResults}. The method then returns without calling
 *       {@code super.processDelete}.</li>
 *   <li>Delete-by-id: after {@code super.processDelete} succeeds, the delete is submitted to the
 *       mirror only when {@code isLeader(...)} reports this node as leader for the id.</li>
 *   <li>Any other delete-by-query: after {@code super.processDelete} succeeds, the query is
 *       submitted to the mirror only when {@code distribPhase} is {@code NONE}.</li>
 * </ul>
 *
 * @param cmd the delete command to process
 * @throws IOException if the local delete or the expansion query fails with an I/O error
 * @throws SolrException with {@code SERVER_ERROR} if the expansion query fails with a
 *         {@code SolrServerException} or if submitting to the mirror fails
 */
public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
  if (doMirroring && (expandDbq != CrossDcConf.ExpandDbq.NONE) && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) {
    expandDeleteByQuery(cmd);
    return;
  }

  super.processDelete(cmd); // let this throw to prevent mirroring invalid requests
  if (!doMirroring) {
    return;
  }

  UpdateRequest mirrorRequest = createMirrorRequest();
  if (cmd.isDeleteById()) {
    // deleteById requests runs once per leader, so we just submit the request from the leader shard
    // An explicit route on the command wins; otherwise fall back to the _route_ request parameter.
    String route = null != cmd.getRoute() ? cmd.getRoute() : cmd.getReq().getParams().get(ShardParams._ROUTE_);
    boolean leader = isLeader(cmd.getReq(), cmd.getId(), route, null);
    if (leader) {
      mirrorRequest.deleteById(cmd.getId()); // strip versions from deletes
      submitMirrorRequest(mirrorRequest);
    }
    if (log.isDebugEnabled()) {
      log.debug("processDelete doMirroring={} isLeader={} cmd={}", true, leader, cmd);
    }
  } else {
    // DBQs are sent to each shard leader, so we mirror from the original node to only mirror once
    // In general there's no way to guarantee that these run identically on the mirror since there are no
    // external doc versions.
    // TODO: Can we actually support this considering DBQs aren't versioned.
    if (distribPhase == DistributedUpdateProcessor.DistribPhase.NONE) {
      mirrorRequest.deleteByQuery(cmd.query);
      submitMirrorRequest(mirrorRequest);
    }
    if (log.isDebugEnabled()) {
      log.debug("processDelete doMirroring={} cmd={}", true, cmd);
    }
  }
}

/**
 * Pages through all documents matching the command's query and passes each page to
 * {@code processDBQResults}.
 *
 * <p>The query is sent to this node's base URL for the core's collection, requesting only the
 * unique key field, sorted ascending on that field. Page size comes from the
 * {@code solr.crossdc.dbq_rows} system property (default 1000). Paging stops once the cursor mark
 * returned by the server equals the one that was sent.
 *
 * @param cmd the delete-by-query command whose query is expanded
 * @throws IOException if querying or result processing fails with an I/O error
 * @throws SolrException with {@code SERVER_ERROR} wrapping any {@code SolrServerException}
 */
private void expandDeleteByQuery(final DeleteUpdateCommand cmd) throws IOException {
  CloudDescriptor cloudDesc = cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor();
  String collection = cloudDesc.getCollectionName();
  HttpClient httpClient = cmd.getReq().getCore().getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
  // The shared HttpClient is passed in via the builder; only the HttpSolrClient wrapper is closed here.
  try (HttpSolrClient client =
      new HttpSolrClient.Builder(cmd.getReq().getCore().getCoreContainer().getZkController().getBaseUrl())
          .withHttpClient(httpClient)
          .build()) {
    String uniqueField = cmd.getReq().getSchema().getUniqueKeyField().getName();
    // TODO: implement "expand without deep paging"
    int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000);
    SolrQuery q = new SolrQuery(cmd.query)
        .setRows(rows)
        .setSort(SolrQuery.SortClause.asc(uniqueField))
        .setFields(uniqueField);

    String cursorMark = CursorMarkParams.CURSOR_MARK_START;
    int page = 1; // only used for (and only advanced by) debug logging
    while (true) {
      q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
      QueryResponse rsp = client.query(collection, q);
      String nextCursorMark = rsp.getNextCursorMark();
      if (log.isDebugEnabled()) {
        log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", cursorMark, nextCursorMark, page,
            rsp.getResults());
        page++;
      }
      processDBQResults(client, collection, uniqueField, rsp);
      // An unchanged cursor mark means the result set is exhausted.
      if (cursorMark.equals(nextCursorMark)) {
        break;
      }
      cursorMark = nextCursorMark;
    }
  } catch (SolrServerException e) {
    throw new SolrException(SERVER_ERROR, e);
  }
}

/**
 * Submits a request to the mirroring handler, logging and rethrowing any failure.
 *
 * @param mirrorRequest the request to hand to {@code requestMirroringHandler}
 * @throws SolrException with {@code SERVER_ERROR} wrapping whatever the handler threw
 */
private void submitMirrorRequest(final UpdateRequest mirrorRequest) {
  try {
    requestMirroringHandler.mirror(mirrorRequest);
  } catch (Exception e) {
    log.error("mirror submit failed", e);
    throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
  }
}