# Pub/Sub Dataflow Sliding Window Count to BigQuery

In [None]:
! pip install apache_beam apache-beam[gcp] --quiet

In [1]:
# Deployment settings -- replace these with the values for your environment.

# Google Cloud project; used in both the subscription path and the table spec.
project_id = 'dsl-dar'

# Pub/Sub source: name of the subscription the pipeline reads from.
subscription_name = 'clicks-dataflow-subscription'

# BigQuery sink: destination table and the dataset that contains it.
table_id = 'clicks-per-minute'
dataset_id = 'pubsub_dataset'

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.transforms.trigger import AfterProcessingTime, AccumulationMode, AfterWatermark
import json
from datetime import datetime

class ParseMessage(beam.DoFn):
    """Parse a JSON-encoded message string into a Python object.

    Emits the decoded object for every well-formed input. Inputs that are
    not valid JSON are logged and dropped instead of raising, so that one
    bad message cannot fail the whole (streaming) pipeline.
    """

    def process(self, element):
        """Yield ``json.loads(element)``, or nothing if it cannot be parsed.

        Args:
            element: The JSON text of one message.

        Yields:
            The decoded JSON value.
        """
        try:
            parsed_element = json.loads(element)
        except (TypeError, ValueError) as err:
            # json.JSONDecodeError is a ValueError subclass; TypeError covers
            # inputs that are not str/bytes at all.
            # Imported here so the block does not depend on a new
            # module-level import.
            import logging

            logging.warning('Skipping unparseable message %r: %s', element, err)
            return
        yield parsed_element

class ExtractRoute(beam.DoFn):
    """Pull the ``'route'`` value out of a parsed message.

    Only messages that are dicts carrying a non-null ``'route'`` produce
    output. Anything else is logged and dropped, because a missing route
    would otherwise be emitted as ``None`` and counted like a real route.
    """

    def process(self, element):
        """Yield the message's route, or nothing if it has none.

        Args:
            element: One parsed message; expected to be a dict.

        Yields:
            The value stored under the ``'route'`` key.
        """
        # Valid JSON is not necessarily an object (it may be a list, a
        # number, ...), in which case there is no .get() to call.
        route = element.get('route') if isinstance(element, dict) else None
        if route is None:
            # Imported here so the block does not depend on a new
            # module-level import.
            import logging

            logging.warning('Skipping message without a route: %r', element)
            return
        yield route

class FormatOutput(beam.DoFn):
    """Convert a ``(route, count)`` pair into an output row dict."""

    def process(self, element, window=beam.DoFn.WindowParam):
        """Yield one row stamped with the end of the element's window.

        Args:
            element: A two-item ``(route, count)`` pair.
            window: The window the element belongs to, injected by Beam.

        Yields:
            A dict with the keys ``'Route'``, ``'Count'`` and ``'Timestamp'``.
        """
        route_name, hits = element
        # The row's timestamp is the window's end, rendered from its UTC
        # datetime as 'YYYY-MM-DD HH:MM:SS'.
        timestamp_text = window.end.to_utc_datetime().strftime('%Y-%m-%d %H:%M:%S')
        row = {
            'Route': route_name,
            'Count': hits,
            'Timestamp': timestamp_text,
        }
        yield row

# --- Pipeline options -------------------------------------------------------
# A view shares its values with the options object it came from, so setting
# both fields through one StandardOptions view configures `options` itself.
options = PipelineOptions()
standard_options = options.view_as(StandardOptions)
standard_options.streaming = True
# Runs locally; switch to DataflowRunner to run on Google Cloud Dataflow.
standard_options.runner = 'DirectRunner'

# --- Source and sink --------------------------------------------------------
# Fully qualified Pub/Sub subscription path.
subscription = f'projects/{project_id}/subscriptions/{subscription_name}'

# Output table schema: every column is REQUIRED.
table_schema = {
    'fields': [
        {'name': column_name, 'type': column_type, 'mode': 'REQUIRED'}
        for column_name, column_type in (
            ('Route', 'STRING'),
            ('Count', 'INTEGER'),
            ('Timestamp', 'TIMESTAMP'),
        )
    ]
}

# --- Pipeline graph ---------------------------------------------------------
p = beam.Pipeline(options=options)

# Stage 1: raw Pub/Sub payloads (bytes) -> text -> parsed JSON -> route.
routes = (
    p
    | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(subscription=subscription)
    | 'DecodeMessage' >> beam.Map(lambda payload: payload.decode('utf-8'))
    | 'ParseJSON' >> beam.ParDo(ParseMessage())
    | 'ExtractRoute' >> beam.ParDo(ExtractRoute())
)

# Stage 2: occurrences of each route per sliding window
# (size 60, a new window every 15; Beam expresses both in seconds).
route_counts = (
    routes
    | 'WindowIntoSlidingWindows' >> beam.WindowInto(
        beam.window.SlidingWindows(size=60, period=15)
    )
    | 'CountPerRoute' >> beam.combiners.Count.PerElement()
)

# Stage 3: shape the counts into rows and append them to the BigQuery table,
# creating the table on first use if it does not exist yet.
messages = (
    route_counts
    | 'FormatOutput' >> beam.ParDo(FormatOutput())
    | 'WriteToBigQuery' >> WriteToBigQuery(
        table=f'{project_id}:{dataset_id}.{table_id}',
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    )
)

# --- Execution --------------------------------------------------------------
# Start the pipeline and block on it. NOTE(review): as a streaming job this
# presumably keeps running until it is cancelled or fails -- confirm.
result = p.run()
result.wait_until_finish()
