public static Pipeline createPipeline()

in pipelines/etl_integration_java/src/main/java/com/google/cloud/dataflow/solutions/ETLIntegration.java [43:65]


    /**
     * Assembles the Pub/Sub-to-Spanner pipeline without running it.
     *
     * <p>The returned pipeline has three named steps: {@code "Read topic"} reads Pub/Sub
     * messages from the topic given by the options, {@code "Parse"} converts them with
     * {@link TaxiEventProcessor.FromPubsubMessage}, and {@code "Write"} hands the parsed
     * events to a {@code Spanner.Writer} configured from the options.
     *
     * @param options pipeline options supplying the Pub/Sub topic and the Spanner instance,
     *     database and table; also viewed as {@link DataflowPipelineOptions} to obtain the
     *     project id
     * @return the constructed pipeline; the caller is responsible for running it
     */
    public static Pipeline createPipeline(SpannerPublisherOptions options) {
        // The project id is not read from SpannerPublisherOptions directly; it is taken
        // from the Dataflow view of the same options object.
        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
        String projectId = dataflowOptions.getProject();

        Pipeline pipeline = Pipeline.create(options);

        // Source: Pub/Sub messages from the configured topic.
        PubsubIO.Read<PubsubMessage> topicSource =
                PubsubIO.readMessages().fromTopic(options.getPubsubTopic());
        PCollection<PubsubMessage> rawMessages = pipeline.apply("Read topic", topicSource);

        // Parsing: only the parsed-data output is consumed in this method.
        TaxiEventProcessor.ParsingOutput<TaxiObjects.TaxiEvent> parsingOutput =
                rawMessages.apply("Parse", TaxiEventProcessor.FromPubsubMessage.parse());
        PCollection<TaxiObjects.TaxiEvent> events = parsingOutput.getParsedData();

        // Sink: Spanner table identified by project / instance / database / table.
        events.apply(
                "Write",
                Spanner.Writer.builder()
                        .projectId(projectId)
                        .instanceId(options.getSpannerInstance())
                        .databaseId(options.getSpannerDatabase())
                        .tableName(options.getSpannerTable())
                        .build());

        return pipeline;
    }