# Pub/Sub Dataflow Fixed Window Count

In [None]:
! pip install 'apache-beam[gcp]' --quiet

In [None]:
# Google Cloud settings for the Pub/Sub source: the project that owns the
# subscription, and the subscription's short name (the full
# "projects/.../subscriptions/..." path is built from these later).
project_id, subscription_name = 'dsl-dar', 'clicks-dataflow-subscription'

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.trigger import AfterProcessingTime, AccumulationMode, AfterWatermark
import json

class ParseMessage(beam.DoFn):
    """Parse each Pub/Sub payload as JSON and emit the resulting object.

    Payloads that are not valid JSON are logged and dropped instead of
    raising, so one malformed message cannot fail the streaming pipeline.
    """

    def process(self, element):
        # Imported inside the DoFn so it also works on remote workers
        # (e.g. DataflowRunner), where module-level imports made in the
        # notebook session are not available unless the session is pickled.
        import json
        import logging

        try:
            parsed_element = json.loads(element)
        except ValueError:  # json.JSONDecodeError and UnicodeDecodeError both subclass ValueError
            logging.warning('Dropping message that is not valid JSON: %.200r', element)
            return
        yield parsed_element

class ExtractRoute(beam.DoFn):
    """Emit the ``route`` field of each parsed click event.

    Events that are not JSON objects (dicts), or that have no ``route``
    value, are dropped. Otherwise a non-object payload would crash on
    ``.get``, and a missing route would be counted under a spurious
    ``None`` key downstream.
    """

    def process(self, element):
        if not isinstance(element, dict):
            return
        route = element.get('route')
        if route is None:
            return
        yield route

class FormatOutput(beam.DoFn):
    """Render a ``(route, count)`` pair as a single human-readable line."""

    def process(self, element):
        # Tuple unpacking raises ValueError unless the element has exactly two items.
        key, total = element
        line = f'Route: {key}, Clicks: {total}'
        yield line


# Define your pipeline options.
# Streaming mode is enabled because ReadFromPubSub is an unbounded source.
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
options.view_as(StandardOptions).runner = 'DirectRunner'  # Use DataflowRunner to run on Google Cloud Dataflow

# Construct the full subscription path
subscription = f'projects/{project_id}/subscriptions/{subscription_name}'

# Create the pipeline
p = beam.Pipeline(options=options)

# Read -> decode -> parse -> route -> 60 s fixed windows -> count per route
# -> format -> print. Note: despite its name, `messages` holds the output of
# the final print step, not the raw Pub/Sub messages.
messages = (
    p
    | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(subscription=subscription)
    | 'DecodeMessage' >> beam.Map(lambda x: x.decode('utf-8'))
    | 'ParseJSON' >> beam.ParDo(ParseMessage())
    | 'ExtractRoute' >> beam.ParDo(ExtractRoute())
    | 'WindowIntoFixedWindows' >> beam.WindowInto(
            beam.window.FixedWindows(60)
        )
    | 'CountPerRoute' >> beam.combiners.Count.PerElement()
    | 'FormatOutput' >> beam.ParDo(FormatOutput())
    | 'PrintMessage' >> beam.Map(print)
)

# Run the pipeline. A streaming pipeline does not finish on its own, so
# wait_until_finish() blocks until the pipeline fails or is interrupted.
result = p.run()
try:
    result.wait_until_finish()
except KeyboardInterrupt:
    # Interrupting the cell only breaks out of the wait; the runner may keep
    # processing in the background, so cancel the pipeline explicitly.
    result.cancel()
    raise