# projects/conversational-commerce-agent/data-ingestion/flipkart_to_retail_search.py

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Converting Flipkart dataset to Google Cloud Search for Retail data format.
"""
import argparse
import csv
import json
import logging
import os
import re

logging.basicConfig(level=logging.INFO)


def convert_csv_to_jsonl(input_file: str, output_file: str) -> str:
    """Does a one-to-one mapping from CSV rows to JSONL objects.

    The first CSV row supplies the field names; csv.DictReader parses
    both the header and the data rows, so quoted fields containing
    commas are handled correctly.

    Args:
        input_file: Path to the input CSV file.
        output_file: Path to the output JSONL file.

    Returns:
        Path to the output JSONL file.
    """
    # newline="" is the documented way to hand files to the csv module.
    with open(input_file, "r", encoding="utf-8", newline="") as csvfile, \
            open(output_file, "w", encoding="utf-8") as jsonfile:
        # DictReader consumes the header row itself when fieldnames is
        # not supplied.
        reader = csv.DictReader(csvfile)
        for row in reader:
            json.dump(row, jsonfile)
            # Plain "\n": text mode already translates line endings.
            jsonfile.write("\n")
    return output_file


def update_attributes(source_obj) -> dict:
    """Builds Retail Search custom attributes from product specs.

    Flipkart stores ``product_specifications`` in a Ruby-hash-like
    syntax (``"key"=>"value"``, ``=>nil``); it is normalized to JSON
    before parsing.

    Args:
        source_obj: A dict holding one Flipkart product record.

    Returns:
        dict: Mapping of attribute name to Retail Search custom
        attribute. Empty when specs are missing or cannot be decoded.
    """
    attributes = {}
    if "product_specifications" not in source_obj:
        logging.error("Product Spec not found.")
        return attributes

    raw_specs = source_obj["product_specifications"]
    try:
        # Normalize Ruby hash syntax to JSON before parsing.
        specs = json.loads(
            raw_specs.replace("=>nil", ":null").replace("=>", ":")
        )
    except json.JSONDecodeError:
        logging.warning(
            ("[Warning]Could not decode "
             "product_specifications: %s"),
            source_obj
        )
        return {}

    if specs.get("product_specification") is None:
        return {}

    for attr in specs["product_specification"]:
        if not isinstance(attr, dict) or "key" not in attr:
            continue
        logging.info("* processing: %s", attr)
        attr_key = attr["key"]
        attr_value = attr.get("value", "")
        if attr_key == "" or attr_value == "":
            continue
        attributes[attr_key] = {"text": [attr_value]}
        # "Type" is always searchable; other keys only when they form a
        # single token (no spaces, dashes or slashes).
        if attr_key == "Type" or not re.search(r"[ \-/\\]", attr_key):
            attributes[attr_key]["searchable"] = True
            attributes[attr_key]["indexable"] = True
    return attributes


def replace_domain_host(url: str, new_domain: str) -> str:
    """Replaces the host portion of a given URL.

    Args:
        url: The URL to modify.
        new_domain: The new host to use (may include a path prefix).

    Returns:
        The modified URL, or the original URL when it contains no
        ``//`` scheme separator.
    """
    try:
        host_start = url.index("//") + 2
    except ValueError:
        # No "//" separator found; return the original URL unchanged.
        return url
    path_start = url.find("/", host_start)
    if path_start == -1:
        # URL has no path component; replace everything after "//".
        return f"{url[:host_start]}{new_domain}"
    return f"{url[:host_start]}{new_domain}{url[path_start:]}"


def _build_price_info(source_obj) -> dict:
    """Builds the priceInfo block; empty/missing prices default to 0."""
    discounted = source_obj.get("discounted_price") or 0
    retail = source_obj.get("retail_price") or 0
    return {
        "currencyCode": "INR",  # Flipkart dataset prices are in INR.
        "price": float(discounted),
        "originalPrice": float(retail),
        "priceRange": {},
    }


def _convert_record(source_obj, project_number, branch):
    """Maps one Flipkart record to a Retail Search product dict.

    Args:
        source_obj: Decoded Flipkart JSON record.
        project_number: Google Cloud Project number.
        branch: Retail Search Branch Id.

    Returns:
        The product dict, or None when the record lacks a product URL
        or images and should be skipped.

    Raises:
        KeyError: When "uniq_id" is missing.
        ValueError: When a price field is not numeric.
    """
    title = source_obj.get("product_name", "Unknown Product")

    if "product_url" not in source_obj:
        logging.warning(
            "[Warning]Product does not have a product url:%s", title
        )
        return None

    brand = source_obj.get("brand")
    # `or`-style check also catches a missing key (None), not just "".
    if not brand:
        logging.warning(
            "[Warning]Product does not have a brand:%s", title
        )
        brand = "Unknown"

    source_images = source_obj.get("image")
    if source_images is None:
        logging.error("[Error]product does not have images:%s", title)
        return None

    product_id = source_obj["uniq_id"]
    target_obj = {
        "title": title,
        "brands": [brand],
        "categories": (
            json.loads(source_obj["product_category_tree"])
            if "product_category_tree" in source_obj
            else ["Unknown"]
        ),
        "id": product_id,
        "name": (
            f"projects/{project_number}/locations/global/catalogs/"
            f"default_catalog/branches/{branch}"
            f"/products/{product_id}"
        ),
        "primaryProductId": product_id,
        "type": "PRIMARY",  # Assuming all products are primary.
        # Retail Search caps descriptions at 5000 characters.
        "description": source_obj.get("description", "")[:5000],
        "languageCode": "en-us",  # Default language.
        "uri": source_obj["product_url"],
        # Use the images mirrored on the shared GCS bucket.
        "images": [
            {
                "uri": replace_domain_host(
                    url=img,
                    new_domain=("storage.googleapis.com/"
                                "gcp-retail-demo")
                )
            }
            for img in json.loads(source_images)
        ],
        "priceInfo": _build_price_info(source_obj),
        "attributes": update_attributes(source_obj),
        # The dataset carries no stock data; assume in stock.
        "availability": "IN_STOCK",
        "availableQuantity": 0,
        "fulfillmentInfo": [
            {"type": "custom-type-1", "placeIds": ["mobile", "www"]}
        ],
        "retrievableFields": (
            "name,title,brands,uri,categories,"
            "priceInfo,description"
        ),
    }
    return target_obj


def convert_flipkart_to_retail_search_product(
        input_file: str,
        output_file: str,
        project_number: str,
        branch: str = "0") -> str:
    """Transforms a Flipkart JSONL file to the Retail Search schema.

    Records without a product URL or images are skipped; products with
    a title seen earlier in the file are emitted only once.

    Args:
        input_file: Path to the input Flipkart JSONL file.
        output_file: Path to the output JSONL file.
        project_number: Google Cloud Project number.
        branch: Retail Search Branch Id. Defaults to "0".

    Returns:
        Path to the output JSONL file.
    """
    # Exact-title set dedup: O(1) membership with no substring
    # false positives.
    processed_titles = set()
    with open(input_file, "r", encoding="utf-8") as infile, \
            open(output_file, "w", encoding="utf-8") as outfile:
        for line in infile:
            try:
                source_obj = json.loads(line)
                target_obj = _convert_record(
                    source_obj, project_number, branch
                )
                if target_obj is None:
                    continue
                if target_obj["title"] in processed_titles:
                    continue
                processed_titles.add(target_obj["title"])
                outfile.write(json.dumps(target_obj) + "\n")
            except (json.JSONDecodeError, KeyError, ValueError) as e:
                # Bad records are logged and skipped so one malformed
                # line cannot abort the whole conversion.
                logging.error(
                    "* Error converting line. Exception:%s Line:%s",
                    e, line.strip()
                )
    logging.info(
        "Successfully transformed %s to %s", input_file, output_file
    )
    return output_file


def check_existense(url: str) -> bool:
    """Checks whether a Flipkart product image URL exists.

    Args:
        url: A Flipkart product image URL.

    Returns:
        True if the image exists, otherwise False.
    """
    # Imported lazily: `requests` is only needed for this optional
    # check, so the conversion pipeline runs without it installed.
    import requests

    headers = {"User-Agent": "Mozilla/5.0", "accept-language": "en-US,en"}
    try:
        response = requests.get(url, headers=headers, timeout=1)
    except requests.exceptions.RequestException as err:
        # HTTPError subclasses RequestException, so a single handler
        # covers both (a separate HTTPError clause is unreachable).
        logging.error("[HttpError]%s", err)
        return False
    # The server returns HTTP 200 with `Error 404 Not Found` in the
    # payload when the image is not available, hence checking the body
    # text instead of the HTTP status code.
    return "Error 404 Not Found" not in response.text


def prepare_arguments() -> dict:
    """Configures and parses commandline arguments.

    Returns:
        A dict holding the parsed commandline arguments.
    """
    parser = argparse.ArgumentParser(
        description=("Converting Flipkart dataset "
                     "to Search for Retail data format.")
    )
    parser.add_argument("-i", "--input",
                        help="Flipkart CSV file path.",
                        required=True)
    parser.add_argument("-o", "--output",
                        help="Search for Retail Jsonl file path.",
                        required=True)
    parser.add_argument("-p", "--project-number",
                        help="Google Cloud project number.",
                        required=True)
    parser.add_argument("-b", "--branch",
                        help="Retail Search branch id. Defaults to 0.",
                        default="0")
    args = vars(parser.parse_args())
    return {
        "input_file": args["input"],
        "output_file": args["output"],
        "project_number": args["project_number"],
        "branch": args["branch"],
    }


if __name__ == "__main__":
    params = prepare_arguments()
    FLIPKART_CSV_FILE = params["input_file"]
    FLIPKART_JSON_FILE = params["input_file"] + ".jsonl"
    RETAIL_SEARCH_JSON_FILE = params["output_file"]
    PROJECT_NUMBER = params["project_number"]
    BRANCH = params["branch"]
    convert_csv_to_jsonl(
        input_file=FLIPKART_CSV_FILE,
        output_file=FLIPKART_JSON_FILE
    )
    convert_flipkart_to_retail_search_product(
        input_file=FLIPKART_JSON_FILE,
        output_file=RETAIL_SEARCH_JSON_FILE,
        project_number=PROJECT_NUMBER,
        branch=BRANCH)