private void acceptTransferRequests()

in agent/service/src/main/java/org/apache/airavata/mft/agent/ingress/ConsulIngressHandler.java [97:134]


    /**
     * Installs the listener that consumes agent transfer requests from the
     * transfer message cache, then starts that cache.
     *
     * <p>For every cache entry that carries a value, the listener:
     * <ol>
     *   <li>reads the transfer id and the agent transfer request id from the last
     *       two {@code /}-separated segments of the entry key;</li>
     *   <li>deletes the key through the Consul KV client before decoding the payload;</li>
     *   <li>decodes the payload into an {@code AgentTransferRequest}, logging and
     *       skipping the entry when decoding fails;</li>
     *   <li>submits the request to the transfer orchestrator together with two
     *       callbacks: one that publishes transfer state stamped with this agent's id,
     *       and one that creates or deletes the endpoint hook for this agent.</li>
     * </ol>
     *
     * <p>Entries whose key has fewer than two segments are logged and skipped so that
     * a single malformed key cannot abort processing of the remaining entries.
     */
    private void acceptTransferRequests() {

        transferCacheListener = newValues -> {
            newValues.values().forEach(value -> {
                value.getValueAsBytes().ifPresent(reqBytes -> {
                    final String key = value.getKey();

                    // Expected key layout: .../<transferId>/<agentTransferRequestId>
                    final String[] partsOfKey = key.split("/");
                    if (partsOfKey.length < 2) {
                        // The key is left in place: without both ids the request cannot be processed.
                        logger.error("Ignoring transfer request with malformed key {}", key);
                        return;
                    }
                    final String agentTransferRequestId = partsOfKey[partsOfKey.length - 1];
                    final String transferId = partsOfKey[partsOfKey.length - 2];

                    // The key is removed before the payload is decoded, so an undecodable
                    // payload is dropped rather than delivered again.
                    mftConsulClient.getKvClient().deleteKey(key);

                    final AgentTransferRequest request;
                    try {
                        request = AgentTransferRequest.newBuilder().mergeFrom(reqBytes).build();
                    } catch (InvalidProtocolBufferException e) {
                        logger.error("Failed to merge transfer request {} for transfer {} from bytes", agentTransferRequestId, transferId, e);
                        return;
                    }

                    transferOrchestrator.submitTransferToProcess(transferId, request, transportCache,
                            // Publishes each state update with this agent set as the publisher.
                            AgentUtil.throwingBiConsumerWrapper((endPointPath, st) -> {
                                mftConsulClient.submitFileTransferStateToProcess(transferId, request.getRequestId(), endPointPath,  agentId, st.setPublisher(agentId));
                            }),
                            // Creates the endpoint hook when 'create' is true, deletes it otherwise.
                            AgentUtil.throwingBiConsumerWrapper((endpointPath, create) -> {
                                if (create) {
                                    mftConsulClient.createEndpointHookForAgent(agentId, session, transferId, agentTransferRequestId, endpointPath);
                                } else {
                                    mftConsulClient.deleteEndpointHookForAgent(agentId, session, transferId, agentTransferRequestId, endpointPath);
                                }
                            }));
                });
            });
        };
        transferMessageCache.addListener(transferCacheListener);
        transferMessageCache.start();
    }