private void sweepIndex()

in src/main/java/org/opensearch/jobscheduler/sweeper/JobSweeper.java [316:354]


    private void sweepIndex(String indexName) {
        ClusterState clusterState = this.clusterService.state();
        if (!clusterState.routingTable().hasIndex(indexName)) {
            // deschedule jobs for this index
            for (ShardId shardId : this.sweptJobs.keySet()) {
                if (shardId.getIndexName().equals(indexName) && this.sweptJobs.containsKey(shardId)) {
                    log.info("Descheduling jobs, shard {} index {} as the index is removed.", shardId.getId(), indexName);
                    this.scheduler.bulkDeschedule(shardId.getIndexName(), this.sweptJobs.get(shardId).keySet());
                }
            }
            return;
        }
        String localNodeId = clusterState.getNodes().getLocalNodeId();
        Map<ShardId, List<ShardRouting>> localShards = this.getLocalShards(clusterState, localNodeId, indexName);

        // deschedule jobs in removed shards
        Iterator<Map.Entry<ShardId, ConcurrentHashMap<String, JobDocVersion>>> sweptJobIter = this.sweptJobs.entrySet().iterator();
        while (sweptJobIter.hasNext()) {
            Map.Entry<ShardId, ConcurrentHashMap<String, JobDocVersion>> entry = sweptJobIter.next();
            if (entry.getKey().getIndexName().equals(indexName) && !localShards.containsKey(entry.getKey())) {
                log.info("Descheduling jobs of shard {} index {} as the shard is removed from this node.",
                        entry.getKey().getId(), indexName);
                //shard is removed, deschedule jobs of this shard
                this.scheduler.bulkDeschedule(indexName, entry.getValue().keySet());
                sweptJobIter.remove();
            }
        }

        // sweep each local shard
        for (Map.Entry<ShardId, List<ShardRouting>> shard : localShards.entrySet()) {
            try {
                List<ShardRouting> shardRoutingList = shard.getValue();
                List<String> shardNodeIds = shardRoutingList.stream().map(ShardRouting::currentNodeId).collect(Collectors.toList());
                sweepShard(shard.getKey(), new ShardNodes(localNodeId, shardNodeIds), null);
            } catch (Exception e) {
                log.info("Error while sweeping shard {}, error message: {}", shard.getKey(), e.getMessage());
            }
        }
    }