def convert_flipkart_to_retail_search_product()

in projects/conversational-commerce-agent/data-ingestion/flipkart_to_retail_search.py [0:0]


def convert_flipkart_to_retail_search_product(
    input_file:str,
    output_file:str,
    project_number:str,
    branch:str="0") -> str:

    """
    Transforms a Flipkart JSONL file to
    Google Cloud Retail Search Product Schema.

    Args:
      input_file: Path to the input Flipkart JSONL file.
      output_file: Path to the output JSONL file.
      project_number: Google Cloud Project number.
      branch: Retail Search branch ID. Defaults to "0".
    Returns:
      Path to the output JSONL file.
    """

    processed_products = ""
    with open(input_file, "r", encoding="utf-8") as infile:
        with open(output_file, "w", encoding="utf-8") as outfile:
            for line in infile:
                try:
                    source_obj = json.loads(line)
                    target_obj = {}

                    # Required fields
                    target_obj["title"] = source_obj.get(
                        "product_name", "Unknown Product"
                    )

                    if "product_url" not in source_obj:
                        logging.warning(
                            (
                                "[Warning]Product doed not"
                                "have a product url:%s"
                            ),
                            target_obj["title"]
                        )
                        continue
                    source_obj_brand = source_obj.get("brand")
                    if source_obj_brand == "":
                        logging.warning(
                            "[Warning]Product doed not have a brand:%s",
                            target_obj["title"]
                        )
                        source_obj_brand = "Unknown"

                    target_obj["brands"] = [
                        source_obj_brand
                    ]

                    if target_obj["title"] in processed_products:
                        continue
                    else:
                        processed_products += f"""|{target_obj["title"]}"""

                    target_obj["categories"] = (
                        json.loads(source_obj["product_category_tree"])
                        if "product_category_tree" in source_obj
                        else ["Unknown"]
                    )
                    target_obj["id"] = source_obj["uniq_id"]
                    target_obj["name"] = (
                    f"projects/{project_number}/locations/global/catalogs/"
                    f"""default_catalog/branches/{branch}"""
                    f"""/products/{target_obj["id"]}"""
                    )
                    target_obj["primaryProductId"] = target_obj["id"]
                    target_obj["type"] = "PRIMARY"  # Assuming all are primary

                    target_obj["description"] = source_obj.get(
                                                "description", ""
                                                )
                    target_desc = target_obj["description"]
                    if len(target_desc) >= 5000: # Max description
                        target_obj["description"] = target_desc[:5000]

                    target_obj["languageCode"] = "en-us" # Default language

                    source_images = source_obj.get("image", None)
                    if source_images is not None:
                        target_obj["images"] = [
                            {"uri": img}
                            for img in json.loads(source_images)
                            if "image" in source_obj
                        ]

                        # Use the images on the shared GCS bucket.
                        for image_url in target_obj["images"]:
                            new_url = replace_domain_host(
                                url=image_url["uri"],
                                new_domain=(
                                    "storage.googleapis.com/"
                                    "gcp-retail-demo")
                                )
                            image_url["uri"] = new_url

                            continue
                    else:
                        logging.error(
                        "[Error]product does not have images:%s",
                        target_obj["title"])
                        continue

                    target_obj["uri"] = source_obj["product_url"]

                    # Price Information
                    if source_obj["discounted_price"] == "":
                        source_obj["discounted_price"]= 0
                    if source_obj["retail_price"] == "":
                        source_obj["retail_price"]= 0
                    item_price = float(source_obj.get("discounted_price", 0))
                    item_original_price = float(
                        source_obj.get("retail_price", 0)
                    )
                    target_obj["priceInfo"] = {
                        "currencyCode": "INR", # Replace with actual currency
                        "price": item_price,
                        "originalPrice": item_original_price,
                        "priceRange": {},
                    }
                    for image in target_obj["images"]:
                        if "height" in image:
                            del image["height"]
                        if "width" in image:
                            del image["width"]

                    # Attributes
                    target_obj["attributes"] = update_attributes(source_obj)

                    # Availability
                    target_obj["availability"] = "IN_STOCK"
                    target_obj["availableQuantity"] = 0
                    target_obj["fulfillmentInfo"] = [
                        {
                            "type": "custom-type-1",
                            "placeIds": ["mobile", "www"]
                         }
                    ]
                    target_obj["retrievableFields"] = (
                        "name,title,brands,uri,categories,"
                        "priceInfo,description"
                        )
                    outfile.write(json.dumps(target_obj) + "\n")

                except json.JSONDecodeError as e:
                    logging.error("""