in order-book-pipeline/src/main/java/com/google/cloud/dataflow/orderbook/OrderBookProducer.java [60:76]
/**
 * Expands this transform by wiring the incoming order book events into an ordered event
 * processor.
 *
 * <p>The method registers {@code SessionContractKeyCoder} for {@code SessionContractKey} in the
 * pipeline's coder registry, builds an {@code OrderBookOrderedProcessingHandler} from this
 * transform's {@code depth} and {@code withTrade} settings, copies the status-update and
 * bundle-size settings onto it, and then applies two steps to {@code input}: "Convert to KV"
 * ({@code ConvertOrderBookEventToKV}) followed by "Produce OrderBook" (the
 * {@code OrderedEventProcessor} created from the handler).
 *
 * @param input the collection of order book events to process
 * @return the result of applying the ordered event processor to the converted events
 */
public OrderedEventProcessorResult<SessionContractKey, MarketDepth, OrderBookEvent> expand(
    PCollection<OrderBookEvent> input) {
  // Make the key coder known to the pipeline before any step that uses SessionContractKey.
  input.getPipeline().getCoderRegistry()
      .registerCoderForClass(SessionContractKey.class, SessionContractKeyCoder.of());

  // Declared with the concrete handler class instead of the raw OrderedProcessingHandler type,
  // so that OrderedEventProcessor.create(...) below is checked against the handler's actual
  // type arguments rather than relying on an unchecked raw-type conversion.
  OrderBookOrderedProcessingHandler handler =
      new OrderBookOrderedProcessingHandler(depth, withTrade);
  handler.setProduceStatusUpdateOnEveryEvent(produceStatusUpdatesOnEveryEvent);
  handler.setMaxOutputElementsPerBundle(maxElementsPerBundle);
  handler.setStatusUpdateFrequency(statusUpdateFrequency);

  OrderedEventProcessor<OrderBookEvent, SessionContractKey, MarketDepth, OrderBookMutableState>
      orderedProcessor = OrderedEventProcessor.create(handler);

  return input
      .apply("Convert to KV", ParDo.of(new ConvertOrderBookEventToKV()))
      .apply("Produce OrderBook", orderedProcessor);
}