public void process()

in serverless-workflow-examples/serverless-workflow-loanbroker-showcase/aggregator/src/main/java/org/acme/serverless/loanbroker/aggregator/InMemoryQuotesRepositoryProcessor.java [43:67]


    public void process(Exchange exchange) throws Exception {
        if (exchange != null) {
            final String instanceId = (String) exchange.getIn().getHeader(IntegrationConstants.KOGITO_FLOW_ID_HEADER);

            if (instanceId != null && !instanceId.isEmpty()) {
                final Integer quoteCount = (Integer) exchange.getIn()
                        .getHeader(QuotesAggregationStrategy.HEADER_QUOTES_COUNT);

                quotesMap.computeIfAbsent(instanceId, k -> {
                    final List<BankQuote> quotes = (List<BankQuote>) exchange.getIn().getBody();
                    if (quotes == null || quotes.isEmpty()) {
                        return null;
                    }
                    return quotes;
                });
                LOGGER.info("Aggregation for workflow instance {} ended with {} quotes", instanceId, quoteCount);
            } else {
                throw new IllegalStateException(
                        String.format("Received an exchange with empty instance id. '%s' header not present",
                                IntegrationConstants.KOGITO_FLOW_ID_HEADER));
            }

        }

    }