def run()

in flex-templates/python/regional_dlp_de_identification/pubsub_dlp_bigquery.py


def run(argv=None, save_main_session=True):
    """Build and run the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output_table',
        required=True,
        help=(
            'Output BigQuery table for results specified as: '
            'PROJECT:DATASET.TABLE or DATASET.TABLE.'
        )
    )
    parser.add_argument(
        '--bq_schema',
        required=True,
        help=(
            'Output BigQuery table schema specified as string with format: '
            'FIELD_1:STRING,FIELD_2:STRING,...'
        )
    )
    parser.add_argument(
        '--dlp_project',
        required=True,
        help=(
            'ID of the project that holds the DLP template.'
        )
    )
    parser.add_argument(
        '--dlp_location',
        required=False,
        help=(
            'The location of the DLP template resource.'
        )
    )
    parser.add_argument(
        '--deidentification_template_name',
        required=True,
        help=(
            'Name of the DLP Structured De-identification Template '
            'of the form "projects/<PROJECT>/locations/<LOCATION>'
            '/deidentifyTemplates/<TEMPLATE_ID>"'
        )
    )
    parser.add_argument(
        "--window_interval_sec",
        default=30,
        type=int,
        help=(
            'Window interval in seconds for grouping incoming messages.'
        )
    )
    parser.add_argument(
        "--batch_size",
        default=1000,
        type=int,
        help=(
            'Number of records to be sent in a batch in '
            'the call to the Data Loss Prevention (DLP) API.'
        )
    )
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        '--input_topic',
        help=(
            'Input PubSub topic of the form '
            '"projects/<PROJECT>/topics/<TOPIC>".'
            'A temporary subscription will be created from '
            'the specified topic.'
        )
    )
    group.add_argument(
        '--input_subscription',
        help=(
            'Input PubSub subscription of the form '
            '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'
        )
    )
    known_args, pipeline_args = parser.parse_known_args(argv)

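    # save_main_session pickles the main session so that module-level imports
    # are available to DoFns on remote workers; streaming=True is required
    # for the unbounded Pub/Sub source.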
    options = PipelineOptions(
        pipeline_args,
        save_main_session=save_main_session,
        streaming=True
    )

    with beam.Pipeline(options=options) as p:

        # Read from Pub/Sub into a PCollection. If input_subscription is
        # provided, it is read directly; otherwise a temporary subscription
        # is created from the specified input_topic.
        if known_args.input_subscription:
            source = beam.io.ReadFromPubSub(
                subscription=known_args.input_subscription
            )
        else:
            source = beam.io.ReadFromPubSub(topic=known_args.input_topic)

        messages = (
            p
            | 'Read from Pub/Sub' >> source.with_output_types(bytes)
            | 'UTF-8 bytes to string' >>
            beam.Map(lambda msg: msg.decode("utf-8"))
            | 'Parse JSON payload' >> beam.Map(json.loads)
            | 'Flatten lists' >> beam.FlatMap(normalize_data)
            | 'Apply window' >> beam.WindowInto(
                window.FixedWindows(known_args.window_interval_sec, 0)
            )
        )

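        # Batch the windowed records, de-identify each batch with the DLP
        # API, and flatten the results back into per-row dicts.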
        de_identified_messages = (
            messages
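            # min_batch_size == max_batch_size pins each DLP request to
            # batch_size records (a trailing batch may be smaller).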
            | "Batching" >> BatchElements(
                min_batch_size=known_args.batch_size,
                max_batch_size=known_args.batch_size
            )
            | 'Convert dicts to table' >>
            beam.Map(from_list_dicts_to_table)
            | 'Call DLP de-identification' >>
            MaskDetectedDetails(
                project=known_args.dlp_project,
                location=known_args.dlp_location,
                template_name=known_args.deidentification_template_name
            )
            | 'Convert table to dicts' >>
            beam.FlatMap(from_table_to_list_dict)
        )

        # Write to BigQuery.
        de_identified_messages | 'Write to BQ' >> beam.io.WriteToBigQuery(
            known_args.output_table,
            schema=known_args.bq_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
        )