data-analytics/next25-turbocharge-ecomm/generate_and_publish_image.py (94 lines of code) (raw):
import vertexai
from vertexai.preview.vision_models import ImageGenerationModel
from google.cloud import storage
from google.cloud import pubsub_v1
import datetime
import random
import json # Use json.dumps for better message formatting
import io
def generate_random_toy_name() -> str:
"""
Generates a random, child-friendly toy name. Includes "toy".
"""
materials = [
"wooden", "plush", "plastic", "soft", "colorful",
"musical", "interactive", "stacking", "rolling", "cuddly",
]
animals = [
"bear", "dog", "cat", "rabbit", "lion", "elephant",
"monkey", "giraffe", "penguin", "parrot", "duck",
"fish", "horse", "puppy", "kitten", "bunny"
]
objects = [
"train", "blocks", "car", "truck", "puzzle",
"doll", "boat", "plane", "rattle", "stacker", "xylophone",
]
descriptors = [
"with tracks", "for building", "for cuddling", "that sings",
"with lights", "for learning", "", # Empty string for no extra descriptor
"set", "and friends"
]
material = random.choice(materials)
# Choose animal or object, not both.
if random.random() < 0.6: # 60% chance of an object
item = random.choice(objects)
animal = ""
else: # 40% chance of animal
item = ""
animal = random.choice(animals)
descriptor = random.choice(descriptors)
# Construct the name, handling different combinations logically.
if item:
name = f"{material} toy {item} {descriptor}"
elif animal:
name = f"{material} {animal} toy {descriptor}"
else: # Edge Case
name = f"{material} toy"
# Clean up extra spaces and capitalize.
name = " ".join(name.split()).title()
return name
def generate_and_publish_image(project_id: str, gcs_bucket_name: str, location: str = "us-central1") -> None:
"""
Generates a toy image using Vertex AI, uploads it to GCS, and publishes a message to Pub/Sub.
Args:
project_id: Your Google Cloud project ID.
gcs_bucket_name: The name of your GCS bucket.
location: The Vertex AI location. Defaults to "us-central1".
"""
vertexai.init(project=project_id, location=location)
model = ImageGenerationModel.from_pretrained("imagen-3.0-generate-002") # Or a suitable model
# Generate a toy name for the prompt
prompt = generate_random_toy_name()
print(f"Generated Prompt: {prompt}")
# Create a filename-safe version of the prompt
filename_safe_prompt = prompt.replace(" ", "_").lower()
# Generate the image
try:
images = model.generate_images(
prompt=prompt,
number_of_images=1,
language="en",
aspect_ratio="1:1",
safety_filter_level="block_some",
)
image = images[0] # Get the first (and only) image
image_bytes = image._image_bytes # Get image bytes
print(f"Created output image using {len(image_bytes)} bytes")
# Show the image (optional, useful for notebooks)
#image.show() # Uncomment if you're in a Jupyter environment
except Exception as e:
print(f"Error generating image: {e}")
return
# Initialize GCS and Pub/Sub clients
storage_client = storage.Client()
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, "demo6-topic") # Replace "demo6-topic" with your topic
# GCS Upload
try:
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
image_name = f"image_{filename_safe_prompt}_{timestamp}.jpeg" # Use .jpeg extension
gcs_path = f"raw/{image_name}"
bucket = storage_client.bucket(gcs_bucket_name)
blob = bucket.blob(gcs_path)
blob.upload_from_string(image_bytes, content_type="image/jpeg") # Upload bytes
print(f"Image uploaded to gs://{gcs_bucket_name}/{gcs_path}")
except Exception as e:
print(f"Error uploading to GCS: {e}")
return # Exit if upload fails
# Pub/Sub Publish
try:
message_data = {
"image_path": f"gs://{gcs_bucket_name}/{gcs_path}",
"image_bucket": gcs_bucket_name,
"image_path_split": gcs_path,
"contextual_text": prompt,
}
message_bytes = json.dumps(message_data).encode("utf-8") # Use json.dumps
publish_future = publisher.publish(topic_path, data=message_bytes)
publish_future.result() # Wait for publish to complete (important!)
print(f"Message published to {topic_path}")
except Exception as e:
print(f"Error publishing message: {e}")
if __name__ == '__main__':
my_project_id = "data-connect-demo6"
my_gcs_bucket = "demo6-df-temp"
generate_and_publish_image(my_project_id, my_gcs_bucket)