private void acceptLiveAgents()

in controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java [169:223]


    private void acceptLiveAgents() {
        ConsulCache.Listener<String, Value> liveAgentCacheListener = newValues -> newValues.forEach((agentId, value) -> {
            try {
                Optional<String> session = mftConsulClient.getKvClient().getSession(value.getKey());
                if (session.isPresent()) {
                    String sessionId = session.get();
                    logger.info("Agent connected in path {} agent id {} session {}", value.getKey(), agentId, sessionId);

                    List<Value> scheduledTransfers = mftConsulClient.getKvClient().getValues(MFTConsulClient.AGENTS_SCHEDULED_PATH + agentId);
                    for (Value v: scheduledTransfers) {
                        logger.info("Found scheduled key {}", v.getKey());

                        try {
                            // Key = AGENTS_SCHEDULED_PATH agent id / session id / transfer id
                            String[] parts = v.getKey().split("/");
                            // Make sure right amount of data is available
                            if (parts.length == MFTConsulClient.AGENTS_SCHEDULED_PATH.split("/").length + 3) {

                                String scheduledSession = parts[parts.length - 2];
                                String scheduledTransfer = parts[parts.length - 1];

                                logger.info("Scheduled session {} transfer {}", scheduledSession, scheduledTransfer);

                                if (!scheduledSession.equals(sessionId)) {
                                    logger.info("Old transfer session found. Re scheduling to agent {}", agentId);
                                    AgentTransferRequest transferRequest = AgentTransferRequest
                                            .newBuilder().mergeFrom(v.getValueAsBytes().get()).build();
                                    mftConsulClient.commandTransferToAgent(agentId, scheduledTransfer, transferRequest);

                                    // Delete the key as it is already processed
                                    mftConsulClient.getKvClient().deleteKey(v.getKey());

                                } else {
                                    logger.info("Session {} is already active so skipping scheduled transfer", scheduledSession);
                                }

                            } else {
                                logger.warn("Invalid schedule key {}", v.getKey());
                            }

                        } catch (Exception e) {
                            logger.error("Failed to process schedule key {}", v.getKey());
                        }
                    }
                }
            } catch (Exception e) {
                logger.error("Errored while processing live agent cache key {}", agentId, e);
            } finally {

            }
        });

        liveAgentCache.addListener(liveAgentCacheListener);
        liveAgentCache.start();
    }