in modules/harness-build-flex-template/pubsub_dataflow_bigquery/pubsub_transform_bigquery.py [0:0]
def run(argv=None, save_main_session=True):
    """Build and run the streaming Pub/Sub -> transform -> BigQuery pipeline.

    Reads JSON messages from either a Pub/Sub topic or subscription,
    decodes and parses them, flattens lists via ``normalize_data``,
    windows them into fixed intervals, applies ``transform_data``, and
    writes the results to BigQuery.

    Args:
        argv: Optional list of command-line arguments; ``None`` means
            ``sys.argv`` (standard argparse behavior).
        save_main_session: Whether to pickle the main session so that
            global imports/definitions are available to workers.

    Raises:
        SystemExit: If required command-line arguments are missing
            (argparse behavior).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output_table',
        required=True,
        help=(
            'Output BigQuery table for results specified as: '
            'PROJECT:DATASET.TABLE or DATASET.TABLE.'
        )
    )
    parser.add_argument(
        '--bq_schema',
        required=True,
        help=(
            'Output BigQuery table schema specified as string with format: '
            'FIELD_1:STRING,FIELD_2:STRING,...'
        )
    )
    parser.add_argument(
        "--window_interval_sec",
        default=30,
        type=int,
        help=(
            'Window interval in seconds for grouping incoming messages.'
        )
    )
    # Exactly one of --input_topic / --input_subscription must be given.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        '--input_topic',
        help=(
            'Input PubSub topic of the form '
            '"projects/<PROJECT>/topics/<TOPIC>".'
            'A temporary subscription will be created from '
            'the specified topic.'
        )
    )
    group.add_argument(
        '--input_subscription',
        help=(
            'Input PubSub subscription of the form '
            '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'
        )
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    options = PipelineOptions(
        pipeline_args,
        # Fix: honor the function parameter instead of hardcoding True,
        # so callers can opt out of pickling the main session.
        save_main_session=save_main_session,
        streaming=True
    )

    # Choose the Pub/Sub source once; the rest of the pipeline is
    # identical for both cases (previously duplicated in two branches).
    # If a subscription is provided it is used directly; otherwise a
    # temporary subscription is created from the topic by the connector.
    if known_args.input_subscription:
        read_from_pubsub = beam.io.ReadFromPubSub(
            subscription=known_args.input_subscription
        )
    else:
        read_from_pubsub = beam.io.ReadFromPubSub(
            topic=known_args.input_topic
        )

    with beam.Pipeline(options=options) as p:
        messages = (
            p
            | 'Read from Pub/Sub' >>
            read_from_pubsub.with_output_types(bytes)
            | 'UTF-8 bytes to string' >>
            beam.Map(lambda msg: msg.decode("utf-8"))
            | 'Parse JSON payload' >>
            beam.Map(json.loads)
            | 'Flatten lists' >>
            beam.FlatMap(normalize_data)
            | 'Apply window' >> beam.WindowInto(
                window.FixedWindows(known_args.window_interval_sec, 0)
            )
        )
        transformed_messages = (
            messages
            | 'Data transformation' >> beam.Map(transform_data)
        )
        # Write to BigQuery, creating the table from --bq_schema if needed.
        transformed_messages | 'Write to BQ' >> beam.io.WriteToBigQuery(
            known_args.output_table,
            schema=known_args.bq_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
        )