def run()

in flex-templates/python/regional_dlp_transform/bigquery_dlp_bigquery.py

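Entry point for the pipeline: it reads rows from BigQuery (from a table reference or a SQL query), groups them into fixed windows, batches the records, applies a DLP re-identification or de-identification transform, and writes the results back to BigQuery.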

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window
from apache_beam.transforms.util import BatchElements

# MaskDetectedDetails, UnmaskDetectedDetails, from_list_dicts_to_table and
# from_table_to_list_dict are defined elsewhere in this module.


def run(argv=None, save_main_session=True):
    """Build and run the pipeline."""
    parser = argparse.ArgumentParser()
    group = parser.add_argument_group()
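    # Exactly one of --query or --input_table must be provided.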
    group_exclusive = parser.add_mutually_exclusive_group(required=True)
    group_exclusive.add_argument(
        '--query',
        help=(
            'SQL query used to read input data from BigQuery. '
            'Example: `SELECT * FROM PROJECT.DATASET.TABLE LIMIT 100`. '
            'Exactly one of --query or --input_table must be specified. '
            'Prefer --query when reading from a public dataset.'
        )
    )
    group_exclusive.add_argument(
        '--input_table',
        help=(
            'Input BigQuery table to read, specified as '
            'PROJECT:DATASET.TABLE or DATASET.TABLE. '
            'Exactly one of --query or --input_table must be specified. '
            'Prefer --input_table when reading from your own dataset.'
        )
    )
    group.add_argument(
        '--output_table',
        required=True,
        help=(
            'Output BigQuery table for results specified as: '
            'PROJECT:DATASET.TABLE or DATASET.TABLE.'
        )
    )
    group.add_argument(
        '--bq_schema',
        required=True,
        help=(
            'Output BigQuery table schema specified as string with format: '
            'FIELD_1:STRING,FIELD_2:STRING,...'
        )
    )
    group.add_argument(
        '--dlp_project',
        required=True,
        help=(
            'ID of the project that holds the DLP template.'
        )
    )
    group.add_argument(
        '--dlp_location',
        required=False,
        help=(
            'Location of the DLP template resource, for example "global" or a region.'
        )
    )
    group.add_argument(
        '--deidentification_template_name',
        required=True,
        help=(
            'Name of the DLP Structured De-identification Template '
            'of the form "projects/<PROJECT>/locations/<LOCATION>'
            '/deidentifyTemplates/<TEMPLATE_ID>"'
        )
    )
    group.add_argument(
        "--window_interval_sec",
        default=30,
        type=int,
        help=(
            'Window interval in seconds used to group incoming records into fixed windows.'
        )
    )
    group.add_argument(
        "--batch_size",
        default=1000,
        type=int,
        help=(
            'Number of records sent per batch in each call to the '
            'Data Loss Prevention (DLP) API.'
        )
    )
    group.add_argument(
        "--dlp_transform",
        default='RE-IDENTIFY',
        choices=['RE-IDENTIFY', 'DE-IDENTIFY'],
        help=(
            'DLP transformation type: RE-IDENTIFY or DE-IDENTIFY '
            '(default: RE-IDENTIFY).'
        )
    )
    known_args, pipeline_args = parser.parse_known_args(argv)

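    # streaming=True runs the job in streaming mode; the fixed windows and the
    # timed (triggering_frequency) BigQuery file loads below rely on it.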
    options = PipelineOptions(
        pipeline_args,
        save_main_session=save_main_session,
        streaming=True
    )

    with beam.Pipeline(options=options) as p:

        # Read from BigQuery into a PCollection.
        if known_args.input_table is not None:
            messages = (
                p
                | 'Read from BigQuery Table' >>
                beam.io.ReadFromBigQuery(
                    table=known_args.input_table
                )
                | 'Apply window' >> beam.WindowInto(
                    window.FixedWindows(known_args.window_interval_sec, 0)
                )
            )
        else:
            if 'LIMIT' not in known_args.query.upper():
                logging.warning('The query has no LIMIT clause. '
                                'Processing the full result set can make '
                                'the pipeline take significantly longer.')

            messages = (
                p
                | 'Run SQL query to read data from BigQuery Table.' >>
                beam.io.ReadFromBigQuery(
                    query=known_args.query
                )
                | 'Apply window' >> beam.WindowInto(
                    window.FixedWindows(known_args.window_interval_sec, 0)
                )
            )

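        # Apply re-identification or de-identification based on --dlp_transform.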
        if known_args.dlp_transform == 'RE-IDENTIFY':
            transformed_messages = (
                messages
                | "Batching" >> BatchElements(
                    min_batch_size=known_args.batch_size,
                    max_batch_size=known_args.batch_size
                )
                | 'Convert dicts to table' >>
                beam.Map(from_list_dicts_to_table)
                | 'Call DLP re-identification' >>
                UnmaskDetectedDetails(
                    project=known_args.dlp_project,
                    location=known_args.dlp_location,
                    template_name=known_args.deidentification_template_name
                )
                | 'Convert table to dicts' >>
                beam.FlatMap(from_table_to_list_dict)
            )
        else:
            transformed_messages = (
                messages
                | "Batching" >> BatchElements(
                    min_batch_size=known_args.batch_size,
                    max_batch_size=known_args.batch_size
                )
                | 'Convert dicts to table' >>
                beam.Map(from_list_dicts_to_table)
                | 'Call DLP de-identification' >>
                MaskDetectedDetails(
                    project=known_args.dlp_project,
                    location=known_args.dlp_location,
                    template_name=known_args.deidentification_template_name
                )
                | 'Convert table to dicts' >>
                beam.FlatMap(from_table_to_list_dict)
            )

        # Write to BigQuery.
        transformed_messages | 'Write to BQ' >> beam.io.WriteToBigQuery(
            known_args.output_table,
            schema=known_args.bq_schema,
            method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
            triggering_frequency=300,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
        )
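
For reference, a minimal invocation sketch is shown below. All resource names, the schema, and the template path are hypothetical placeholders; a real run would also forward standard Dataflow options (e.g. --runner, --project, --region, --temp_location) through pipeline_args.

# Hypothetical invocation sketch; every value below is a placeholder.
run(argv=[
    '--input_table', 'my-project:my_dataset.raw_table',
    '--output_table', 'my-project:my_dataset.deidentified_table',
    '--bq_schema', 'name:STRING,email:STRING',
    '--dlp_project', 'my-project',
    '--dlp_location', 'global',
    '--deidentification_template_name',
    'projects/my-project/locations/global/deidentifyTemplates/my-template',
    '--dlp_transform', 'DE-IDENTIFY',
])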