in agent/service/src/main/java/org/apache/airavata/mft/agent/ingress/ConsulIngressHandler.java [97:134]
/**
 * Installs a listener on {@code transferMessageCache} that consumes incoming agent transfer
 * requests and hands them to {@code transferOrchestrator}, then starts the cache.
 *
 * <p>For every cache entry that carries a value, the listener:
 * <ol>
 *   <li>derives the transfer id (second-to-last key segment) and the agent transfer request id
 *       (last key segment) from the {@code /}-separated key;</li>
 *   <li>deletes the key from the Consul KV store;</li>
 *   <li>decodes the value as an {@code AgentTransferRequest};</li>
 *   <li>submits the request for processing together with two callbacks: one that publishes
 *       file transfer state through {@code mftConsulClient}, and one that creates or deletes
 *       an endpoint hook for this agent.</li>
 * </ol>
 *
 * <p>Entries without a value are ignored. Entries whose key has fewer than two segments are
 * logged and skipped so that one malformed key cannot abort the rest of the batch.
 */
private void acceptTransferRequests() {
    transferCacheListener = newValues -> {
        newValues.values().forEach(value -> {
            Optional<byte[]> decodedValue = value.getValueAsBytes();
            if (!decodedValue.isPresent()) {
                // Nothing to process for value-less entries.
                return;
            }
            byte[] reqBytes = decodedValue.get();

            String key = value.getKey();
            String[] partsOfKey = key.split("/");
            if (partsOfKey.length < 2) {
                // Without this guard the index arithmetic below would throw and the remaining
                // values of this notification would never be handled.
                logger.error("Ignoring transfer request with malformed key {}: expected at least two path segments", key);
                return;
            }
            String agentTransferRequestId = partsOfKey[partsOfKey.length - 1];
            String transferId = partsOfKey[partsOfKey.length - 2];

            // The key is removed before decoding, so an undecodable request is dropped rather
            // than kept in the store. NOTE(review): presumably intended so each request is
            // consumed only once - confirm.
            mftConsulClient.getKvClient().deleteKey(key);

            final AgentTransferRequest request;
            try {
                request = AgentTransferRequest.newBuilder().mergeFrom(reqBytes).build();
            } catch (InvalidProtocolBufferException e) {
                logger.error("Failed to merge transfer request {} for transfer {} from bytes", agentTransferRequestId, transferId, e);
                return;
            }

            transferOrchestrator.submitTransferToProcess(transferId, request, transportCache,
                    // Publishes a state update for an endpoint path, stamped with this agent as publisher.
                    AgentUtil.throwingBiConsumerWrapper((endPointPath, st) -> {
                        mftConsulClient.submitFileTransferStateToProcess(transferId, request.getRequestId(), endPointPath, agentId, st.setPublisher(agentId));
                    }),
                    // Creates the endpoint hook when the flag is true, deletes it otherwise.
                    AgentUtil.throwingBiConsumerWrapper((endpointPath, create) -> {
                        if (create) {
                            mftConsulClient.createEndpointHookForAgent(agentId, session, transferId, agentTransferRequestId, endpointPath);
                        } else {
                            mftConsulClient.deleteEndpointHookForAgent(agentId, session, transferId, agentTransferRequestId, endpointPath);
                        }
                    }));
        });
    };
    transferMessageCache.addListener(transferCacheListener);
    transferMessageCache.start();
}