nlp-sentiment-analysis/code/main.py (83 lines of code) (raw):
from google.cloud import bigquery, documentai_v1beta3, storage
from google.cloud import language_v2
import functions_framework
import os
import json
def process_document(bucket_name, object_name):
"""Performs sentiment analysis on a text document stored in GCS."""
print("Document processing started.")
language_client = language_v2.LanguageServiceClient()
storage_client = storage.Client()
file_path = f"/tmp/{object_name}"
print(f"Downloading document to: {file_path}")
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(object_name)
blob.download_to_filename(file_path)
with open(file_path, "r") as text_file:
document_content = text_file.read()
print("Performing sentiment analysis...")
document = language_v2.Document(content=document_content, type_=language_v2.Document.Type.PLAIN_TEXT)
response = language_client.analyze_sentiment(request={"document": document})
# Get results
sentiment_score = response.document_sentiment.score
sentiment_magnitude = response.document_sentiment.magnitude
sentences = [
{"text": sentence.text.content, "sentiment": sentence.sentiment.score}
for sentence in response.sentences
]
process_output(
bucket_name,
object_name,
sentiment_score,
sentiment_magnitude,
sentences,
)
def process_output(
bucket_name, object_name, sentiment_score, sentiment_magnitude, sentences
):
print("Storing results in GCS and BigQuery.")
# Prepare results JSON
results_json = {
"document_file_name": object_name,
"sentiment_score": sentiment_score,
"sentiment_magnitude": sentiment_magnitude,
"sentences": sentences,
}
results_json_str = json.dumps(results_json)
# Load to GCS
storage_client = storage.Client()
destination_bucket_name = os.environ["GCS_OUTPUT"]
destination_bucket = storage_client.bucket(destination_bucket_name)
results_json_blob = destination_bucket.blob(f"{object_name}.json")
results_json_blob.upload_from_string(results_json_str)
# Load to BigQuery (error handling omitted for brevity)
bq_client = bigquery.Client()
table_id = os.getenv("BQ_TABLE_ID")
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("document_file_name", "STRING"),
bigquery.SchemaField("sentiment_score", "FLOAT"),
bigquery.SchemaField("sentiment_magnitude", "FLOAT"),
bigquery.SchemaField("sentences", "RECORD", mode="REPEATED", fields=[
bigquery.SchemaField("text", "STRING"),
bigquery.SchemaField("sentiment", "FLOAT"),
]),
],
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
)
uri = f"gs://{destination_bucket_name}/{object_name}.json"
load_job = bq_client.load_table_from_uri(uri, table_id, job_config=job_config)
load_job.result()
# Triggered by a change in a storage bucket
@functions_framework.cloud_event
def trigger_gcs(cloud_event):
data = cloud_event.data
event_id = cloud_event["id"]
event_type = cloud_event["type"]
bucket = data["bucket"]
name = data["name"]
metageneration = data["metageneration"]
timeCreated = data["timeCreated"]
updated = data["updated"]
print(f"Event ID: {event_id}")
print(f"Event type: {event_type}")
print(f"Bucket: {bucket}")
print(f"File: {name}")
print(f"Metageneration: {metageneration}")
print(f"Created: {timeCreated}")
print(f"Updated: {updated}")
process_document(bucket, name)