in src/main/java/org/opensearch/jobscheduler/sweeper/JobSweeper.java [316:354]
/**
 * Sweeps the shards of {@code indexName} that are allocated to the local node.
 *
 * <p>The method works in three steps:
 * <ol>
 *   <li>If the cluster routing table no longer contains the index, every tracked shard of that
 *       index is descheduled and dropped from {@code sweptJobs}, and the method returns.</li>
 *   <li>Otherwise, tracked shards of the index that are no longer among the local shards are
 *       descheduled and dropped from {@code sweptJobs}.</li>
 *   <li>Each remaining local shard is swept via {@code sweepShard}. A failure in one shard is
 *       logged and does not prevent the other shards from being swept.</li>
 * </ol>
 *
 * @param indexName name of the index whose local shards should be swept
 */
private void sweepIndex(String indexName) {
    ClusterState clusterState = this.clusterService.state();
    if (!clusterState.routingTable().hasIndex(indexName)) {
        // The index is gone: deschedule its jobs and forget the tracked shards.
        // Iterating the entry set (instead of containsKey + get on the key set) avoids a
        // NullPointerException if an entry disappears between the two lookups, and removing
        // the entry keeps stale shards from being descheduled again on every later sweep.
        Iterator<Map.Entry<ShardId, ConcurrentHashMap<String, JobDocVersion>>> removedIndexIter =
            this.sweptJobs.entrySet().iterator();
        while (removedIndexIter.hasNext()) {
            Map.Entry<ShardId, ConcurrentHashMap<String, JobDocVersion>> entry = removedIndexIter.next();
            ShardId shardId = entry.getKey();
            if (shardId.getIndexName().equals(indexName)) {
                log.info("Descheduling jobs, shard {} index {} as the index is removed.", shardId.getId(), indexName);
                this.scheduler.bulkDeschedule(indexName, entry.getValue().keySet());
                removedIndexIter.remove();
            }
        }
        return;
    }

    String localNodeId = clusterState.getNodes().getLocalNodeId();
    Map<ShardId, List<ShardRouting>> localShards = this.getLocalShards(clusterState, localNodeId, indexName);

    // Deschedule jobs of shards of this index that are tracked but no longer local.
    Iterator<Map.Entry<ShardId, ConcurrentHashMap<String, JobDocVersion>>> sweptJobIter =
        this.sweptJobs.entrySet().iterator();
    while (sweptJobIter.hasNext()) {
        Map.Entry<ShardId, ConcurrentHashMap<String, JobDocVersion>> entry = sweptJobIter.next();
        ShardId shardId = entry.getKey();
        if (shardId.getIndexName().equals(indexName) && !localShards.containsKey(shardId)) {
            log.info("Descheduling jobs of shard {} index {} as the shard is removed from this node.",
                shardId.getId(), indexName);
            this.scheduler.bulkDeschedule(indexName, entry.getValue().keySet());
            sweptJobIter.remove();
        }
    }

    // Sweep each local shard independently so one failing shard does not block the rest.
    for (Map.Entry<ShardId, List<ShardRouting>> shard : localShards.entrySet()) {
        try {
            List<String> shardNodeIds = shard.getValue().stream()
                .map(ShardRouting::currentNodeId)
                .collect(Collectors.toList());
            // NOTE(review): the meaning of the null third argument is defined by sweepShard,
            // which is not visible here - confirm against its declaration.
            sweepShard(shard.getKey(), new ShardNodes(localNodeId, shardNodeIds), null);
        } catch (Exception e) {
            // Pass the throwable as the trailing argument so the stack trace and cause are
            // logged, not just the (possibly null) message.
            log.warn("Error while sweeping shard {}, error message: {}", shard.getKey(), e.getMessage(), e);
        }
    }
}