in data-analytics/next25-turbocharge-ecomm/generate_and_publish_image.py [0:0]
def generate_and_publish_image(project_id: str, gcs_bucket_name: str, location: str = "us-central1") -> None:
"""
Generates a toy image using Vertex AI, uploads it to GCS, and publishes a message to Pub/Sub.
Args:
project_id: Your Google Cloud project ID.
gcs_bucket_name: The name of your GCS bucket.
location: The Vertex AI location. Defaults to "us-central1".
"""
vertexai.init(project=project_id, location=location)
model = ImageGenerationModel.from_pretrained("imagen-3.0-generate-002") # Or a suitable model
# Generate a toy name for the prompt
prompt = generate_random_toy_name()
print(f"Generated Prompt: {prompt}")
# Create a filename-safe version of the prompt
filename_safe_prompt = prompt.replace(" ", "_").lower()
# Generate the image
try:
images = model.generate_images(
prompt=prompt,
number_of_images=1,
language="en",
aspect_ratio="1:1",
safety_filter_level="block_some",
)
image = images[0] # Get the first (and only) image
image_bytes = image._image_bytes # Get image bytes
print(f"Created output image using {len(image_bytes)} bytes")
# Show the image (optional, useful for notebooks)
#image.show() # Uncomment if you're in a Jupyter environment
except Exception as e:
print(f"Error generating image: {e}")
return
# Initialize GCS and Pub/Sub clients
storage_client = storage.Client()
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, "demo6-topic") # Replace "demo6-topic" with your topic
# GCS Upload
try:
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
image_name = f"image_{filename_safe_prompt}_{timestamp}.jpeg" # Use .jpeg extension
gcs_path = f"raw/{image_name}"
bucket = storage_client.bucket(gcs_bucket_name)
blob = bucket.blob(gcs_path)
blob.upload_from_string(image_bytes, content_type="image/jpeg") # Upload bytes
print(f"Image uploaded to gs://{gcs_bucket_name}/{gcs_path}")
except Exception as e:
print(f"Error uploading to GCS: {e}")
return # Exit if upload fails
# Pub/Sub Publish
try:
message_data = {
"image_path": f"gs://{gcs_bucket_name}/{gcs_path}",
"image_bucket": gcs_bucket_name,
"image_path_split": gcs_path,
"contextual_text": prompt,
}
message_bytes = json.dumps(message_data).encode("utf-8") # Use json.dumps
publish_future = publisher.publish(topic_path, data=message_bytes)
publish_future.result() # Wait for publish to complete (important!)
print(f"Message published to {topic_path}")
except Exception as e:
print(f"Error publishing message: {e}")