in flex-templates/python/regional_dlp_de_identification/pubsub_dlp_bigquery.py [0:0]
def run(argv=None, save_main_session=True):
    """Build and run the streaming de-identification pipeline.

    Reads JSON messages from Pub/Sub (a subscription, or a temporary
    subscription created from a topic), batches them, de-identifies the
    batches with the Cloud DLP API, and writes the results to BigQuery.

    Args:
        argv: Command-line arguments to parse (defaults to sys.argv
            when None). Unrecognized arguments are forwarded to the
            Beam pipeline as ``pipeline_args``.
        save_main_session: Whether to pickle the main session so that
            module-level imports and definitions are available on the
            workers. Previously this parameter was accepted but ignored
            (the pipeline hard-coded True); it is now honored.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output_table',
        required=True,
        help=(
            'Output BigQuery table for results specified as: '
            'PROJECT:DATASET.TABLE or DATASET.TABLE.'
        )
    )
    parser.add_argument(
        '--bq_schema',
        required=True,
        help=(
            'Output BigQuery table schema specified as string with format: '
            'FIELD_1:STRING,FIELD_2:STRING,...'
        )
    )
    parser.add_argument(
        '--dlp_project',
        required=True,
        help=(
            'ID of the project that holds the DLP template.'
        )
    )
    parser.add_argument(
        '--dlp_location',
        required=False,
        help=(
            'The Location of the DLP template resource.'
        )
    )
    parser.add_argument(
        '--deidentification_template_name',
        required=True,
        help=(
            'Name of the DLP Structured De-identification Template '
            'of the form "projects/<PROJECT>/locations/<LOCATION>'
            '/deidentifyTemplates/<TEMPLATE_ID>"'
        )
    )
    parser.add_argument(
        "--window_interval_sec",
        default=30,
        type=int,
        help=(
            'Window interval in seconds for grouping incoming messages.'
        )
    )
    parser.add_argument(
        "--batch_size",
        default=1000,
        type=int,
        help=(
            'Number of records to be sent in a batch in '
            'the call to the Data Loss Prevention (DLP) API.'
        )
    )
    # Exactly one Pub/Sub source must be supplied.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        '--input_topic',
        help=(
            'Input PubSub topic of the form '
            '"projects/<PROJECT>/topics/<TOPIC>".'
            'A temporary subscription will be created from '
            'the specified topic.'
        )
    )
    group.add_argument(
        '--input_subscription',
        help=(
            'Input PubSub subscription of the form '
            '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'
        )
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    options = PipelineOptions(
        pipeline_args,
        # Fix: honor the function parameter instead of hard-coding True.
        save_main_session=save_main_session,
        streaming=True
    )
    with beam.Pipeline(options=options) as p:
        # Read from PubSub into a PCollection.
        # If input_subscription provided, it will be used.
        # If input_subscription not provided, input_topic will be used.
        # If input_topic provided, a temporary subscription will be created
        # from the specified topic.
        # The two branches differ only in the source argument, so select the
        # source first and share one transform chain (previously duplicated).
        if known_args.input_subscription:
            source = beam.io.ReadFromPubSub(
                subscription=known_args.input_subscription
            )
        else:
            source = beam.io.ReadFromPubSub(
                topic=known_args.input_topic
            )
        messages = (
            p
            | 'Read from Pub/Sub' >> source.with_output_types(bytes)
            | 'UTF-8 bytes to string' >>
            beam.Map(lambda msg: msg.decode("utf-8"))
            | 'Parse JSON payload' >>
            beam.Map(json.loads)
            | 'Flatten lists' >>
            beam.FlatMap(normalize_data)
            | 'Apply window' >> beam.WindowInto(
                window.FixedWindows(known_args.window_interval_sec, 0)
            )
        )
        de_identified_messages = (
            messages
            # Batch records so each DLP call carries up to batch_size rows.
            | "Batching" >> BatchElements(
                min_batch_size=known_args.batch_size,
                max_batch_size=known_args.batch_size
            )
            | 'Convert dicts to table' >>
            beam.Map(from_list_dicts_to_table)
            | 'Call DLP de-identification' >>
            MaskDetectedDetails(
                project=known_args.dlp_project,
                location=known_args.dlp_location,
                template_name=known_args.deidentification_template_name
            )
            | 'Convert table to dicts' >>
            beam.FlatMap(from_table_to_list_dict)
        )
        # Write to BigQuery.
        de_identified_messages | 'Write to BQ' >> beam.io.WriteToBigQuery(
            known_args.output_table,
            schema=known_args.bq_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
        )