in quests/dataflow_python/4_SQL_Batch_Analytics/lab/batch_user_traffic_SQL_pipeline.py [0:0]
def run():
    # Command line arguments
    parser = argparse.ArgumentParser(description='Load from JSON into BigQuery')
    parser.add_argument('--project', required=True, help='Specify Google Cloud project')
    parser.add_argument('--region', required=True, help='Specify Google Cloud region')
    parser.add_argument('--staging_location', required=True, help='Specify Cloud Storage bucket for staging')
    parser.add_argument('--temp_location', required=True, help='Specify Cloud Storage bucket for temp')
    parser.add_argument('--runner', required=True, help='Specify Apache Beam Runner')
    parser.add_argument('--input_path', required=True, help='Path to events.json')
    parser.add_argument('--raw_table_name', required=True, help='BigQuery table for raw data')
    parser.add_argument('--agg_table_name', required=True, help='BigQuery table for aggregated data')
    opts, pipeline_opts = parser.parse_known_args()
    # Setting up the Beam pipeline options
    options = PipelineOptions(pipeline_opts, save_main_session=True)
    options.view_as(GoogleCloudOptions).project = opts.project
    options.view_as(GoogleCloudOptions).region = opts.region
    options.view_as(GoogleCloudOptions).staging_location = opts.staging_location
    options.view_as(GoogleCloudOptions).temp_location = opts.temp_location
    options.view_as(GoogleCloudOptions).job_name = 'batch-user-traffic-pipeline-sql-{}'.format(time.time_ns())
    options.view_as(StandardOptions).runner = opts.runner

    input_path = opts.input_path
    agg_table_name = opts.agg_table_name
    raw_table_name = opts.raw_table_name
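    # Example invocation (illustrative only; every value below is a placeholder,
    # not something defined by this script):
    #
    #   python3 batch_user_traffic_SQL_pipeline.py \
    #       --project=${PROJECT_ID} \
    #       --region=${REGION} \
    #       --staging_location=gs://${BUCKET}/staging \
    #       --temp_location=gs://${BUCKET}/temp \
    #       --runner=DataflowRunner \
    #       --input_path=gs://${BUCKET}/events.json \
    #       --raw_table_name=${PROJECT_ID}:logs.raw \
    #       --agg_table_name=${PROJECT_ID}:logs.user_traffic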
    # BigQuery schema for the raw events table
    raw_table_schema = {
        "fields": [
            {"name": "ip", "type": "STRING"},
            {"name": "user_id", "type": "STRING"},
            {"name": "lat", "type": "FLOAT"},
            {"name": "lng", "type": "FLOAT"},
            {"name": "timestamp", "type": "STRING"},
            {"name": "http_request", "type": "STRING"},
            {"name": "http_response", "type": "INTEGER"},
            {"name": "num_bytes", "type": "INTEGER"},
            {"name": "user_agent", "type": "STRING"}
        ]
    }
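    # The raw fields above mirror the CommonLog records produced by parse_json, so
    # each parsed event can be written to BigQuery after row._asdict() below.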
    # BigQuery schema for the per-user aggregate table
    agg_table_schema = {
        "fields": [
            {"name": "user_id", "type": "STRING"},
            {"name": "page_views", "type": "INTEGER"},
            {"name": "total_bytes", "type": "INTEGER"},
            {"name": "max_bytes", "type": "INTEGER"},
            {"name": "min_bytes", "type": "INTEGER"}
        ]
    }
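    # The aggregate field names above must match the column aliases produced by
    # the SQL query below, which emits one row per user_id.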
query = """
#TODO: Write SQL query
"""
    # Create the pipeline
    p = beam.Pipeline(options=options)

    # Read the JSON events from Cloud Storage and parse them into CommonLog records
    logs = (p | 'ReadFromGCS' >> beam.io.ReadFromText(input_path)
              | 'ParseJson' >> beam.Map(parse_json).with_output_types(CommonLog))

    # Branch 1: write the raw, unaggregated events to BigQuery
    # TODO: Write Transform to write raw data to BigQuery
    # (one way, mirroring the WriteToBigQuery call in the aggregated branch below)
    (logs | 'RawToDict' >> beam.Map(lambda row: row._asdict())
          | 'WriteRawToBQ' >> beam.io.WriteToBigQuery(
              raw_table_name,
              schema=raw_table_schema,
              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
              write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

    # Branch 2: aggregate per user with Beam SQL, then write to BigQuery
    # TODO: Apply SqlTransform using ZetaSQL Dialect
    # (one way, assuming SqlTransform is imported from apache_beam.transforms.sql)
    (logs | 'PerUserAggregations' >> SqlTransform(query, dialect='zetasql')
          | 'AggToDict' >> beam.Map(lambda row: row._asdict())
          | 'WriteAggToBQ' >> beam.io.WriteToBigQuery(
              agg_table_name,
              schema=agg_table_schema,
              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
              write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
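    # Both branches consume the same parsed `logs` PCollection, so a single run
    # populates both the raw events table and the per-user aggregate table.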
    logging.getLogger().setLevel(logging.INFO)
    logging.info("Building pipeline ...")

    # Execute the pipeline with the configured runner
    p.run()