in order-book-pipeline/src/main/java/com/google/cloud/dataflow/orderbook/OrderBookProcessingPipeline.java [100:129]
/**
 * Assembles the order book processing pipeline from the supplied options and starts it.
 *
 * <p>Order book events are read as protos from the configured Pub/Sub subscription and applied
 * to an {@link OrderBookProducer}. The producer's output is always handed to
 * {@code storeInBigQuery} together with the market depth table option. Processing statuses and
 * the raw order book events are handed over as well, but only when their respective table
 * options are non-null.
 *
 * @param options pipeline options supplying the subscription, the order book producer settings
 *     and the destination tables
 */
private void run(Options options) {
  Pipeline orderBookPipeline = Pipeline.create(options);

  // Source: order book events decoded from the subscription named in the options.
  PCollection<OrderBookEvent> events =
      orderBookPipeline.apply(
          "Read from PubSub",
          PubsubIO.readProtos(OrderBookEvent.class).fromSubscription(options.getSubscription()));

  // Configure the producer separately so the apply() call below stays readable.
  OrderBookProducer orderBookProducer =
      new OrderBookProducer(
          options.getOrderBookDepth(),
          options.isIncludeLastTrade(),
          options.getMaxOutputElementsPerBundle());

  OrderedEventProcessorResult<SessionContractKey, MarketDepth, OrderBookEvent> bookResults =
      events.apply("Build Order Book", orderBookProducer.produceStatusUpdatesOnEveryEvent());

  // The market depth sink is unconditional.
  storeInBigQuery(
      bookResults.output(),
      options.getMarketDepthTable(),
      "Market Depth",
      new MarketDepthToTableRowConverter());

  // The remaining two sinks are optional: each is wired in only if its table option is set.
  if (options.getProcessingStatusTable() != null) {
    storeInBigQuery(
        bookResults.processingStatuses(),
        options.getProcessingStatusTable(),
        "Processing Status",
        new ProcessingStatusToTableRowConverter());
  }

  if (options.getOrderEventTable() != null) {
    storeInBigQuery(
        events,
        options.getOrderEventTable(),
        "Order Event",
        new OrderBookEventToTableRowConverter());
  }

  // The value returned by run() is not retained.
  orderBookPipeline.run();
}