private void sweepShard()

in src/main/java/org/opensearch/jobscheduler/sweeper/JobSweeper.java [356:400]


    private void sweepShard(ShardId shardId, ShardNodes shardNodes, String startAfter) {
        ConcurrentHashMap<String, JobDocVersion> currentJobs = this.sweptJobs.containsKey(shardId) ?
                this.sweptJobs.get(shardId) : new ConcurrentHashMap<>();

        for (String jobId : currentJobs.keySet()) {
            if (!shardNodes.isOwningNode(jobId)) {
                this.scheduler.deschedule(shardId.getIndexName(), jobId);
                currentJobs.remove(jobId);
            }
        }

        String searchAfter = startAfter == null ? "" : startAfter;
        while (searchAfter != null) {
            SearchRequest jobSearchRequest = new SearchRequest()
                    .indices(shardId.getIndexName())
                    .preference("_shards:" + shardId.id() + "|_only_local")
                    .source(new SearchSourceBuilder()
                            .version(true)
                            .seqNoAndPrimaryTerm(true)
                            .sort(new FieldSortBuilder("_id").unmappedType("keyword").missing("_last"))
                            .searchAfter(new String[]{searchAfter})
                            .size(this.sweepPageMaxSize)
                            .query(QueryBuilders.matchAllQuery()));

            SearchResponse response = this.retry((searchRequest) -> this.client.search(searchRequest),
                    jobSearchRequest, this.sweepSearchBackoff).actionGet(this.sweepSearchTimeout);
            if (response.status() != RestStatus.OK) {
                log.error("Error sweeping shard {}, failed querying jobs on this shard", shardId);
                return;
            }
            for (SearchHit hit : response.getHits()) {
                String jobId = hit.getId();
                if (shardNodes.isOwningNode(jobId)) {
                    this.sweep(shardId, jobId, hit.getSourceRef(), new JobDocVersion(hit.getPrimaryTerm(), hit.getSeqNo(),
                            hit.getVersion()));
                }
            }
            if (response.getHits() == null || response.getHits().getHits().length < 1) {
                searchAfter = null;
            } else {
                SearchHit lastHit = response.getHits().getHits()[response.getHits().getHits().length - 1];
                searchAfter = lastHit.getId();
            }
        }
    }