in controller/src/main/java/org/apache/airavata/mft/controller/spawner/AgentOrchestrator.java [110:201]
public void init() {
scheduler.scheduleWithFixedDelay(() -> {
try {
launchedSpawnersMap.forEach((key, metadata) -> {
if (metadata.spawner.getLaunchState().isDone()) {
metadata.transferInfos.forEach((agentTransferId, transferInfo) -> {
try {
String agentId = metadata.spawner.getLaunchState().get();
List<String> liveAgentIds = transferDispatcher.getMftConsulClient().getLiveAgentIds();
if (liveAgentIds.stream().noneMatch(id -> id.equals(agentId))) {
throw new Exception("Agent was not registered even though the agent maked as up");
}
this.transferDispatcher.submitTransferToAgent(Collections.singletonList(agentId),
transferInfo.transferId,
transferInfo.transferApiRequest,
transferInfo.agentTransferRequest,
transferInfo.consulKey);
metadata.lastScannedTime = System.currentTimeMillis();
} catch (Exception e) {
logger.error("Failed to launch agent for agent transfer id {} and transfer {}",
agentTransferId, transferInfo.transferId, e);
try {
transferDispatcher.getMftConsulClient().saveTransferState(transferInfo.transferId, null, new TransferState()
.setUpdateTimeMils(System.currentTimeMillis())
.setState("FAILED").setPercentage(0)
.setPublisher("controller")
.setDescription("Failed to launch the agent. " + ExceptionUtils.getRootCauseMessage(e)));
} catch (Exception e2) {
logger.error("Failed to submit transfer fail error for transfer id {}", transferInfo.transferId, e2);
}
logger.info("Removing consul key {}", transferInfo.consulKey);
transferDispatcher.getMftConsulClient().getKvClient().deleteKey(transferInfo.consulKey);
logger.info("Terminating the spawner");
metadata.spawner.terminate(true);
launchedSpawnersMap.remove(key);
} finally {
metadata.transferInfos.remove(agentTransferId);
}
});
}
if ((System.currentTimeMillis() - metadata.lastScannedTime) > SPAWNER_MAX_IDLE_SECONDS * 1000) {
if (metadata.transferInfos.size() > 0) {
return;
}
logger.info("No transfer infos for spawner {}. Checking for termination", key);
try {
String agentId = null;
try {
agentId = metadata.spawner.getLaunchState().get();
} catch (Exception e) {
logger.info("Killing spawner with key {} as the agent is not responding and inactive for {} seconds",
key, SPAWNER_MAX_IDLE_SECONDS);
metadata.spawner.terminate(false);
launchedSpawnersMap.remove(key);
return;
}
List<String> pendingAgentTransfers = transferDispatcher.getMftConsulClient().listPendingAgentTransfers(agentId);
if (pendingAgentTransfers.isEmpty()) {
int totalFilesInProgress = transferDispatcher.getMftConsulClient().getEndpointHookCountForAgent(agentId);
if (totalFilesInProgress == 0) {
logger.info("Killing spawner with key {} as all files were transferred and the agent" +
" is inactive for {} seconds",
key, SPAWNER_MAX_IDLE_SECONDS);
metadata.spawner.terminate(false);
launchedSpawnersMap.remove(key);
}
}
} catch (Exception e) {
logger.error("Failed while fetching the endpoint count for agent", e);
}
}
});
} catch (Exception e) {
// Just to keep the thread running
logger.error("Some error occurred while processing spawners map", e);
}
}, 3, 5, TimeUnit.SECONDS);
}