in pipelines/clickstream_analytics_java/src/main/java/com/google/cloud/dataflow/solutions/clickstream_analytics/ClickstreamPubSubToBq.java [93:189]
public static void main(String[] args) {
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline p = Pipeline.create(options);
final String PROJECT = options.getBqProjectId();
final String SUBSCRIPTION = options.getSubscription();
final String BQ_PROJECT = PROJECT;
final String BQ_TABLE = options.getBQTable();
final String BQ_DEADLETTER_TABLE = options.getOutputDeadletterTable();
final String BQ_DATASET = options.getBQDataset();
final String BT_INSTANCE = options.getBTInstance();
final String BT_TABLE = options.getBTTable();
final String BT_LOOKUP_KEY = options.getBtLookupKey();
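// Note: fromSubscription() expects the full resource path,
// projects/<project>/subscriptions/<name>.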
PCollection<String> pubsubMessages =
p.apply("ReadPubSubData", PubsubIO.readStrings().fromSubscription(SUBSCRIPTION));
// TODO: Bigtable enrichment of the Pub/Sub messages is still pending.
// PCollection<String> enrichedMessages = pubsubMessages.apply(
//     "EnrichWithBigtable",
//     ParDo.of(new BigTableEnrichment(PROJECT, BT_INSTANCE, BT_TABLE, BT_LOOKUP_KEY)));
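// A minimal sketch of the pending enrichment DoFn, assuming the Cloud Bigtable
// data client (com.google.cloud.bigtable.data.v2.BigtableDataClient and
// models.Row); the class shape and the merge step are illustrative only, and
// a class like this would live outside main():
//
// static class BigTableEnrichment extends DoFn<String, String> {
//     private final String projectId, instanceId, tableId, lookupKey;
//     private transient BigtableDataClient client;
//
//     BigTableEnrichment(String projectId, String instanceId, String tableId, String lookupKey) {
//         this.projectId = projectId;
//         this.instanceId = instanceId;
//         this.tableId = tableId;
//         this.lookupKey = lookupKey;
//     }
//
//     @Setup
//     public void setup() throws IOException {
//         client = BigtableDataClient.create(projectId, instanceId);
//     }
//
//     @ProcessElement
//     public void process(@Element String json, OutputReceiver<String> out) {
//         // Deriving the row key from the message is left open; readRow()
//         // returns null when no row matches the key.
//         Row row = client.readRow(tableId, /* rowKey derived from json via lookupKey */ lookupKey);
//         out.output(json); // TODO: merge row cells into the JSON payload
//     }
//
//     @Teardown
//     public void teardown() {
//         if (client != null) client.close();
//     }
// }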
// Convert PubSub data to BigQuery Table Rows
PCollectionTuple results = pubsubMessages.apply("TransformJSONToBQ", JsonToTableRows.run());
// TODO: Apply analytics to windowed data
// PCollection<TableRow> windowedTableRows =
//     results
//         .get(SUCCESS_TAG)
//         .apply(
//             "SessionWindow",
//             Window.<TableRow>into(Sessions.withGapDuration(Duration.standardMinutes(1))));
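// One possible continuation once sessionization lands: key each row by a
// session identifier and count events per session window. extractSessionKey()
// is a hypothetical helper that would pull the key out of each TableRow:
//
// PCollection<KV<String, Long>> eventsPerSession =
//     windowedTableRows
//         .apply(
//             "KeyBySession",
//             MapElements.into(
//                     TypeDescriptors.kvs(
//                         TypeDescriptors.strings(), TypeDescriptor.of(TableRow.class)))
//                 .via(row -> KV.of(extractSessionKey(row), row)))
//         .apply("CountPerSession", Count.perKey());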
// Write Data to BigQuery Table
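// STORAGE_API_AT_LEAST_ONCE writes through the BigQuery Storage Write API's
// default stream: lower latency than exactly-once semantics, at the cost of
// possible duplicate rows on retry. With CREATE_NEVER the destination table
// must already exist, and rows that fail to insert surface on
// writeResult.getFailedStorageApiInserts(), feeding the dead-letter path below.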
WriteResult writeResult =
results.get(SUCCESS_TAG)
.apply(
"WriteSuccessfulRecordsToBQ",
BigQueryIO.writeTableRows()
.withMethod(
BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
.withWriteDisposition(WRITE_APPEND)
.withCreateDisposition(CREATE_NEVER)
.ignoreUnknownValues()
.skipInvalidRows()
.to(
(row) -> {
return new TableDestination(
String.format(
"%s:%s.%s",
BQ_PROJECT,
BQ_DATASET,
BQ_TABLE),
"BQ Table destination");
}));
// Failed inserts are routed to the BigQuery dead-letter table.
writeResult
.getFailedStorageApiInserts()
.apply(
"FailedRecordToTableRow",
ParDo.of(
new DoFn<BigQueryStorageApiInsertError, TableRow>() {
@ProcessElement
public void process(
@Element BigQueryStorageApiInsertError insertError,
OutputReceiver<TableRow> outputReceiver) {
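// Only the failed row itself is forwarded; insertError.getErrorMessage()
// is dropped here, so the dead-letter table records the payload but not
// the failure reason.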
outputReceiver.output(insertError.getRow());
}
}))
.apply(
"WriteFailedRecordsToBigQuery",
BigQueryIO.writeTableRows()
.withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
.to(
(row) -> {
return new TableDestination(
String.format(
"%s:%s.%s",
BQ_PROJECT,
BQ_DATASET,
BQ_DEADLETTER_TABLE),
"BQ Dead Letter Table destination");
})
.withJsonSchema(getDeadletterTableSchemaJson())
.withCreateDisposition(
BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(
BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
p.run();
}
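// Example launch. PipelineOptionsFactory derives flag names from the MyOptions
// getters above, so the exact spellings below are assumptions:
//
// mvn compile exec:java \
//   -Dexec.mainClass=com.google.cloud.dataflow.solutions.clickstream_analytics.ClickstreamPubSubToBq \
//   -Dexec.args="--runner=DataflowRunner \
//     --subscription=projects/PROJECT/subscriptions/SUBSCRIPTION \
//     --bqProjectId=PROJECT --BQDataset=DATASET --BQTable=TABLE \
//     --outputDeadletterTable=DEADLETTER_TABLE"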