public static void main()

in pipelines/clickstream_analytics_java/src/main/java/com/google/cloud/dataflow/solutions/clickstream_analytics/ClickstreamPubSubToBq.java [93:189]


    public static void main(String[] args) {

        PipelineOptionsFactory.register(MyOptions.class);
        MyOptions options =
                PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

        Pipeline p = Pipeline.create(options);

        final String PROJECT = options.getBqProjectId();
        final String SUBSCRIPTION = options.getSubscription();
        final String BQ_PROJECT = PROJECT;
        final String BQ_TABLE = options.getBQTable();
        final String BQ_DEADLETTER_TABLE = options.getOutputDeadletterTable();
        final String BQ_DATASET = options.getBQDataset();
        final String BT_INSTANCE = options.getBTInstance();
        final String BT_TABLE = options.getBTTable();
        final String BT_LOOKUP_KEY = options.getBtLookupKey();

        PCollection<String> pubsubMessages =
                p.apply("ReadPubSubData", PubsubIO.readStrings().fromSubscription(SUBSCRIPTION));

        // TODO: Enriching the Pub/Sub data with Bigtable lookups is still pending.
        // PCollection<String> enrichedMessages =
        //         pubsubMessages.apply(
        //                 "EnrichWithBigtable",
        //                 ParDo.of(
        //                         new BigTableEnrichment(
        //                                 PROJECT, BT_INSTANCE, BT_TABLE, BT_LOOKUP_KEY)));
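        //
        // Illustrative skeleton for that DoFn (an assumption, not the actual
        // BigTableEnrichment class; the constructor-injected fields and the
        // extractKey/mergeRowIntoJson helpers are hypothetical):
        //
        // class BigTableEnrichment extends DoFn<String, String> {
        //     private transient BigtableDataClient client;
        //
        //     @Setup
        //     public void setup() throws IOException {
        //         // One Bigtable client per DoFn instance, reused across bundles
        //         client = BigtableDataClient.create(projectId, instanceId);
        //     }
        //
        //     @ProcessElement
        //     public void process(@Element String json, OutputReceiver<String> out) {
        //         // Look up the enrichment row by the key extracted from the message
        //         Row row = client.readRow(tableId, extractKey(json, lookupKey));
        //         out.output(mergeRowIntoJson(json, row));
        //     }
        // }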

        // Convert Pub/Sub JSON messages into BigQuery TableRows
        PCollectionTuple results = pubsubMessages.apply("TransformJSONToBQ", JsonToTableRows.run());

        // TODO: Apply analytics to windowed data
        // PCollection<TableRow> windowedTableRows =
        //         results.get(SUCCESS_TAG)
        //                 .apply(
        //                         "SessionWindow",
        //                         Window.<TableRow>into(
        //                                 Sessions.withGapDuration(Duration.standardMinutes(1))));
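        //
        // A possible analytic over those session windows (illustrative sketch only;
        // the "session_id" field name and the per-session count are assumptions):
        // windowedTableRows
        //         .apply(
        //                 "KeyBySession",
        //                 MapElements.into(
        //                                 TypeDescriptors.kvs(
        //                                         TypeDescriptors.strings(),
        //                                         TypeDescriptor.of(TableRow.class)))
        //                         .via(row -> KV.of((String) row.get("session_id"), row)))
        //         .apply("CountEventsPerSession", Count.perKey());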

        // Write Data to BigQuery Table
        WriteResult writeResult =
                results.get(SUCCESS_TAG)
                        .apply(
                                "WriteSuccessfulRecordsToBQ",
                                BigQueryIO.writeTableRows()
                                        .withMethod(
                                                BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
                                        .withWriteDisposition(WRITE_APPEND)
                                        .withCreateDisposition(CREATE_NEVER)
                                        .ignoreUnknownValues()
                                        .skipInvalidRows()
                                        .to(
                                                (row) -> {
                                                    return new TableDestination(
                                                            String.format(
                                                                    "%s:%s.%s",
                                                                    BQ_PROJECT,
                                                                    BQ_DATASET,
                                                                    BQ_TABLE),
                                                            "BQ Table destination");
                                                }));

        // Failed inserts are routed to the BigQuery dead-letter table
        writeResult
                .getFailedStorageApiInserts()
                .apply(
                        "FailedRecordToTableRow",
                        ParDo.of(
                                new DoFn<BigQueryStorageApiInsertError, TableRow>() {
                                    @ProcessElement
                                    public void process(
                                            @Element BigQueryStorageApiInsertError insertError,
                                            OutputReceiver<TableRow> outputReceiver) {
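                                        // Note: insertError.getErrorMessage() is also
                                        // available here if the dead-letter schema has a
                                        // column for it; only the failed row is kept below.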
                                        outputReceiver.output(insertError.getRow());
                                    }
                                }))
                .apply(
                        "WriteFailedRecordsToBigQuery",
                        BigQueryIO.writeTableRows()
                                .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
                                .to(
                                        (row) -> {
                                            return new TableDestination(
                                                    String.format(
                                                            "%s:%s.%s",
                                                            BQ_PROJECT,
                                                            BQ_DATASET,
                                                            BQ_DEADLETTER_TABLE),
                                                    "BQ Dead Letter Table destination");
                                        })
                                .withJsonSchema(getDeadletterTableSchemaJson())
                                .withCreateDisposition(
                                        BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                                .withWriteDisposition(
                                        BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        p.run();
    }
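
For reference, a minimal sketch of the MyOptions interface implied by the getters used in
main() above (an assumption: the real interface is declared elsewhere in the pipeline and
likely carries @Description annotations, default values, and validation):

    public interface MyOptions extends PipelineOptions {
        String getBqProjectId();
        void setBqProjectId(String value);

        String getSubscription();
        void setSubscription(String value);

        String getBQTable();
        void setBQTable(String value);

        // ...plus matching getter/setter pairs for getOutputDeadletterTable, getBQDataset,
        // getBTInstance, getBTTable, and getBtLookupKey.
    }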