public void init()

in controller/src/main/java/org/apache/airavata/mft/controller/spawner/AgentOrchestrator.java [110:201]


    /**
     * Schedules the periodic scan of {@code launchedSpawnersMap} on {@code scheduler}
     * (initial delay of 3 seconds, then a 5 second delay between the end of one scan and the next).
     *
     * <p>Each scan visits every launched spawner and performs two steps:
     * <ol>
     *   <li><b>Dispatch:</b> once the spawner's launch state is done, every queued transfer is submitted to
     *       the launched agent, provided that agent id is among the live agent ids reported by the consul
     *       client. If that fails, the transfer state is saved as {@code FAILED}, its consul key is deleted
     *       and the spawner is terminated and removed from the map. A queued transfer is always removed
     *       from the queue afterwards, whether or not it was submitted.</li>
     *   <li><b>Idle clean-up:</b> when more than {@code SPAWNER_MAX_IDLE_SECONDS} have passed since the last
     *       successful dispatch and no transfers are queued, the spawner is terminated if its launch state
     *       cannot be read, or if the agent has neither pending agent transfers nor endpoint hooks.</li>
     * </ol>
     *
     * <p>Any exception escaping a scan is logged and swallowed so that the scheduled task keeps running.
     */
    public void init() {
        scheduler.scheduleWithFixedDelay(() -> {

            try {
                launchedSpawnersMap.forEach((key, metadata) -> {
                    if (metadata.spawner.getLaunchState().isDone()) {
                        metadata.transferInfos.forEach((agentTransferId, transferInfo) -> {
                            try {
                                // The launch state is done, so this returns (or throws) without waiting.
                                String agentId = metadata.spawner.getLaunchState().get();
                                List<String> liveAgentIds = transferDispatcher.getMftConsulClient().getLiveAgentIds();

                                if (liveAgentIds.stream().noneMatch(id -> id.equals(agentId))) {
                                    throw new Exception("Agent was not registered even though the agent maked as up");
                                }

                                this.transferDispatcher.submitTransferToAgent(Collections.singletonList(agentId),
                                        transferInfo.transferId,
                                        transferInfo.transferApiRequest,
                                        transferInfo.agentTransferRequest,
                                        transferInfo.consulKey);

                                metadata.lastScannedTime = System.currentTimeMillis();
                            } catch (Exception e) {
                                logger.error("Failed to launch agent for agent transfer id {} and transfer {}",
                                        agentTransferId, transferInfo.transferId, e);
                                try {
                                    transferDispatcher.getMftConsulClient().saveTransferState(transferInfo.transferId, null, new TransferState()
                                            .setUpdateTimeMils(System.currentTimeMillis())
                                            .setState("FAILED").setPercentage(0)
                                            .setPublisher("controller")
                                            .setDescription("Failed to launch the agent. " + ExceptionUtils.getRootCauseMessage(e)));
                                } catch (Exception e2) {
                                    logger.error("Failed to submit transfer fail error for transfer id {}", transferInfo.transferId, e2);
                                }

                                // Each clean-up step is guarded on its own: a failure while deleting the key must
                                // not prevent the spawner from being terminated, and neither failure may abort
                                // the scan of the remaining transfers and spawners.
                                try {
                                    logger.info("Removing consul key {}", transferInfo.consulKey);
                                    transferDispatcher.getMftConsulClient().getKvClient().deleteKey(transferInfo.consulKey);
                                } catch (Exception e3) {
                                    logger.error("Failed to remove consul key {}", transferInfo.consulKey, e3);
                                }

                                // Several queued transfers may fail against the same spawner; terminate it only
                                // while it is still registered so it is not terminated once per failed transfer.
                                if (launchedSpawnersMap.containsKey(key)) {
                                    try {
                                        logger.info("Terminating the spawner");
                                        metadata.spawner.terminate(true);
                                        launchedSpawnersMap.remove(key);
                                    } catch (Exception e4) {
                                        // The spawner stays in the map, so a later scan can retry the termination.
                                        logger.error("Failed to terminate spawner with key {}", key, e4);
                                    }
                                }
                            } finally {
                                // A queued transfer is handled exactly once, whether it was submitted or failed.
                                metadata.transferInfos.remove(agentTransferId);
                            }
                        });

                        // The spawner was terminated and removed above; the idle check below would only
                        // terminate it a second time.
                        if (!launchedSpawnersMap.containsKey(key)) {
                            return;
                        }
                    }

                    // 1000L forces long arithmetic so the idle limit cannot overflow an int.
                    if ((System.currentTimeMillis() - metadata.lastScannedTime) > SPAWNER_MAX_IDLE_SECONDS * 1000L) {

                        // Transfers are still queued for this spawner, so it is not idle yet.
                        if (!metadata.transferInfos.isEmpty()) {
                            return;
                        }

                        logger.info("No transfer infos for spawner {}. Checking for termination", key);

                        try {
                            String agentId = null;
                            try {
                                agentId = metadata.spawner.getLaunchState().get();

                            } catch (Exception e) {
                                logger.info("Killing spawner with key {} as the agent is not responding and inactive for {} seconds",
                                    key, SPAWNER_MAX_IDLE_SECONDS);
                                metadata.spawner.terminate(false);
                                launchedSpawnersMap.remove(key);
                                return;
                            }

                            // Only terminate when the agent has nothing queued and nothing in flight.
                            List<String> pendingAgentTransfers = transferDispatcher.getMftConsulClient().listPendingAgentTransfers(agentId);
                            if (pendingAgentTransfers.isEmpty()) {
                                int totalFilesInProgress = transferDispatcher.getMftConsulClient().getEndpointHookCountForAgent(agentId);
                                if (totalFilesInProgress == 0) {
                                    logger.info("Killing spawner with key {} as all files were transferred and the agent" +
                                                    " is inactive for {} seconds",
                                            key, SPAWNER_MAX_IDLE_SECONDS);
                                    metadata.spawner.terminate(false);
                                    launchedSpawnersMap.remove(key);
                                }
                            }

                        } catch (Exception e) {
                            logger.error("Failed while fetching the endpoint count for agent", e);
                        }
                    }
                });

            } catch (Exception e) {
                // Just to keep the thread running
                logger.error("Some error occurred while processing spawners map", e);
            }

        }, 3, 5, TimeUnit.SECONDS);
    }