in agent/service/src/main/java/org/apache/airavata/mft/agent/ingress/ConsulIngressHandler.java [158:215]
/**
 * Creates a Consul session for this agent, tries to take the agent's lock under
 * {@code MFTConsulClient.LIVE_AGENTS_PATH + agentId}, and, when the lock is obtained,
 * schedules periodic session renewal and registers the agent.
 *
 * <p>The session is created with the name {@code agentId}, behavior {@code "delete"} and a TTL of
 * {@code sessionTTLSeconds} seconds. Renewal runs on {@code sessionRenewPool} every
 * {@code sessionRenewSeconds} seconds.
 *
 * <p>NOTE(review): when the lock is not acquired the freshly created session is not explicitly
 * destroyed here; presumably it is left to expire through its TTL - confirm this is intended.
 *
 * @return {@code true} if the lock was acquired (and the agent was registered), {@code false} otherwise
 * @throws MFTConsulClientException propagated from the MFT Consul client calls made here
 */
private boolean connectAgent() throws MFTConsulClientException {
    final ImmutableSession sessionSpec = ImmutableSession.builder()
            .name(agentId)
            .behavior("delete")
            .ttl(sessionTTLSeconds + "s").build();
    final SessionCreatedResponse sessResp = mftConsulClient.getSessionClient().createSession(sessionSpec);
    final String sessionId = sessResp.getId();
    final String lockPath = MFTConsulClient.LIVE_AGENTS_PATH + agentId;

    boolean acquired = mftConsulClient.getKvClient().acquireLock(lockPath, sessionId);
    if (acquired) {
        this.session = sessionId;
        sessionRenewPool.scheduleAtFixedRate(() -> {
            try {
                mftConsulClient.getSessionClient().renewSession(sessionId);
            } catch (ConsulException e) {
                if (e.getCode() == 404) {
                    logger.error("Can not renew session as it is expired");
                    destroy();
                    // The session is gone: stop here instead of falling through to the
                    // lock re-acquire fallback, which would fail and call destroy() again.
                    return;
                }
                logger.warn("Errored while renewing the session", e);
                reacquireLockOrDestroy(lockPath, sessionId);
            } catch (Exception e) {
                // Previously swallowed silently; log it so renewal failures are visible.
                logger.warn("Errored while renewing the session", e);
                reacquireLockOrDestroy(lockPath, sessionId);
            }
        }, sessionRenewSeconds, sessionRenewSeconds, TimeUnit.SECONDS);

        this.mftConsulClient.registerAgent(new AgentInfo()
                .setId(agentId)
                .setHost(agentHost)
                .setUser(agentUser)
                .setSessionId(this.session)
                .setSupportedProtocols(Arrays.asList(supportedProtocols.split(",")))
                .setLocalStorages(new ArrayList<>()));
    }
    logger.info("Acquired lock " + acquired);
    return acquired;
}

/**
 * Fallback used when a session renewal fails for a reason other than a 404: tries to acquire the
 * agent lock again with the same session and calls {@code destroy()} if that does not succeed
 * or throws.
 *
 * @param lockPath  key of the agent lock
 * @param sessionId id of the session the lock is acquired with
 */
private void reacquireLockOrDestroy(String lockPath, String sessionId) {
    try {
        boolean status = mftConsulClient.getKvClient().acquireLock(lockPath, sessionId);
        if (!status) {
            logger.error("Can not renew session as it is expired");
            destroy();
        }
    } catch (Exception ex) {
        // Keep the cause in the log so the reason for the failed re-acquire is not lost.
        logger.error("Can not renew session as it is expired", ex);
        destroy();
    }
}