backend_apis/app/bulk_email_util.py (141 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Email Generation:
- Generate email messages that are designed to drive a desired outcome.
- These emails include text and visuals.
"""
import asyncio
import functools
import numpy as np
import pandas as pd
import vertexai
import google.auth
from google.auth import impersonated_credentials
import google.auth.transport.requests
from google.api_core.client_info import ClientInfo
from google.auth import credentials as auth_credentials
import tomllib
import io
import base64
from PIL import Image
from google.cloud import translate_v2 as translate
from vertexai.preview.generative_models import GenerativeModel, GenerationConfig
from vertexai.preview.vision_models import ImageGenerationModel
# Load configuration file
with open("/app/config.toml", "rb") as f:
config = tomllib.load(f)
project_id = config["global"]["project_id"]
location = config["global"]["location"]
credentials, _ = google.auth.default()
request = google.auth.transport.requests.Request()
credentials.refresh(request)
credentials.apply(headers = {'user-agent': 'cloud-solutions/genai-for-marketing-backend-v2.0'})
vertexai.init(
project=project_id,
location=location,
credentials=credentials)
text_llm = GenerativeModel(config["models"]["text_model_name"])
imagen = ImageGenerationModel.from_pretrained(config["models"]["image_model_name"])
translate_client = translate.Client(client_info=ClientInfo(user_agent='cloud-solutions/genai-for-marketing-backend-v2.0'))
# Default values
EMAIL_TEXT_PROMPT = config["prompts"]["prompt_email_text"]
AGE_BUCKET = config["data_sample"]["age_bucket"]
MALE_NAMES = config["data_sample"]["male_names"]
FEMALE_NAMES = config["data_sample"]["female_names"]
LANGUAGES = config["data_sample"]["languages"]
LANGUAGES_MAP = config["data_sample"]["languages_map"]
IMAGE_PROMPT = config["prompts"]["prompt_image_generation"]
def generate_information(data : list) -> pd.DataFrame:
df = pd.DataFrame.from_dict(data)
rng = np.random.default_rng(
abs(hash(df.at[0,'email']) % (10 ** 8)))
df['first_name'] = rng.choice(
FEMALE_NAMES+MALE_NAMES, len(df['email']))
df['language'] = rng.choice(
LANGUAGES, len(df['email']))
df['age_group'] = rng.choice(
AGE_BUCKET, len(df['email']))
df['gender'] = df['first_name'].map(
lambda x: 'woman' if x in FEMALE_NAMES else 'man')
df['city'] = np.full_like(
df['email'], 'New York City')
return df
async def email_generate(row: pd.Series, theme: str, image_context: str) -> pd.Series:
first_name = row['first_name']
email_prompt = EMAIL_TEXT_PROMPT.format(first_name,
theme)
loop = asyncio.get_running_loop()
progress_text = f"Generating email text for {first_name}"
print(progress_text)
generated_text = ""
generated_images = []
try:
generated_response = await loop.run_in_executor(
None,
functools.partial(
text_llm.generate_content,
contents=email_prompt,
generation_config=GenerationConfig(temperature=0.2,
top_p=0.8,
top_k=40,
max_output_tokens=1024),
)
)
except Exception as e:
generated_response = None
print("Error")
print(str(e))
if generated_response and generated_response.text:
generated_text = generated_response.text
else:
generated_text = "No text was generated for this email."
if "language" in row and row.language != "en":
translation = translate_client.translate(
generated_text,
source_language="en",
target_language=row.language,
format_="text"
)['translatedText']
else:
translation = generated_text
if image_context != None and image_context != '':
try:
prompt_image = IMAGE_PROMPT
imagen_responses = await loop.run_in_executor(
None,
functools.partial(
imagen.generate_images,
prompt=prompt_image.format(
image_context
),
number_of_images=1))
except Exception as e:
print(prompt_image.format(
image_context
))
print(str(e))
else:
for image in imagen_responses:
generated_images.append(
{
"images_base64_string": image._as_base64_string(),
}
)
if len(generated_images) > 0 :
generated_image = generated_images[0]["images_base64_string"]
buffer = io.BytesIO()
imgdata = base64.b64decode(generated_image)
img = Image.open(io.BytesIO(imgdata))
new_img = img.resize((250, 250))
new_img.save(buffer, format="PNG")
generated_image = base64.b64encode(buffer.getvalue())
else:
generated_image = ''
return pd.Series(
[row.first_name, row.email, generated_text, translation,LANGUAGES_MAP[row.language],generated_image],
index=["first_name","email","text","translation","language","generated_image"]
)
async def generate_emails(
number_of_emails:int,
theme: str,
audience_data: list,
image_context:str
):
audience_dataframe = pd.DataFrame.from_dict(audience_data)
async_list = await asyncio.gather(
*(email_generate(person[1], theme=str(theme),image_context=image_context
) for person in audience_dataframe.head(number_of_emails).iterrows()))
#print(async_list)
df = pd.concat(async_list,axis=1).T.to_dict('records')
return df