in projects/conversational-commerce-agent/data-ingestion/flipkart_to_retail_search.py [0:0]
def convert_flipkart_to_retail_search_product(
input_file:str,
output_file:str,
project_number:str,
branch:str="0") -> str:
"""
Transforms a Flipkart JSONL file to
Google Cloud Retail Search Product Schema.
Args:
input_file: Path to the input Flipkart JSONL file.
output_file: Path to the output JSONL file.
project_number: Google Cloud Project number.
branch: Retail Search Branch Id. defaults to 0
Returns:
Path to the output JSONL file.
"""
processed_products = ""
with open(input_file, "r", encoding="utf-8") as infile:
with open(output_file, "w", encoding="utf-8") as outfile:
for line in infile:
try:
source_obj = json.loads(line)
target_obj = {}
# Required fields
target_obj["title"] = source_obj.get(
"product_name", "Unknown Product"
)
if "product_url" not in source_obj:
logging.warning(
(
"[Warning]Product doed not"
"have a product url:%s"
),
target_obj["title"]
)
continue
source_obj_brand = source_obj.get("brand")
if source_obj_brand == "":
logging.warning(
"[Warning]Product doed not have a brand:%s",
target_obj["title"]
)
source_obj_brand = "Unknown"
target_obj["brands"] = [
source_obj_brand
]
if target_obj["title"] in processed_products:
continue
else:
processed_products += f"""|{target_obj["title"]}"""
target_obj["categories"] = (
json.loads(source_obj["product_category_tree"])
if "product_category_tree" in source_obj
else ["Unknown"]
)
target_obj["id"] = source_obj["uniq_id"]
target_obj["name"] = (
f"projects/{project_number}/locations/global/catalogs/"
f"""default_catalog/branches/{branch}"""
f"""/products/{target_obj["id"]}"""
)
target_obj["primaryProductId"] = target_obj["id"]
target_obj["type"] = "PRIMARY" # Assuming all are primary
target_obj["description"] = source_obj.get(
"description", ""
)
target_desc = target_obj["description"]
if len(target_desc) >= 5000: # Max description
target_obj["description"] = target_desc[:5000]
target_obj["languageCode"] = "en-us" # Default language
source_images = source_obj.get("image", None)
if source_images is not None:
target_obj["images"] = [
{"uri": img}
for img in json.loads(source_images)
if "image" in source_obj
]
# Use the images on the shared GCS bucket.
for image_url in target_obj["images"]:
new_url = replace_domain_host(
url=image_url["uri"],
new_domain=(
"storage.googleapis.com/"
"gcp-retail-demo")
)
image_url["uri"] = new_url
continue
else:
logging.error(
"[Error]product does not have images:%s",
target_obj["title"])
continue
target_obj["uri"] = source_obj["product_url"]
# Price Information
if source_obj["discounted_price"] == "":
source_obj["discounted_price"]= 0
if source_obj["retail_price"] == "":
source_obj["retail_price"]= 0
item_price = float(source_obj.get("discounted_price", 0))
item_original_price = float(
source_obj.get("retail_price", 0)
)
target_obj["priceInfo"] = {
"currencyCode": "INR", # Replace with actual currency
"price": item_price,
"originalPrice": item_original_price,
"priceRange": {},
}
for image in target_obj["images"]:
if "height" in image:
del image["height"]
if "width" in image:
del image["width"]
# Attributes
target_obj["attributes"] = update_attributes(source_obj)
# Availability
target_obj["availability"] = "IN_STOCK"
target_obj["availableQuantity"] = 0
target_obj["fulfillmentInfo"] = [
{
"type": "custom-type-1",
"placeIds": ["mobile", "www"]
}
]
target_obj["retrievableFields"] = (
"name,title,brands,uri,categories,"
"priceInfo,description"
)
outfile.write(json.dumps(target_obj) + "\n")
except json.JSONDecodeError as e:
logging.error("""