tools/genai-prod-catalog-enrichment/main.py

"""Product catalogue enrichment pipeline.

Parses a product-catalogue PDF from GCS, generates FAQs, ISQs, company
details, product tags, categories and image metadata via the GenAI helper
modules, and writes the merged JSON output back to GCS.
"""

import os
import json
import csv
import traceback
import datetime
import time

import requests
import fitz

from pdf_helper import parse_pdf
from genai_helper import (generate_faqs, generate_isqs,
                          generate_company_details, get_specific_caption,
                          generate_tags_json, generate_category_json)
from genai_helper import generate_images_json as gij
from text_helper import clean_text, get_company_text
from google.cloud import storage


def get_todays_date():
    """Returns today's date in 'DD_MM_YYYY' format."""
    today = datetime.date.today()
    return today.strftime('%d_%m_%Y')


def download_blob(bucket_name, source_blob_name, destination_file_name,
                  prefix, csv_folder):
    """Downloads a blob from the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(f"{prefix}/{source_blob_name}")
    blob.download_to_filename(f"{csv_folder}/{destination_file_name}")
    print(
        "[INFO]: Downloaded storage object "
        "{} from bucket {} to local file {}.".format(
            source_blob_name, bucket_name, destination_file_name
        )
    )


def upload_blob(bucket_name, source_file_name, destination_blob_name,
                prefix, date_):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(f"{prefix}/{date_}/{destination_blob_name}")
    blob.upload_from_filename(source_file_name)
    print(f"[INFO]: File {source_file_name} "
          f"uploaded to {destination_blob_name}.")


def get_csv_info(csv_gcs_uri, bucket_name, csv_folder):
    """Downloads the input CSV from GCS and returns its rows (header excluded)."""
    download_blob(bucket_name,
                  csv_gcs_uri.split('/')[-1],
                  csv_gcs_uri.split('/')[-1],
                  csv_gcs_uri.split('/')[-2],
                  csv_folder)
    with open(f"{csv_folder}/{csv_gcs_uri.split('/')[-1]}", "r") as f:
        reader = csv.reader(f)
        csv_data = list(reader)
    return csv_data[1:]


def download_pdf(url, filename):
    """Downloads a PDF from a URL to a local file."""
    response = requests.get(url)
    with open(filename, 'wb') as f:
        f.write(response.content)


def update_output_json(json_output_response, faq_json, isq_json,
                       product_images, product_category, product_tags,
                       error_ws2_message):
    """Merges the per-page generation results into the output JSON."""
    todays_date = get_todays_date()
    products_output_response = get_output_template()
    products_output_response[0]["product_name_citation"][0]["created_date"] = todays_date
    products_output_response[0]["specification_citation"][0]["created_date"] = todays_date
    products_output_response[0]["product_image_citation"][0]["created_date"] = todays_date
    products_output_response[0]["product_category_citation"][0]["created_date"] = todays_date
    try:
        products_output_response[0]["product_images"] = product_images
        products_output_response[0]["product_category"] = product_category
        products_output_response[0]["product_tags"] = product_tags
        if "response_error" not in isq_json:
            if "product_name" in isq_json:
                products_output_response[0]["product_name"] = isq_json["product_name"]
            if "confidence_score" in isq_json:
                products_output_response[0]["product_name_citation"][0]["confidence_score"] = \
                    isq_json["confidence_score"]
                products_output_response[0]["specification_citation"][0]["confidence_score"] = \
                    isq_json["confidence_score"]
            if "specifications" in isq_json:
                products_output_response[0]["specification"] = isq_json["specifications"]
            json_output_response["products"].extend(products_output_response)
        else:
            products_output_response[0] = {**products_output_response[0], **isq_json}
            json_output_response["products"].extend(products_output_response)
        if "response_error" not in products_output_response[0] and error_ws2_message != '':
            response_error = {"message": error_ws2_message}
            products_output_response[0]["response_error"] = response_error
        if "response_error" not in faq_json:
            json_output_response["catalogue_faqs"].extend(faq_json[list(faq_json.keys())[0]])
        else:
            json_output_response["catalogue_faqs"].append(faq_json)
        return json_output_response
    except Exception as e:
        print(f"[ERROR]: Error while updating output JSON - {e}")
        return {}


def get_output_template():
    """Returns the empty per-product output structure."""
    products_output_response = [
        {
            "product_name": [],
            "product_name_citation": [
                {
                    "pdf_name": "",
                    "url": "",
                    "page": 0,
                    "startIndex": [],
                    "endIndex": [],
                    "confidence_score": 0.0,
                    "created_date": "01/01/1970"
                }
            ],
            "specification": {},
            "specification_citation": [
                {
                    "attribute": "",
                    "pdf_name": "",
                    "url": "",
                    "page": 0,
                    "startIndex": [],
                    "endIndex": [],
                    "confidence_score": 0.0,
                    "created_date": "01/01/1970"
                }
            ],
            "product_images": [],
            "product_image_citation": [
                {
                    "pdf_name": "",
                    "url": "",
                    "page": 0,
                    "startIndex": [],
                    "endIndex": [],
                    "confidence_score": 0.0,
                    "created_date": "01/01/1970"
                }
            ],
            "product_category": [],
            "product_tags": [],
            "product_category_citation": [
                {
                    "pdf_name": "",
                    "url": "",
                    "page": 0,
                    "startIndex": [],
                    "endIndex": [],
                    "confidence_score": 0.0,
                    "created_date": "01/01/1970"
                }
            ]
        }
    ]
    return products_output_response


def create_json_from_dict(json_path, dict_, gcs_bucket=None):
    """Writes a dict as JSON either to GCS or to a local file."""
    if gcs_bucket:
        bucket_object = storage.Client().get_bucket(gcs_bucket)
        json_path = bucket_object.blob(json_path)
        json_obj = json.dumps(dict_)
        json_path.upload_from_string(data=json_obj,
                                     content_type='application/json')
    else:
        with open(json_path, "w") as outfile:
            json.dump(dict_, outfile)


def check_pdf_type(filename, input_gcs_bucket=None):
    """Inspects a PDF and reports whether it is scanned and contains images or tables."""
    scanned = False
    tables = False
    images = False
    total_images = 0
    if input_gcs_bucket:
        bucket_object = storage.Client().bucket(input_gcs_bucket)
        blob = bucket_object.blob(filename)
        pdf_file = fitz.open("pdf", blob.download_as_bytes())
    else:
        pdf_file = fitz.open(filename)
    pages = []
    for page_no, page in enumerate(pdf_file, start=1):
        page_wise_details = {"scanned": False, "Images": False,
                             "Tables": False, "Total Images": 0}
        print(f"[INFO]: Page no {page_no}")
        if not page.get_text():
            print("[INFO]: scanned!")
            scanned = True
            page_wise_details["scanned"] = True
        images_info = page.get_images()
        print(f"[INFO]: No of images: {len(images_info)}")
        if len(images_info) > 0:
            images = True
            page_wise_details["Images"] = True
            page_wise_details["Total Images"] = len(images_info)
            total_images = total_images + len(images_info)
        # Use a separate name so the boolean flag is not clobbered by the
        # TableFinder object returned by find_tables().
        tables_info = page.find_tables()
        if len(list(tables_info)) > 0:
            print("[INFO]: Tables present")
            tables = True
            page_wise_details["Tables"] = True
        pages.append(page_wise_details)
    return {"scanned": scanned, "Tables": tables, "Images": images,
            "Total_Images": total_images, "pages": pages}


def end_to_end_pipeline(input_pdf_uri, output_gcs_bucket, project_id):
    """Runs the full enrichment pipeline for a single catalogue PDF."""
    # csv_folder = "csv"
    pdf_folder = "pdf_files"
    # date_ = get_todays_date()
    # os.makedirs(csv_folder, exist_ok=True)
    os.makedirs(pdf_folder, exist_ok=True)
    try:
        start = time.time()
        # csv_data = get_csv_info(csv_gcs_uri, bucket_name, csv_folder)
        end = time.time()
        print(f"[INFO]: CSV data fetched "
              f"successfully in {end - start} seconds")
    except Exception as e:
        print(f"[ERROR]: Error "
              f"during fetching data from CSV - {e}")
        return None
    uri = input_pdf_uri
    # check if file name exists
    full_path = uri.replace("gs://", "")
    input_gcs_bucket = full_path.split("/")[0]
    filename = full_path.replace(f"{input_gcs_bucket}/", "")
    name = filename.split("/")[-1]
    try:
        pdf_type = check_pdf_type(filename, input_gcs_bucket)
        if not pdf_type["scanned"]:
            start = time.time()
            pdf_json = parse_pdf(filename, input_gcs_bucket, output_gcs_bucket)
            end = time.time()
            print(f"[INFO]: Parsed PDF successfully in {end - start} seconds")
            pdf_json = get_specific_caption(pdf_json)
            name_initials = name.replace(".pdf", "")
            json_path = f"./{name_initials}.json"
            create_json_from_dict(json_path, pdf_json, output_gcs_bucket)
            pdf_json = clean_text(pdf_json)
            json_output_response = {
                "pdf_name": f"{filename.split('/')[-1]}",
                "pdf_url": uri,  # f"gs://{input_gcs_bucket}/{filename}",
                "company_details": {},
                "products": [],
                "catalogue_faqs": []
                # "pc_item_doc_id": pc_item_doc_id,
                # "pc_doc_modified_date": pc_doc_modified_date,
                # "pc_item_doc_path": pc_item_doc_path,
                # "fk_pc_item_id": fk_pc_item_id,
                # "fk_glusr_usr_id": fk_glusr_usr_id
            }
            print(json_output_response, "\n")
            start = time.time()
            company_text = get_company_text(pdf_json['pages'])
            if company_text:
                company_details = generate_company_details(company_text,
                                                           project_id)
                if "company_details" in company_details:
                    json_output_response["company_details"] = \
                        company_details["company_details"]
                elif "response_error" in company_details:
                    json_output_response["company_details"] = company_details
                else:
                    json_output_response["company_details"] = {}
            end = time.time()
            print(f"[INFO]: Company details "
                  f"extraction time: {end - start} seconds")
            for page_no, page in enumerate(pdf_json['pages']):
                try:
                    print(str(page_no))
                    context = page['texts']['full_text']
                    start = time.time()
                    faq_json = generate_faqs(context, project_id)
                    isq_json = generate_isqs(context, project_id)
                    end = time.time()
                    print(f"[INFO]: FAQ, ISQ generation "
                          f"time for page {page_no + 1}: "
                          f"{end - start} seconds")
                    error_ws2_msg = ''
                    try:
                        products = isq_json['product_name']
                    except Exception:
                        products = []
                    print("Products:", products)
                    if len(products) > 0:
                        try:
                            start = time.time()
                            product_tags = generate_tags_json(context, products)
                            end = time.time()
                            print(f"[INFO]: Product tag "
                                  f"generation time for page {page_no + 1}: "
                                  f"{end - start} seconds")
                        except Exception:
                            print(f"[ERROR]: Error during generation "
                                  f"of product tags - "
                                  f"{str(traceback.format_exc())}")
                            error_ws2_msg = error_ws2_msg + "\n" \
                                + "Product tags generation failed"
                            product_tags = {}
                        try:
                            start = time.time()
                            product_category = generate_category_json(context,
                                                                      products)
                            end = time.time()
                            print(f"[INFO]: Product category generation time "
                                  f"for page {page_no + 1}: "
                                  f"{end - start} seconds")
                        except Exception:
                            print(f"[ERROR]: Error during generation of"
                                  f" product category - "
                                  f"{str(traceback.format_exc())}")
                            product_category = {}
                        try:
                            start = time.time()
                            image_json = gij(page, products, product_tags,
                                             output_gcs_bucket)
                            end = time.time()
                            print(f"[INFO]: Image JSON generation "
                                  f"time for page {page_no + 1}: "
                                  f"{end - start} seconds")
                        except Exception as error:
                            print(f"[ERROR]: Error during generation "
                                  f"of product images - "
                                  f"{str(traceback.format_exc())} - {error}")
                            error_ws2_msg = error_ws2_msg + "\n" \
                                + f"Product image json generation failed - {error}"
                            image_json = {}
                    else:
                        product_tags = {}
                        product_category = {}
                        image_json = {}
                    start = time.time()
                    json_output_response = update_output_json(
                        json_output_response, faq_json, isq_json, image_json,
                        product_category, product_tags, error_ws2_msg)
                    end = time.time()
                    print(f"[INFO]: Output for page {page_no + 1} "
                          f"updated in {end - start} seconds")
                except Exception as error:
                    print(f"[ERROR]: Error during generating "
                          f"final json - "
                          f"{str(traceback.format_exc())} - {error}")
            final_json_path = f"{name_initials}_output.json"
            start = time.time()
            create_json_from_dict(final_json_path, json_output_response,
                                  output_gcs_bucket)
            end = time.time()
            print(f"[INFO]: Final output JSON "
                  f"uploaded to GCS in {end - start} seconds")
            return json_output_response
        else:
            print("[INFO]: Please process text PDFs only!")
            return None
    except Exception as error:
        print(f"[ERROR]: Error for filename: "
              f"{filename} - {str(traceback.format_exc())} - {error}")
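

# --- Illustrative usage (not part of the original module): a minimal sketch of
# how end_to_end_pipeline might be invoked. The URI, bucket, and project values
# below are placeholders/assumptions; replace them with real GCS locations and
# a real GCP project, with application-default credentials configured.
if __name__ == "__main__":
    result = end_to_end_pipeline(
        input_pdf_uri="gs://example-input-bucket/catalogues/sample.pdf",  # assumed path
        output_gcs_bucket="example-output-bucket",  # assumed bucket
        project_id="example-gcp-project",  # assumed project id
    )
    if result:
        print(json.dumps(result, indent=2))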