def generate_and_publish_image()

in data-analytics/next25-turbocharge-ecomm/generate_and_publish_image.py [0:0]


def generate_and_publish_image(
    project_id: str,
    gcs_bucket_name: str,
    location: str = "us-central1",
    *,
    topic_id: str = "demo6-topic",
    model_name: str = "imagen-3.0-generate-002",
) -> None:
    """
    Generates a toy image using Vertex AI, uploads it to GCS, and publishes a message to Pub/Sub.

    The prompt comes from ``generate_random_toy_name()``. The image is stored
    under ``raw/image_<sanitized prompt>_<timestamp>.jpeg`` in the bucket, and
    a JSON message with the keys ``image_path``, ``image_bucket``,
    ``image_path_split`` and ``contextual_text`` is published to the topic.

    This function is best-effort: a failure in any stage is printed and the
    function returns early instead of raising, so later stages never run on
    the output of a failed earlier stage.

    Args:
        project_id: Your Google Cloud project ID.
        gcs_bucket_name: The name of your GCS bucket.
        location: The Vertex AI location. Defaults to "us-central1".
        topic_id: ID of the Pub/Sub topic (within ``project_id``) that
            receives the notification. Defaults to "demo6-topic".
        model_name: Name of the image generation model to load. Defaults to
            "imagen-3.0-generate-002".

    Returns:
        None.
    """
    # Local import so this function does not depend on the module's import block.
    import re

    vertexai.init(project=project_id, location=location)
    model = ImageGenerationModel.from_pretrained(model_name)

    # Generate a toy name for the prompt
    prompt = generate_random_toy_name()
    print(f"Generated Prompt: {prompt}")

    # Create a filename-safe version of the prompt. Spaces become underscores
    # first (one per space, as before); then every remaining character that is
    # not a word character, "." or "-" is replaced too, so characters such as
    # "/" or "#" cannot leak into the object name or create nested "folders".
    filename_safe_prompt = re.sub(r"[^\w.-]", "_", prompt.replace(" ", "_").lower())

    # Generate the image
    try:
        images = model.generate_images(
            prompt=prompt,
            number_of_images=1,
            language="en",
            aspect_ratio="1:1",
            safety_filter_level="block_some",
        )
        # Only one image was requested, so take the first entry. The SDK has
        # no public accessor for the raw bytes, hence the private attribute.
        image_bytes = images[0]._image_bytes

        print(f"Created output image using {len(image_bytes)} bytes")

    except IndexError:
        # An empty response would otherwise surface as the unhelpful
        # "list index out of range".
        print("Error generating image: the model returned no images for this prompt")
        return
    except Exception as e:
        print(f"Error generating image: {e}")
        return

    # Initialize GCS and Pub/Sub clients
    storage_client = storage.Client()
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)

    # GCS Upload
    try:
        # Second-resolution timestamp keeps names from successive runs distinct.
        timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        image_name = f"image_{filename_safe_prompt}_{timestamp}.jpeg"
        gcs_path = f"raw/{image_name}"
        bucket = storage_client.bucket(gcs_bucket_name)
        blob = bucket.blob(gcs_path)
        blob.upload_from_string(image_bytes, content_type="image/jpeg")
        print(f"Image uploaded to gs://{gcs_bucket_name}/{gcs_path}")

    except Exception as e:
        print(f"Error uploading to GCS: {e}")
        return  # Nothing to announce if the upload failed

    # Pub/Sub Publish
    try:
        message_data = {
            "image_path": f"gs://{gcs_bucket_name}/{gcs_path}",
            "image_bucket": gcs_bucket_name,
            "image_path_split": gcs_path,
            "contextual_text": prompt,
        }
        message_bytes = json.dumps(message_data).encode("utf-8")

        publish_future = publisher.publish(topic_path, data=message_bytes)
        # Block until the publish completes so a failure is reported here
        # rather than being lost when the function returns.
        publish_future.result()
        print(f"Message published to {topic_path}")

    except Exception as e:
        print(f"Error publishing message: {e}")