in projects/conversational-commerce-agent/data-ingestion/cosmetics_to_retail_search.py [0:0]
def convert_flipkart_to_retail_search_product(
input_file:str,
output_file:str,
project_number:str,
branch:str="1") -> str:
"""
Transforms a Flipkart JSONL file to
Google Cloud Retail Search Product Schema.
Args:
input_file: Path to the input Flipkart JSONL file.
output_file: Path to the output JSONL file.
project_number: Google Cloud Project number.
branch: Retail Search Branch Id. defaults to 1
Returns:
Path to the output JSONL file.
"""
processed_products = ""
with open(input_file, "r", encoding="utf-8") as infile:
with open(output_file, "w", encoding="utf-8") as outfile:
source_objs = json.load(infile)
for source_obj in source_objs:
try:
target_obj = {}
# Required fields
target_obj["title"] = source_obj.get(
"name", "Unknown Product"
)
if "product_link" not in source_obj:
logging.warning(
(
"[Warning]Product doed not"
"have a product url:%s"
),
target_obj["title"]
)
continue
source_obj_brand = source_obj.get("brand")
if source_obj_brand == "":
logging.warning(
"[Warning]Product doed not have a brand:%s",
target_obj["title"]
)
source_obj_brand = "Unknown"
target_obj["brands"] = [
source_obj_brand
]
if target_obj["title"] in processed_products:
continue
else:
processed_products += f"""|{target_obj["title"]}"""
subcatagory = source_obj.get("category", "")
if subcatagory is None or subcatagory == "":
subcatagory = source_obj.get("product_type")
catagory = (
f"""{source_obj.get("product_type")} >> """
f"{subcatagory}"
)
target_obj["categories"] = catagory
prod_id = uuid.uuid4()
target_obj["id"] = f"""{source_obj.get("id", prod_id)}"""
target_obj["name"] = (
f"projects/{project_number}/locations/global/catalogs/"
f"""default_catalog/branches/{branch}"""
f"""/products/{target_obj["id"]}"""
)
target_obj["primaryProductId"] = target_obj["id"]
target_obj["type"] = "PRIMARY" # Assuming all are primary
target_obj["description"] = source_obj.get(
"description", {"description": ""}
)
target_desc = target_obj.get("description", "")
if target_desc is not None and len(target_desc) >= 5000:
# Max description
target_obj["description"] = target_desc[:5000]
target_obj["languageCode"] = "en-us" # Default language
source_image = source_obj.get("image_link", None)
if source_image is not None:
target_obj["images"] = [
{"uri": source_image}
]
else:
logging.error(
"[Error]product does not have images:%s",
target_obj["title"])
continue
target_obj["uri"] = source_obj["product_link"]
# Price Information
item_price = 0
item_original_price = 0
if "prince" in source_obj:
item_price = float(source_obj.get("price", 0))
if source_obj.get("price") is not None:
item_original_price = float(
source_obj.get("price")
)
if item_price > 0 or item_original_price > 0:
target_obj["priceInfo"] = {
"currencyCode": "USD",
"price": item_price
if item_price > 0 else item_original_price,
"originalPrice": item_original_price,
"priceRange": {},
}
# Attributes
target_obj["attributes"] = update_attributes(source_obj)
# Availability
target_obj["availability"] = "IN_STOCK"
target_obj["availableQuantity"] = 0
target_obj["fulfillmentInfo"] = [
{
"type": "custom-type-1",
"placeIds": ["mobile", "www"]
}
]
target_obj["retrievableFields"] = (
"name,title,brands,uri,categories,"
"priceInfo,description,attributes.Tags"
)
# For Promotion flow
if source_obj.get("product_type", "") == "mascara":
if ("Tags" in target_obj["attributes"] and
target_obj["attributes"]["Tags"] is not None):
target_obj["attributes"]["Tags"]["text"].append(
"PromotionItem"
)
outfile.write(json.dumps(target_obj) + "\n")
# For Blend recommendation flow
if target_obj["id"] == "828":
target_obj["id"] = "99828"
target_obj["categories"] = "foundation >> blend"
target_obj["attributes"]["Tags"] = {
"text": ["Natural", "Gluten Free"],
"searchable": True
}
target_obj["primaryProductId"] = "99828"
outfile.write(json.dumps(target_obj) + "\n")
except json.JSONDecodeError as e:
logging.error("""