in serverless-workflow-examples/serverless-workflow-loanbroker-showcase/aggregator/src/main/java/org/acme/serverless/loanbroker/aggregator/QuotesAggregatorRoute.java [50:66]
public void configure() {
    /*
     * Defines the "quotes-aggregator" route: exchanges arriving on
     * "direct:aggregator" are grouped by the Kogito flow id header, and each
     * completed group is handed to the quotes repository, wrapped in a
     * CloudEvent, serialized, and sent to the configured reply endpoint.
     */

    // Register the converter used by convertBodyTo(CloudEvent.class) below to
    // turn an AggregationResponse body into a CloudEvent.
    getContext()
            .getTypeConverterRegistry()
            .addTypeConverter(CloudEvent.class, AggregationResponse.class, cloudEventsConverter);

    // Append the copyHeaders=false option without corrupting a reply URI that
    // already carries a query string: use '&' when a '?' is already present,
    // and no separator at all when the URI already ends with '?' or '&'.
    // A URI without a query string yields exactly replyTo + "?copyHeaders=false".
    final String replyUri = replyTo;
    final String separator;
    if (replyUri == null || replyUri.indexOf('?') < 0) {
        separator = "?";
    } else if (replyUri.endsWith("?") || replyUri.endsWith("&")) {
        separator = "";
    } else {
        separator = "&";
    }
    final String replyEndpoint = replyUri + separator + "copyHeaders=false";

    from("direct:aggregator")
            .routeId("quotes-aggregator")
            // Correlate incoming exchanges on the flow id header; the strategy
            // decides how exchanges sharing a key are merged.
            .aggregate(header(IntegrationConstants.KOGITO_FLOW_ID_HEADER), new QuotesAggregationStrategy())
            // Complete the current aggregations on a repeating 3000 ms interval.
            .completionInterval(3000)
            // NOTE(review): quotesRepository is invoked as a processor on the
            // aggregated exchange; presumably it stores the quotes - confirm.
            .process(quotesRepository)
            // Replace the body with an AggregationResponse built from the exchange.
            .setBody(AggregationResponse::fromExchange)
            // Relies on the type converter registered above.
            .convertBodyTo(CloudEvent.class)
            .marshal(cloudEventDataFormat)
            .setHeader(Exchange.CONTENT_TYPE, constant("application/cloudevents+json"))
            .to(replyEndpoint);
}