in flex-templates/python/regional_dlp_transform/bigquery_dlp_bigquery.py [0:0]
def run(argv=None, save_main_session=True):
    """Build and run the BigQuery -> DLP -> BigQuery pipeline.

    Reads rows from BigQuery (either a table or a SQL query), batches them,
    applies a Cloud DLP transformation (re-identification or
    de-identification, selected by --dlp_transform), and writes the
    transformed rows back to a BigQuery table.

    Args:
        argv: Command-line arguments to parse; defaults to sys.argv when None.
        save_main_session: Whether to pickle the main session so that global
            imports/definitions are available on the Dataflow workers.
    """
    parser = argparse.ArgumentParser()
    group = parser.add_argument_group()
    # Exactly one of --query / --input_table must be supplied.
    group_exclusive = parser.add_mutually_exclusive_group(required=True)
    group_exclusive.add_argument(
        '--query',
        help=(
            'Input query to retrieve data from Dataset. '
            'Example: `SELECT * FROM PROJECT:DATASET.TABLE LIMIT 100`. '
            'You need to specify either an input-table or query. '
            'It is recommended to use query '
            'when you want to use a public dataset.'
        )
    )
    group_exclusive.add_argument(
        '--input_table',
        help=(
            'Input BigQuery table for results specified as: '
            'PROJECT:DATASET.TABLE or DATASET.TABLE. '
            'You need to specify either an input-table or query. '
            'It is recommended to use input-table '
            'for when you have your own dataset.'
        )
    )
    group.add_argument(
        '--output_table',
        required=True,
        help=(
            'Output BigQuery table for results specified as: '
            'PROJECT:DATASET.TABLE or DATASET.TABLE.'
        )
    )
    group.add_argument(
        '--bq_schema',
        required=True,
        help=(
            'Output BigQuery table schema specified as string with format: '
            'FIELD_1:STRING,FIELD_2:STRING,...'
        )
    )
    group.add_argument(
        '--dlp_project',
        required=True,
        help=(
            'ID of the project that holds the DLP template.'
        )
    )
    group.add_argument(
        '--dlp_location',
        required=False,
        help=(
            'The Location of the DLP template resource.'
        )
    )
    group.add_argument(
        '--deidentification_template_name',
        required=True,
        help=(
            'Name of the DLP Structured De-identification Template '
            'of the form "projects/<PROJECT>/locations/<LOCATION>'
            '/deidentifyTemplates/<TEMPLATE_ID>"'
        )
    )
    group.add_argument(
        "--window_interval_sec",
        default=30,
        type=int,
        help=(
            'Window interval in seconds for grouping incoming messages.'
        )
    )
    group.add_argument(
        "--batch_size",
        default=1000,
        type=int,
        # BUGFIX: the two help strings were separated by a comma, which made
        # `help` a tuple instead of a single concatenated string.
        help=(
            'Number of records to be sent in a batch in '
            'the call to the Data Loss Prevention (DLP) API.'
        )
    )
    group.add_argument(
        "--dlp_transform",
        # NOTE: `default` removed — it was dead code because required=True
        # means argparse never falls back to the default.
        required=True,
        help=(
            'DLP transformation type.'
        )
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    options = PipelineOptions(
        pipeline_args,
        # BUGFIX: previously hard-coded to True, ignoring the
        # save_main_session parameter of this function.
        save_main_session=save_main_session,
        streaming=True
    )
    with beam.Pipeline(options=options) as p:
        # Read from BigQuery into a PCollection, windowing the (unbounded,
        # streaming) read into fixed intervals so downstream batching works.
        if known_args.input_table is not None:
            messages = (
                p
                | 'Read from BigQuery Table' >>
                beam.io.ReadFromBigQuery(
                    table=known_args.input_table
                )
                | 'Apply window' >> beam.WindowInto(
                    window.FixedWindows(known_args.window_interval_sec, 0)
                )
            )
        else:
            # An unbounded query can make the pipeline run much longer;
            # warn (but do not fail) when no LIMIT clause is present.
            if 'LIMIT' not in known_args.query:
                logging.warning('The query has no LIMIT parameter set. '
                                'This can lead to a pipeline processing '
                                'taking more time.')
            messages = (
                p
                | 'Run SQL query to read data from BigQuery Table.' >>
                beam.io.ReadFromBigQuery(
                    query=known_args.query
                )
                | 'Apply window' >> beam.WindowInto(
                    window.FixedWindows(known_args.window_interval_sec, 0)
                )
            )
        # Batch rows, convert to a DLP table item, call the DLP API, and
        # flatten back to row dicts. 'RE-IDENTIFY' unmasks; any other value
        # de-identifies (masks).
        if known_args.dlp_transform == 'RE-IDENTIFY':
            transformed_messages = (
                messages
                | "Batching" >> BatchElements(
                    min_batch_size=known_args.batch_size,
                    max_batch_size=known_args.batch_size
                )
                | 'Convert dicts to table' >>
                beam.Map(from_list_dicts_to_table)
                | 'Call DLP re-identification' >>
                UnmaskDetectedDetails(
                    project=known_args.dlp_project,
                    location=known_args.dlp_location,
                    template_name=known_args.deidentification_template_name
                )
                | 'Convert table to dicts' >>
                beam.FlatMap(from_table_to_list_dict)
            )
        else:
            transformed_messages = (
                messages
                | "Batching" >> BatchElements(
                    min_batch_size=known_args.batch_size,
                    max_batch_size=known_args.batch_size
                )
                | 'Convert dicts to table' >>
                beam.Map(from_list_dicts_to_table)
                | 'Call DLP de-identification' >>
                MaskDetectedDetails(
                    project=known_args.dlp_project,
                    location=known_args.dlp_location,
                    template_name=known_args.deidentification_template_name
                )
                | 'Convert table to dicts' >>
                beam.FlatMap(from_table_to_list_dict)
            )
        # Write to BigQuery via periodic file loads (every 300 s), creating
        # the output table from --bq_schema if it does not exist.
        transformed_messages | 'Write to BQ' >> beam.io.WriteToBigQuery(
            known_args.output_table,
            schema=known_args.bq_schema,
            method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
            triggering_frequency=300,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
        )