public OrderedEventProcessorResult expand()

in order-book-pipeline/src/main/java/com/google/cloud/dataflow/orderbook/OrderBookProducer.java [60:76]


  public OrderedEventProcessorResult<SessionContractKey, MarketDepth, OrderBookEvent> expand(
      PCollection<OrderBookEvent> input) {
    input.getPipeline().getCoderRegistry()
        .registerCoderForClass(SessionContractKey.class, SessionContractKeyCoder.of());

    OrderedProcessingHandler handler = new OrderBookOrderedProcessingHandler(depth, withTrade);
    handler.setProduceStatusUpdateOnEveryEvent(produceStatusUpdatesOnEveryEvent);
    handler.setMaxOutputElementsPerBundle(maxElementsPerBundle);
    handler.setStatusUpdateFrequency(statusUpdateFrequency);

    OrderedEventProcessor<OrderBookEvent, SessionContractKey, MarketDepth, OrderBookMutableState> orderedProcessor =
        OrderedEventProcessor.create(handler);

    return input
        .apply("Convert to KV", ParDo.of(new ConvertOrderBookEventToKV()))
        .apply("Produce OrderBook", orderedProcessor);
  }