def run()

in modules/harness-build-flex-template/pubsub_dataflow_bigquery/pubsub_transform_bigquery.py [0:0]


def _parse_args(argv):
    """Parse the pipeline-specific flags and return ``(known_args, pipeline_args)``.

    Flags not declared here are returned untouched in ``pipeline_args`` so
    they can be forwarded to Beam's ``PipelineOptions``. Exits with a usage
    error (``SystemExit(2)``) on missing/conflicting flags or a non-positive
    ``--window_interval_sec``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output_table',
        required=True,
        help=(
            'Output BigQuery table for results specified as: '
            'PROJECT:DATASET.TABLE or DATASET.TABLE.'
        )
    )
    parser.add_argument(
        '--bq_schema',
        required=True,
        help=(
            'Output BigQuery table schema specified as string with format: '
            'FIELD_1:STRING,FIELD_2:STRING,...'
        )
    )
    parser.add_argument(
        '--window_interval_sec',
        default=30,
        type=int,
        help='Window interval in seconds for grouping incoming messages.'
    )
    # Exactly one Pub/Sub source must be supplied.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        '--input_topic',
        help=(
            'Input PubSub topic of the form '
            '"projects/<PROJECT>/topics/<TOPIC>". '
            'A temporary subscription will be created from '
            'the specified topic.'
        )
    )
    group.add_argument(
        '--input_subscription',
        help=(
            'Input PubSub subscription of the form '
            '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>".'
        )
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    # Fixed windows need a strictly positive size; report a bad value as a
    # usage error now rather than failing later while building the pipeline.
    if known_args.window_interval_sec <= 0:
        parser.error('--window_interval_sec must be a positive integer.')
    return known_args, pipeline_args


def run(argv=None, save_main_session=True):
    """Build and run the streaming Pub/Sub -> BigQuery pipeline.

    The pipeline performs these steps in order:

    1. Read raw bytes from Pub/Sub.
    2. Decode them as UTF-8 and parse each payload with ``json.loads``.
    3. Expand each payload with ``normalize_data`` (step 'Flatten lists').
    4. Assign elements to fixed windows of ``--window_interval_sec`` seconds.
    5. Map each element through ``transform_data``.
    6. Write the results to ``--output_table``. The table is created from
       ``--bq_schema`` if it does not exist.

    Args:
        argv: Command-line arguments to parse; ``None`` lets argparse read
            ``sys.argv``. Unrecognised arguments are passed on to Beam as
            pipeline options.
        save_main_session: Forwarded to Beam's
            ``PipelineOptions(save_main_session=...)``.
    """
    known_args, pipeline_args = _parse_args(argv)

    options = PipelineOptions(
        pipeline_args,
        # Forward the caller's setting instead of hard-coding True.
        save_main_session=save_main_session,
        streaming=True
    )

    # Prefer the subscription when one is given. Otherwise read from the
    # topic; a temporary subscription is created for it.
    if known_args.input_subscription:
        source_kwargs = {'subscription': known_args.input_subscription}
    else:
        source_kwargs = {'topic': known_args.input_topic}

    with beam.Pipeline(options=options) as p:
        messages = (
            p
            | 'Read from Pub/Sub' >>
            beam.io.ReadFromPubSub(**source_kwargs).with_output_types(bytes)
            | 'UTF-8 bytes to string' >>
            beam.Map(lambda msg: msg.decode("utf-8"))
            | 'Parse JSON payload' >> beam.Map(json.loads)
            | 'Flatten lists' >> beam.FlatMap(normalize_data)
            | 'Apply window' >> beam.WindowInto(
                window.FixedWindows(known_args.window_interval_sec, 0)
            )
        )

        transformed_messages = (
            messages
            | 'Data transformation' >> beam.Map(transform_data)
        )

        # Write to BigQuery, creating the table from bq_schema if needed.
        transformed_messages | 'Write to BQ' >> beam.io.WriteToBigQuery(
            known_args.output_table,
            schema=known_args.bq_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
        )