def run()

in srf-longrun-job-dataflow/srflongrunjobdataflow.py [0:0]


def run(argv=None, save_main_session=True):
    """Build and run the streaming speech-to-text / DLP redaction pipeline.

    The pipeline does the following, in order:

    1. Reads messages from Pub/Sub (a topic or a subscription) and decodes
       them as UTF-8.
    2. Passes each message to ``stt_output_response``.
    3. Parses the result with ``stt_parse_response``.
    4. Redacts it with ``redact_text`` using the given DLP inspect template.
    5. Serializes each element to JSON and writes it with
       ``WriteToSeparateFiles`` under the ``--output`` prefix.

    Args:
        argv: Command-line arguments to parse; ``None`` means argparse falls
            back to ``sys.argv[1:]``. Arguments not recognised here are
            forwarded to Beam as pipeline options (e.g. ``--project``,
            ``--runner``).
        save_main_session: Value assigned to
            ``SetupOptions.save_main_session``.

    Returns:
        The ``PipelineResult`` returned by ``Pipeline.run()``. No
        ``wait_until_finish()`` is called here; callers may do so themselves.
    """
    parser = argparse.ArgumentParser()
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument(
        '--input_topic',
        help=('Input PubSub topic of the form '
              '"projects/<PROJECT>/topics/<TOPIC>".'))
    source.add_argument(
        '--input_subscription',
        help=('Input PubSub subscription of the form '
              '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>".'))
    parser.add_argument(
        '--inspect_template', required=True,
        help='Input ID for the DLP inspect template "ID TEMPLATE"')
    parser.add_argument(
        '--output', required=True,
        help=('Output Cloud Storage location to write results to '
              '"gs://<BUCKET>/<PATH>"'))
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    # NOTE(review): may be None if --project was not supplied; redact_text
    # receives it as-is.
    project_id = pipeline_options.view_as(GoogleCloudOptions).project

    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    # Pub/Sub is an unbounded source, so the pipeline runs in streaming mode.
    pipeline_options.view_as(StandardOptions).streaming = True

    # Normalise the prefix so "--output gs://bucket/dir/" does not yield an
    # empty path segment ("dir//...").
    output_prefix = known_args.output.rstrip('/') + '/'

    p = beam.Pipeline(options=pipeline_options)

    # Read raw bytes from Pub/Sub. Exactly one source option is set, because
    # the argument group above is required and mutually exclusive.
    if known_args.input_subscription:
        messages = (p
                    | beam.io.ReadFromPubSub(
                        subscription=known_args.input_subscription)
                    .with_output_types(bytes))
    else:
        messages = (p
                    | beam.io.ReadFromPubSub(topic=known_args.input_topic)
                    .with_output_types(bytes))

    decoded = messages | 'DecodePubSubMessages' >> beam.Map(
        lambda x: x.decode('utf-8'))

    # Get STT data for long audio files (asynchronous speech recognition).
    stt_output = decoded | 'SpeechToTextOutput' >> beam.Map(stt_output_response)

    # Parse and enrich the STT response.
    parsed = stt_output | 'ParseSpeechToText' >> beam.Map(stt_parse_response)

    # Google Cloud DLP redaction. Extra Map arguments are forwarded to
    # redact_text as redact_text(element, project_id, template_id=...),
    # so there is no closure over the whole argparse namespace.
    redacted = parsed | 'FindDLP' >> beam.Map(
        redact_text, project_id, template_id=known_args.inspect_template)

    # Serialize each element to a JSON string.
    json_output = redacted | 'JSONDumps' >> beam.Map(json.dumps)

    # Write findings to Cloud Storage under the normalised prefix.
    json_output | 'WriteFindings' >> beam.ParDo(
        WriteToSeparateFiles(output_prefix))

    return p.run()