in src/main/java/org/opensearch/jobscheduler/sweeper/JobSweeper.java [356:400]
private void sweepShard(ShardId shardId, ShardNodes shardNodes, String startAfter) {
ConcurrentHashMap<String, JobDocVersion> currentJobs = this.sweptJobs.containsKey(shardId) ?
this.sweptJobs.get(shardId) : new ConcurrentHashMap<>();
for (String jobId : currentJobs.keySet()) {
if (!shardNodes.isOwningNode(jobId)) {
this.scheduler.deschedule(shardId.getIndexName(), jobId);
currentJobs.remove(jobId);
}
}
String searchAfter = startAfter == null ? "" : startAfter;
while (searchAfter != null) {
SearchRequest jobSearchRequest = new SearchRequest()
.indices(shardId.getIndexName())
.preference("_shards:" + shardId.id() + "|_only_local")
.source(new SearchSourceBuilder()
.version(true)
.seqNoAndPrimaryTerm(true)
.sort(new FieldSortBuilder("_id").unmappedType("keyword").missing("_last"))
.searchAfter(new String[]{searchAfter})
.size(this.sweepPageMaxSize)
.query(QueryBuilders.matchAllQuery()));
SearchResponse response = this.retry((searchRequest) -> this.client.search(searchRequest),
jobSearchRequest, this.sweepSearchBackoff).actionGet(this.sweepSearchTimeout);
if (response.status() != RestStatus.OK) {
log.error("Error sweeping shard {}, failed querying jobs on this shard", shardId);
return;
}
for (SearchHit hit : response.getHits()) {
String jobId = hit.getId();
if (shardNodes.isOwningNode(jobId)) {
this.sweep(shardId, jobId, hit.getSourceRef(), new JobDocVersion(hit.getPrimaryTerm(), hit.getSeqNo(),
hit.getVersion()));
}
}
if (response.getHits() == null || response.getHits().getHits().length < 1) {
searchAfter = null;
} else {
SearchHit lastHit = response.getHits().getHits()[response.getHits().getHits().length - 1];
searchAfter = lastHit.getId();
}
}
}