in pipelines/etl_integration_java/src/main/java/com/google/cloud/dataflow/solutions/ETLIntegration.java [43:65]
public static Pipeline createPipeline(SpannerPublisherOptions options) {
String projectId = options.as(DataflowPipelineOptions.class).getProject();
Pipeline p = Pipeline.create(options);
PCollection<PubsubMessage> msgs =
p.apply("Read topic", PubsubIO.readMessages().fromTopic(options.getPubsubTopic()));
TaxiEventProcessor.ParsingOutput<TaxiObjects.TaxiEvent> parsed =
msgs.apply("Parse", TaxiEventProcessor.FromPubsubMessage.parse());
PCollection<TaxiObjects.TaxiEvent> taxiEvents = parsed.getParsedData();
taxiEvents.apply(
"Write",
Spanner.Writer.builder()
.projectId(projectId)
.instanceId(options.getSpannerInstance())
.databaseId(options.getSpannerDatabase())
.tableName(options.getSpannerTable())
.build());
return p;
}