in srf-longrun-job-dataflow/srflongrunjobdataflow.py [0:0]
import argparse
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions, PipelineOptions, SetupOptions, StandardOptions)

# stt_output_response, stt_parse_response, redact_text and
# WriteToSeparateFiles are defined elsewhere in this module
# (hedged sketches follow the pipeline below).


def run(argv=None, save_main_session=True):
    """Build and run the pipeline."""
    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        '--input_topic',
        help=('Input PubSub topic of the form '
              '"projects/<PROJECT>/topics/<TOPIC>".'))
    group.add_argument(
        '--input_subscription',
        help=('Input PubSub subscription of the form '
              '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>".'))
    parser.add_argument(
        '--inspect_template', required=True,
        help='ID of the DLP inspect template used for redaction.')
    parser.add_argument(
        '--output', required=True,
        help='Output Cloud Storage path to write result files under: '
             '"gs://<BUCKET>/<DIR>".')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    project_id = pipeline_options.view_as(GoogleCloudOptions).project
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    pipeline_options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=pipeline_options)
    # Read from Pub/Sub into a PCollection of raw message bytes.
    if known_args.input_subscription:
        messages = (p
                    | beam.io.ReadFromPubSub(
                        subscription=known_args.input_subscription)
                    .with_output_types(bytes))
    else:
        messages = (p
                    | beam.io.ReadFromPubSub(topic=known_args.input_topic)
                    .with_output_types(bytes))
    decode_messages = (messages
                       | 'DecodePubSubMessages' >> beam.Map(
                           lambda x: x.decode('utf-8')))
    # Transcribe each long audio file with asynchronous Speech-to-Text
    # recognition (stt_output_response is sketched below).
    stt_output = (decode_messages
                  | 'SpeechToTextOutput' >> beam.Map(stt_output_response))

    # Parse and enrich the Speech-to-Text response (sketched below).
    parse_stt_output = (stt_output
                        | 'ParseSpeechToText' >> beam.Map(stt_parse_response))

    # Redact sensitive data with Cloud DLP, using the supplied inspect
    # template (redact_text is sketched below).
    dlp_output = (parse_stt_output
                  | 'FindDLP' >> beam.Map(
                      lambda j: redact_text(
                          j, project_id,
                          template_id=known_args.inspect_template)))
    # Serialize each record to JSON.
    json_output = dlp_output | 'JSONDumps' >> beam.Map(json.dumps)

    # Write the findings to Cloud Storage, one file per record
    # (WriteToSeparateFiles is sketched below).
    json_output | 'WriteFindings' >> beam.ParDo(
        WriteToSeparateFiles(known_args.output + '/'))

    p.run()
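

# A minimal sketch of stt_output_response (an assumption, not this module's
# actual code): it assumes each decoded Pub/Sub message is simply the
# "gs://..." URI of one audio file; the real helper may instead parse a richer
# GCS notification payload, and the encoding/sample-rate settings here are
# illustrative.
from google.cloud import speech


def stt_output_response(gcs_uri):
    """Run asynchronous (long-running) recognition on one long audio file."""
    client = speech.SpeechClient()
    audio = speech.RecognitionAudio(uri=gcs_uri)
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=16000,  # assumption: match the source audio
        language_code='en-US',
        enable_word_time_offsets=True,
    )
    operation = client.long_running_recognize(config=config, audio=audio)
    # Block until the long-running recognition job completes.
    return operation.result(timeout=3600)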
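

# A hedged sketch of stt_parse_response: flatten the recognition results into
# a plain dict. The real helper likely enriches the record with more metadata
# (timestamps, source file name, etc.); the 'transcript' key is an assumption.
def stt_parse_response(response):
    """Collapse the top recognition alternatives into a single transcript."""
    transcript = ' '.join(
        result.alternatives[0].transcript for result in response.results)
    return {'transcript': transcript}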
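

# A hedged sketch of redact_text using the DLP deidentify_content API with the
# user-supplied inspect template and replace-with-info-type masking. The
# 'transcript' key matches the sketch above; note that inspect_template_name
# expects a full resource name ("projects/<PROJECT>/inspectTemplates/<ID>").
import google.cloud.dlp_v2


def redact_text(record, project_id, template_id):
    """Replace sensitive findings in the transcript with their info types."""
    dlp = google.cloud.dlp_v2.DlpServiceClient()
    response = dlp.deidentify_content(
        request={
            'parent': f'projects/{project_id}',
            'inspect_template_name': template_id,
            'deidentify_config': {
                'info_type_transformations': {
                    'transformations': [{
                        'primitive_transformation': {
                            'replace_with_info_type_config': {}
                        }
                    }]
                }
            },
            'item': {'value': record['transcript']},
        })
    record['transcript'] = response.item.value
    return record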
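

# A hedged sketch of WriteToSeparateFiles: write each JSON record to its own
# object under the output prefix via Beam's FileSystems API. The uuid-based
# file naming is illustrative, not the module's actual scheme.
import uuid

from apache_beam.io.filesystems import FileSystems


class WriteToSeparateFiles(beam.DoFn):
    def __init__(self, outdir):
        self._outdir = outdir

    def process(self, element):
        # One output file per element, named with a random suffix.
        writer = FileSystems.create(
            '{}{}.json'.format(self._outdir, uuid.uuid4().hex))
        writer.write(element.encode('utf-8'))
        writer.close()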
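

# Example invocation (flag values are illustrative):
#   python srflongrunjobdataflow.py \
#       --runner=DataflowRunner \
#       --project=<PROJECT> \
#       --region=<REGION> \
#       --temp_location=gs://<BUCKET>/tmp \
#       --input_topic=projects/<PROJECT>/topics/<TOPIC> \
#       --inspect_template=<TEMPLATE_ID> \
#       --output=gs://<BUCKET>/findings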