private void run()

in order-book-pipeline/src/main/java/com/google/cloud/dataflow/orderbook/OrderBookProcessingPipeline.java [100:129]


  private void run(Options options) {
    Pipeline pipeline = Pipeline.create(options);

    PCollection<OrderBookEvent> orderBookEvents = pipeline.apply("Read from PubSub",
        PubsubIO.readProtos(
            OrderBookEvent.class).fromSubscription(options.getSubscription()));

    OrderedEventProcessorResult<SessionContractKey, MarketDepth, OrderBookEvent> processingResults = orderBookEvents
        .apply("Build Order Book", new OrderBookProducer(
            options.getOrderBookDepth(),
            options.isIncludeLastTrade(),
            options.getMaxOutputElementsPerBundle()).produceStatusUpdatesOnEveryEvent());

    storeInBigQuery(processingResults.output(), options.getMarketDepthTable(), "Market Depth",
        new MarketDepthToTableRowConverter());

    if (options.getProcessingStatusTable() != null) {
      storeInBigQuery(processingResults.processingStatuses(), options.getProcessingStatusTable(),
          "Processing Status",
          new ProcessingStatusToTableRowConverter());
    }

    if (options.getOrderEventTable() != null) {
      storeInBigQuery(orderBookEvents, options.getOrderEventTable(),
          "Order Event",
          new OrderBookEventToTableRowConverter());
    }

    pipeline.run();
  }