projects/conversational-commerce-agent/data-ingestion/flipkart_to_retail_search.py (255 lines of code) (raw):
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Converting Flipkart dataset to
Google Cloud Search for Retail data format.
"""
import argparse
import csv
import json
import logging
import os
import re
import requests
logging.basicConfig(level=logging.INFO)
def convert_csv_to_jsonl(input_file:str, output_file:str) -> str:
"""
This function does a one-to-one mapping from CSV to Json
Args:
input_file: Path to the input CSV file.
output_file: Path to the output JSONL file.
Returns:
Path to the output JSONL file.
"""
csvfile = open(input_file, "r",
encoding="utf-8")
jsonfile = open(output_file, "w",
encoding="utf-8")
# Remove \n in the end
headers = csvfile.readline().replace(os.linesep, "").split(",")
reader = csv.DictReader(csvfile, headers)
for row in reader:
json.dump(row, jsonfile)
jsonfile.write(os.linesep)
return output_file
def update_attributes(source_obj) -> dict:
"""
Update site level attribute controls
Args:
source_obj: source product attribute controls
Returns:
dict: updated attribute controls
"""
target_obj = {
"attributes" : {}
}
if "product_specifications" in source_obj:
source_prod_specs = source_obj["product_specifications"]
try:
source_prod_specs = source_prod_specs.replace(
"=>nil", ":null"
)
source_prod_specs = source_prod_specs.replace(
"=>", ":"
)
specs = json.loads(
source_prod_specs
)
except json.JSONDecodeError:
logging.warning(
("[Warning]Could not decode "
"product_specifications: %s"),
source_obj
)
return {}
if specs.get("product_specification") is None:
return {}
for attr in specs["product_specification"]:
target_attrs = target_obj["attributes"]
try:
if not isinstance(attr, dict):
continue
attr_key = ""
attr_value = ""
if "key" in attr:
logging.info("* processing: %s", attr)
attr_key = attr["key"]
attr_value = attr.get("value", "")
if (attr_value == "" or
attr_key == ""):
continue
target_attrs[attr_key] = {
"text": [attr_value]
}
if attr_key == "Type":
target_attrs[attr_key]["searchable"] = True
target_attrs[attr_key]["indexable"] = True
elif not re.search(r"[ \-/\\]", attr_key):
target_attrs[attr_key]["searchable"] = True
target_attrs[attr_key]["indexable"] = True
except KeyError as e:
logging.error("[Error]%s", e)
logging.error("* Attribute:%s", attr)
else:
logging.error("Product Spec not found.")
return target_attrs
def replace_domain_host(
url:str,
new_domain:str) -> str:
"""Replaces the domain name of a given URL.
Args:
url: The URL to modify.
new_domain: The new domain name to use.
Returns:
The modified URL with the new domain name.
"""
try:
start = url.index("//") + 2
end = url.find("/", start)
return f"""{url[:start]}{new_domain}{url[end:]}"""
except ValueError:
# Return original url if errors
return url
def convert_flipkart_to_retail_search_product(
input_file:str,
output_file:str,
project_number:str,
branch:str="0") -> str:
"""
Transforms a Flipkart JSONL file to
Google Cloud Retail Search Product Schema.
Args:
input_file: Path to the input Flipkart JSONL file.
output_file: Path to the output JSONL file.
project_number: Google Cloud Project number.
branch: Retail Search Branch Id. defaults to 0
Returns:
Path to the output JSONL file.
"""
processed_products = ""
with open(input_file, "r", encoding="utf-8") as infile:
with open(output_file, "w", encoding="utf-8") as outfile:
for line in infile:
try:
source_obj = json.loads(line)
target_obj = {}
# Required fields
target_obj["title"] = source_obj.get(
"product_name", "Unknown Product"
)
if "product_url" not in source_obj:
logging.warning(
(
"[Warning]Product doed not"
"have a product url:%s"
),
target_obj["title"]
)
continue
source_obj_brand = source_obj.get("brand")
if source_obj_brand == "":
logging.warning(
"[Warning]Product doed not have a brand:%s",
target_obj["title"]
)
source_obj_brand = "Unknown"
target_obj["brands"] = [
source_obj_brand
]
if target_obj["title"] in processed_products:
continue
else:
processed_products += f"""|{target_obj["title"]}"""
target_obj["categories"] = (
json.loads(source_obj["product_category_tree"])
if "product_category_tree" in source_obj
else ["Unknown"]
)
target_obj["id"] = source_obj["uniq_id"]
target_obj["name"] = (
f"projects/{project_number}/locations/global/catalogs/"
f"""default_catalog/branches/{branch}"""
f"""/products/{target_obj["id"]}"""
)
target_obj["primaryProductId"] = target_obj["id"]
target_obj["type"] = "PRIMARY" # Assuming all are primary
target_obj["description"] = source_obj.get(
"description", ""
)
target_desc = target_obj["description"]
if len(target_desc) >= 5000: # Max description
target_obj["description"] = target_desc[:5000]
target_obj["languageCode"] = "en-us" # Default language
source_images = source_obj.get("image", None)
if source_images is not None:
target_obj["images"] = [
{"uri": img}
for img in json.loads(source_images)
if "image" in source_obj
]
# Use the images on the shared GCS bucket.
for image_url in target_obj["images"]:
new_url = replace_domain_host(
url=image_url["uri"],
new_domain=(
"storage.googleapis.com/"
"gcp-retail-demo")
)
image_url["uri"] = new_url
continue
else:
logging.error(
"[Error]product does not have images:%s",
target_obj["title"])
continue
target_obj["uri"] = source_obj["product_url"]
# Price Information
if source_obj["discounted_price"] == "":
source_obj["discounted_price"]= 0
if source_obj["retail_price"] == "":
source_obj["retail_price"]= 0
item_price = float(source_obj.get("discounted_price", 0))
item_original_price = float(
source_obj.get("retail_price", 0)
)
target_obj["priceInfo"] = {
"currencyCode": "INR", # Replace with actual currency
"price": item_price,
"originalPrice": item_original_price,
"priceRange": {},
}
for image in target_obj["images"]:
if "height" in image:
del image["height"]
if "width" in image:
del image["width"]
# Attributes
target_obj["attributes"] = update_attributes(source_obj)
# Availability
target_obj["availability"] = "IN_STOCK"
target_obj["availableQuantity"] = 0
target_obj["fulfillmentInfo"] = [
{
"type": "custom-type-1",
"placeIds": ["mobile", "www"]
}
]
target_obj["retrievableFields"] = (
"name,title,brands,uri,categories,"
"priceInfo,description"
)
outfile.write(json.dumps(target_obj) + "\n")
except json.JSONDecodeError as e:
logging.error("""
======
* Error decoding JSON object in line:
Exception:%s
Line:%s}
======
""", e, line.strip())
logging.info(
"Successfully transformed %s to %s",
input_file,
output_file
)
return output_file
def check_existense(url:str) -> bool:
"""
Check if an URL exists.
Args:
url: An Flipkart product image URL.
Returns:
True if the image exists, otherwise False
"""
headers = {"User-Agent": "Mozilla/5.0", "accept-language": "en-US,en"}
try:
response = requests.get(url, headers=headers, timeout=1)
# The server returns HTTP 200 with `Error 404 Not Found`
# in the payload when the image is not available.
# Hence checking for text instead of HTTP status code
return False if "Error 404 Not Found" in response.text else True
except requests.exceptions.RequestException as err:
logging.error("[HttpError]%s", err)
return False
except requests.exceptions.HTTPError as e:
logging.error("[HttpError]%s}", e)
return False
def prepare_arguments() -> dict:
"""
Configure and parse commandline arguments.
Returns:
A Dict holds commandline arguments.
"""
parser = argparse.ArgumentParser(
description=("Converting Flipkart dataset "
"to Search for Retail data format.")
)
parser.add_argument("-i", "--input",
help="Flipkart CSV file path.",
required=True)
parser.add_argument("-o", "--output",
help="Search for Retail Jsonl file path.",
required=True)
parser.add_argument("-p", "--project-number",
help="Search for Retail Jsonl file path.",
required=True)
parser.add_argument("-b", "--branch",
help="Search for Retail Jsonl file path.",
required=True)
args = vars(parser.parse_args())
return {
"input_file": args["input"],
"output_file": args["output"],
"project_number": args["project_number"],
"branch": args["branch"]
}
if __name__ == "__main__":
params = prepare_arguments()
FLIPKART_CSV_FILE = params["input_file"]
FLIPKART_JSON_FILE = params["input_file"] + ".jsonl"
RETAIL_SEARCH_JSON_FILE = params["output_file"]
PROJECT_NUMBER = params["project_number"]
BRANCH = params["branch"]
convert_csv_to_jsonl(
input_file=FLIPKART_CSV_FILE,
output_file=FLIPKART_JSON_FILE
)
convert_flipkart_to_retail_search_product(
input_file=FLIPKART_JSON_FILE,
output_file=RETAIL_SEARCH_JSON_FILE,
project_number=PROJECT_NUMBER,
branch=BRANCH)