# dataflow/streaming-taxi-data.py
##################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###################################################################################
# Author: Wei Hsia
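#
# Pipeline overview: reads NYC taxi ride events from the public
# "taxirides-realtime" Pub/Sub topic, tags each event with a random
# product_id, and streams the rows into a BigQuery table.
#
# Example Dataflow invocation: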
# TEMP_BUCKET=gs://your-gcs-bucket/dataflow-temp/
# REGION=your-region
# PROJECT=your-gcp-project
# DATASET=your-dataset
# TABLE=your-table
# OUTPUT=${PROJECT}:${DATASET}.${TABLE}
# python -m streaming-taxi-data \
# --region $REGION \
# --project $PROJECT \
# --output $OUTPUT \
# --runner DataflowRunner \
# --temp_location $TEMP_BUCKET \
# --streaming
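#
# A possible local-test variant (an assumption, not part of the original
# instructions): the same pipeline can usually be exercised with the
# DirectRunner, e.g.
#
#   python -m streaming-taxi-data \
#       --output $OUTPUT \
#       --runner DirectRunner \
#       --temp_location $TEMP_BUCKET \
#       --streaming
#
# Note that the DirectRunner still needs credentials able to read the public
# Pub/Sub topic and write to the target BigQuery table.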

import argparse
import json
import logging
import random

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions


class AddProductId(beam.DoFn):
    """DoFn that attaches a randomly chosen product_id to each ride event."""

    def process(self, element):
        # Pick a pseudo-random product id in the range [1, 29120].
        element['product_id'] = round(1 + random.random() * (29120 - 1))
        yield element
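
# For reference, an element flowing out of AddProductId looks roughly like the
# following (values are illustrative; the field names mirror the BigQuery
# schema declared in run() below):
#
#   {"ride_id": "a1b2...", "point_idx": 305, "latitude": 40.7764,
#    "longitude": -73.9494, "timestamp": "2022-01-01T00:00:00.0Z",
#    "meter_reading": 5.13, "meter_increment": 0.017, "ride_status": "enroute",
#    "passenger_count": 2, "product_id": 12345}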


def run(argv=None, save_main_session=True):
    """Defines and runs the streaming taxi-data pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        required=True,
        help='BigQuery destination table, specified as PROJECT:DATASET.TABLE or DATASET.TABLE.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Pub/Sub source: the public NYC taxi rides topic.
    # See https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon
    input_topic = "projects/pubsub-public-data/topics/taxirides-realtime"

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    pipeline_options.view_as(StandardOptions).streaming = True
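
    # With the example invocation in the header, pipeline_args carries the
    # remaining flags (values illustrative):
    #   ['--region', 'us-central1', '--project', 'my-project',
    #    '--runner', 'DataflowRunner',
    #    '--temp_location', 'gs://your-gcs-bucket/dataflow-temp/', '--streaming']
    # PipelineOptions parses these and forwards them to the chosen runner.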

    # Build the pipeline. For a streaming job on Dataflow, p.run() below
    # submits the job and returns; the job keeps running until cancelled.
    p = beam.Pipeline(options=pipeline_options)

    # Read the raw Pub/Sub messages.
    messages = (
        p | beam.io.ReadFromPubSub(topic=input_topic)
        .with_output_types(bytes))

    # Parse each JSON payload into a Python dict.
    json_messages = messages | "Parse JSON payload" >> beam.Map(json.loads)

    # Attach a random product_id to every ride event.
    product_id_messages = json_messages | "Add Product ID" >> beam.ParDo(AddProductId())

    # Stream the enriched events into BigQuery, appending to the target table.
    product_id_messages | 'Write to Table' >> beam.io.WriteToBigQuery(
        known_args.output,
        schema='ride_id:STRING, point_idx:INTEGER, latitude:FLOAT, longitude:FLOAT, '
               'timestamp:TIMESTAMP, meter_reading:FLOAT, meter_increment:FLOAT, '
               'ride_status:STRING, passenger_count:INTEGER, product_id:INTEGER',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

    p.run()
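
    # Optional variant (a sketch, not in the original): instead of the bare
    # p.run() above, keep the PipelineResult and block until the job ends,
    # which is handy when launching from tests or wrapper scripts:
    #
    #   result = p.run()
    #   result.wait_until_finish()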


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()