in controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java [169:223]
/**
 * Registers a listener on the live agent cache and starts the cache.
 *
 * <p>For every entry delivered to the listener, the session attached to the entry's key is looked up.
 * When a session is present, all values stored under {@code AGENTS_SCHEDULED_PATH + agentId} are
 * inspected and any transfer that was scheduled under a different session id is re-sent to the agent
 * (see {@link #rescheduleStaleTransfer}). Failures while handling one agent are logged and do not
 * propagate out of the listener.
 */
private void acceptLiveAgents() {
    ConsulCache.Listener<String, Value> liveAgentCacheListener = newValues -> newValues.forEach((agentId, value) -> {
        try {
            Optional<String> session = mftConsulClient.getKvClient().getSession(value.getKey());
            if (!session.isPresent()) {
                // No session attached to the live agent key, nothing to reconcile for this entry
                return;
            }
            String sessionId = session.get();
            logger.info("Agent connected in path {} agent id {} session {}", value.getKey(), agentId, sessionId);

            List<Value> scheduledTransfers = mftConsulClient.getKvClient().getValues(MFTConsulClient.AGENTS_SCHEDULED_PATH + agentId);
            // Key = AGENTS_SCHEDULED_PATH agent id / session id / transfer id
            // Computed once per agent instead of once per scheduled key
            int expectedKeyParts = MFTConsulClient.AGENTS_SCHEDULED_PATH.split("/").length + 3;
            for (Value scheduled : scheduledTransfers) {
                rescheduleStaleTransfer(agentId, sessionId, scheduled, expectedKeyParts);
            }
        } catch (Exception e) {
            logger.error("Errored while processing live agent cache key {}", agentId, e);
        }
    });
    liveAgentCache.addListener(liveAgentCacheListener);
    liveAgentCache.start();
}

/**
 * Re-sends a single scheduled transfer to the agent when it was scheduled under a session other than
 * the agent's current one, then deletes the processed key. Keys that do not have the expected number
 * of path segments, or that carry no payload, are logged and left in place.
 *
 * @param agentId          id of the agent the transfer is commanded to
 * @param sessionId        session id currently attached to the agent's live key
 * @param scheduled        value read from under the agent's scheduled path
 * @param expectedKeyParts number of "/"-separated segments a well-formed scheduled key must have
 */
private void rescheduleStaleTransfer(String agentId, String sessionId, Value scheduled, int expectedKeyParts) {
    logger.info("Found scheduled key {}", scheduled.getKey());
    try {
        String[] parts = scheduled.getKey().split("/");
        // Make sure right amount of data is available
        if (parts.length != expectedKeyParts) {
            logger.warn("Invalid schedule key {}", scheduled.getKey());
            return;
        }

        String scheduledSession = parts[parts.length - 2];
        String scheduledTransfer = parts[parts.length - 1];
        logger.info("Scheduled session {} transfer {}", scheduledSession, scheduledTransfer);

        if (scheduledSession.equals(sessionId)) {
            logger.info("Session {} is already active so skipping scheduled transfer", scheduledSession);
            return;
        }

        Optional<byte[]> payload = scheduled.getValueAsBytes();
        if (!payload.isPresent()) {
            // Without a payload there is no request to rebuild; keep the key so it can be inspected
            logger.warn("Scheduled key {} has no value. Skipping re scheduling", scheduled.getKey());
            return;
        }

        logger.info("Old transfer session found. Re scheduling to agent {}", agentId);
        AgentTransferRequest transferRequest = AgentTransferRequest
                .newBuilder().mergeFrom(payload.get()).build();
        mftConsulClient.commandTransferToAgent(agentId, scheduledTransfer, transferRequest);
        // Delete the key as it is already processed
        mftConsulClient.getKvClient().deleteKey(scheduled.getKey());
    } catch (Exception e) {
        // Pass the exception as the last argument so the cause and stack trace are logged
        logger.error("Failed to process schedule key {}", scheduled.getKey(), e);
    }
}