def convert_flipkart_to_retail_search_product()

in projects/conversational-commerce-agent/data-ingestion/cosmetics_to_retail_search.py [0:0]


def convert_flipkart_to_retail_search_product(
    input_file:str,
    output_file:str,
    project_number:str,
    branch:str="1") -> str:

    """
    Transforms a Flipkart JSONL file to
    Google Cloud Retail Search Product Schema.

    Args:
      input_file: Path to the input Flipkart JSONL file.
      output_file: Path to the output JSONL file.
      project_number: Google Cloud Project number.
      branch: Retail Search Branch Id. defaults to 1
    Returns:
      Path to the output JSONL file.
    """

    processed_products = ""
    with open(input_file, "r", encoding="utf-8") as infile:
        with open(output_file, "w", encoding="utf-8") as outfile:
            source_objs = json.load(infile)
            for source_obj in source_objs:
                try:
                    target_obj = {}

                    # Required fields
                    target_obj["title"] = source_obj.get(
                        "name", "Unknown Product"
                    )

                    if "product_link" not in source_obj:
                        logging.warning(
                            (
                                "[Warning]Product doed not"
                                "have a product url:%s"
                            ),
                            target_obj["title"]
                        )
                        continue
                    source_obj_brand = source_obj.get("brand")
                    if source_obj_brand == "":
                        logging.warning(
                            "[Warning]Product doed not have a brand:%s",
                            target_obj["title"]
                        )
                        source_obj_brand = "Unknown"

                    target_obj["brands"] = [
                        source_obj_brand
                    ]

                    if target_obj["title"] in processed_products:
                        continue
                    else:
                        processed_products += f"""|{target_obj["title"]}"""

                    subcatagory = source_obj.get("category", "")

                    if subcatagory is None or subcatagory == "":
                        subcatagory = source_obj.get("product_type")

                    catagory = (
                        f"""{source_obj.get("product_type")} >> """
                        f"{subcatagory}"
                    )
                    target_obj["categories"] = catagory
                    prod_id = uuid.uuid4()
                    target_obj["id"] = f"""{source_obj.get("id", prod_id)}"""
                    target_obj["name"] = (
                    f"projects/{project_number}/locations/global/catalogs/"
                    f"""default_catalog/branches/{branch}"""
                    f"""/products/{target_obj["id"]}"""
                    )
                    target_obj["primaryProductId"] = target_obj["id"]
                    target_obj["type"] = "PRIMARY"  # Assuming all are primary
                    target_obj["description"] = source_obj.get(
                            "description", {"description": ""}
                            )
                    target_desc = target_obj.get("description", "")
                    if target_desc is not None and len(target_desc) >= 5000:
                         # Max description
                        target_obj["description"] = target_desc[:5000]

                    target_obj["languageCode"] = "en-us" # Default language

                    source_image = source_obj.get("image_link", None)
                    if source_image is not None:
                        target_obj["images"] = [
                            {"uri": source_image}
                        ]
                    else:
                        logging.error(
                        "[Error]product does not have images:%s",
                        target_obj["title"])
                        continue

                    target_obj["uri"] = source_obj["product_link"]

                    # Price Information
                    item_price = 0
                    item_original_price = 0
                    if "prince" in source_obj:
                        item_price = float(source_obj.get("price", 0))
                    if source_obj.get("price") is not None:
                        item_original_price = float(
                            source_obj.get("price")
                        )
                    if item_price > 0 or item_original_price > 0:
                        target_obj["priceInfo"] = {
                            "currencyCode": "USD",
                            "price": item_price
                                if item_price > 0 else item_original_price,
                            "originalPrice": item_original_price,
                            "priceRange": {},
                        }

                    # Attributes
                    target_obj["attributes"] = update_attributes(source_obj)

                    # Availability
                    target_obj["availability"] = "IN_STOCK"
                    target_obj["availableQuantity"] = 0
                    target_obj["fulfillmentInfo"] = [
                        {
                            "type": "custom-type-1",
                            "placeIds": ["mobile", "www"]
                         }
                    ]
                    target_obj["retrievableFields"] = (
                        "name,title,brands,uri,categories,"
                        "priceInfo,description,attributes.Tags"
                        )

                    # For Promotion flow
                    if  source_obj.get("product_type", "") == "mascara":
                        if ("Tags" in target_obj["attributes"] and
                            target_obj["attributes"]["Tags"] is not None):
                            target_obj["attributes"]["Tags"]["text"].append(
                                "PromotionItem"
                            )

                    outfile.write(json.dumps(target_obj) + "\n")

                    # For Blend recommendation flow
                    if target_obj["id"] == "828":
                        target_obj["id"] = "99828"
                        target_obj["categories"] = "foundation >> blend"
                        target_obj["attributes"]["Tags"] = {
                            "text": ["Natural", "Gluten Free"],
                            "searchable": True
                        }
                        target_obj["primaryProductId"] = "99828"
                        outfile.write(json.dumps(target_obj) + "\n")
                except json.JSONDecodeError as e:
                    logging.error("""