# srf-longrun-job-dataflow/srflongrunjobdataflow.py
# Copyright 2021 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START SRF pubsub_to_cloud_storage]
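"""Streaming Dataflow pipeline for long-running Speech-to-Text results.

Reads Speech-to-Text operation notifications from Pub/Sub, polls each
long-running recognition operation until it completes, parses the transcript
and word timings, inspects the transcript with Cloud DLP, and writes one JSON
file per recording to Cloud Storage.
"""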
import argparse
import json
import logging
import time
import uuid

import apache_beam as beam
import google.cloud.dlp_v2
from apache_beam.io import filesystems
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions


class WriteToSeparateFiles(beam.DoFn):
    """Writes each record to its own JSON file under the output directory."""

    def __init__(self, outdir):
        self.outdir = outdir

    def process(self, element):
        # Suffix each file with the first 8 chars of a UUID to avoid name collisions.
        x = uuid.uuid4()
        record = json.loads(element)
        file_name = record['filename'].split("/")
        writer = filesystems.FileSystems.create(
            self.outdir + file_name[-1] + "_" + str(x)[:8] + ".json")
        writer.write(json.dumps(record).encode("utf8"))
        writer.close()


# Get STT data for a long audio file from the asynchronous speech recognition
# operation, polling until the operation completes.
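# Expected Pub/Sub message payload (illustrative placeholder values):
# {"sttnameid": "<operation-name>", "duration": "120.0", "filename": "gs://bucket/audio.flac"}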
def stt_output_response(data):
from oauth2client.client import GoogleCredentials
from googleapiclient import discovery
credentials = GoogleCredentials.get_application_default()
pub_sub_data = json.loads(data)
speech_service = discovery.build('speech', 'v1p1beta1', credentials=credentials)
get_operation = speech_service.operations().get(name=pub_sub_data['sttnameid'])
response = get_operation.execute()
# handle polling of STT
if pub_sub_data['duration'] != 'NA':
sleep_duration = round(int(float(pub_sub_data['duration'])) / 2)
else:
sleep_duration = 5
logging.info('Sleeping for: %s', sleep_duration)
time.sleep(sleep_duration)
retry_count = 10
while retry_count > 0 and not response.get('done', False):
retry_count -= 1
time.sleep(120)
response = get_operation.execute()
    # Return the STT response together with the source filename.
response_list = [response,
pub_sub_data['filename']
]
return response_list


# Parse and enrich the stt_output_response result.
def stt_parse_response(stt_data):
parse_stt_output_response = {
'filename': stt_data[1],
'transcript': None,
'words': [],
'dlp': [],
}
string_transcript = ''
# get transcript from stt_data
for i in stt_data[0]['response']['results']:
if 'transcript' in i['alternatives'][0]:
string_transcript += str(i['alternatives'][0]['transcript']) + ' '
parse_stt_output_response['transcript'] = string_transcript[:-1] # remove the ending whitespace
for element in stt_data[0]['response']['results']:
for word in element['alternatives'][0]['words']:
parse_stt_output_response['words'].append(
{'word': word['word'], 'startsecs': word['startTime'].strip('s'),
'endsecs': word['endTime'].strip('s')})
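    # Illustrative shape of the returned record (placeholder values):
    # {"filename": "gs://bucket/audio.flac", "transcript": "hello world",
    #  "words": [{"word": "hello", "startsecs": "0.400", "endsecs": "0.800"}], "dlp": []}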
return parse_stt_output_response


# Return the filename key of a JSON element (not referenced by the pipeline below).
def destination(element):
return json.loads(element)["filename"]


# Inspect the transcript with Cloud DLP and record sensitive findings in data['dlp'].
def redact_text(data, project, template_id):
#logging.info(data)
dlp = google.cloud.dlp_v2.DlpServiceClient()
parent = dlp.common_project_path(project)
inspect_template_name = f"{parent}/inspectTemplates/{template_id}"
#logging.info(data['transcript'])
item = {"value": data['transcript']}
request = google.cloud.dlp_v2.InspectContentRequest(
parent=parent,
inspect_template_name=inspect_template_name,
item=item,)
response = dlp.inspect_content(request=request)
#logging.info(response)
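    # Note: finding.quote is only populated when the inspect template enables include_quote.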
if response.result.findings:
for finding in response.result.findings:
try:
if finding.quote:
#logging.info("Quote: {}".format(finding.quote))
data['dlp'].append(finding.quote)
except AttributeError:
pass
else:
logging.info("No findings.")
return data


def run(argv=None, save_main_session=True):
"""Build and run the pipeline."""
parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
'--input_topic',
        help=('Input Pub/Sub topic of the form '
              '"projects/<PROJECT>/topics/<TOPIC>".'))
group.add_argument(
'--input_subscription',
        help=('Input Pub/Sub subscription of the form '
              '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>".'))
parser.add_argument('--inspect_template', required=True,
                        help='ID of the Cloud DLP inspect template to use '
                             '"TEMPLATE_ID"')
parser.add_argument('--output', required=True,
                        help='Output Cloud Storage path to write results to '
                             '"gs://BUCKET/PATH"')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
project_id = pipeline_options.view_as(GoogleCloudOptions).project
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
pipeline_options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=pipeline_options)
    # Read from Pub/Sub into a PCollection.
if known_args.input_subscription:
messages = (p
| beam.io.ReadFromPubSub(
subscription=known_args.input_subscription)
.with_output_types(bytes))
else:
messages = (p
| beam.io.ReadFromPubSub(topic=known_args.input_topic)
.with_output_types(bytes))
decode_messages = messages | 'DecodePubSubMessages' >> beam.Map(lambda x: x.decode('utf-8'))
    # Get STT results for the long audio file from the asynchronous recognition operation
stt_output = decode_messages | 'SpeechToTextOutput' >> beam.Map(stt_output_response)
# Parse and enrich stt_output response
parse_stt_output = stt_output | 'ParseSpeechToText' >> beam.Map(stt_parse_response)
    # Inspect the transcript with Cloud DLP using the provided inspect template
dlp_output = parse_stt_output | 'FindDLP' >> beam.Map(lambda j: redact_text(j, project_id, template_id=known_args.inspect_template))
# Convert to JSON
json_output = dlp_output | 'JSONDumps' >> beam.Map(json.dumps)
# Write findings to Cloud Storage
json_output | 'WriteFindings' >> beam.ParDo(WriteToSeparateFiles(known_args.output + '/'))
p.run()


if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
run()
# [END SRF pubsub_to_cloud_storage]
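
# Example invocation (illustrative placeholder values; any standard Dataflow
# pipeline options may be added):
#   python srflongrunjobdataflow.py \
#       --runner=DataflowRunner \
#       --project=YOUR_PROJECT \
#       --region=YOUR_REGION \
#       --temp_location=gs://YOUR_BUCKET/tmp \
#       --input_topic=projects/YOUR_PROJECT/topics/YOUR_TOPIC \
#       --inspect_template=YOUR_TEMPLATE_ID \
#       --output=gs://YOUR_BUCKET/output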