def end_to_end_pipeline()

in tools/genai-prod-catalog-enrichment/main.py [0:0]


def _split_gcs_uri(uri):
    """Split ``gs://bucket/path/to/blob`` into ``(bucket, blob_path)``.

    Only the leading ``gs://`` scheme and the first path segment are
    removed, so a blob path that happens to repeat the bucket name (for
    example ``gs://data/data/x.pdf``) is returned intact.
    """
    scheme = "gs://"
    full_path = uri[len(scheme):] if uri.startswith(scheme) else uri
    bucket, _, blob_path = full_path.partition("/")
    return bucket, blob_path


def _enrich_page(page_no, page, project_id, output_gcs_bucket,
                 json_output_response):
    """Generate the per-page artefacts and merge them into the output.

    Args:
        page_no: Zero-based index of the page (logged one-based).
        page: Page entry from the parsed PDF; ``page['texts']['full_text']``
            is used as the generation context.
        project_id: Forwarded to ``generate_faqs`` / ``generate_isqs``.
        output_gcs_bucket: Forwarded to ``gij``.
        json_output_response: Accumulated output passed to
            ``update_output_json``.

    Returns:
        Whatever ``update_output_json`` returns for this page.

    Raises:
        Exception: Anything raised by FAQ/ISQ generation or by
            ``update_output_json``; the caller decides how to handle it.
            Failures of tag, category and image generation are handled
            here and replaced with empty dicts.
    """
    print(str(page_no))
    context = page['texts']['full_text']
    start = time.time()
    faq_json = generate_faqs(context, project_id)
    isq_json = generate_isqs(context, project_id)
    end = time.time()
    print(f"[INFO]: FAQ, ISQ generation "
          f"time for page {page_no + 1}: "
          f"{end - start} seconds")

    # Tag and category failures are only logged; image failures are
    # additionally reported through error_ws2_msg.
    error_ws2_msg = ''
    try:
        products = isq_json['product_name']
    except Exception:
        # Best effort: a missing key or malformed ISQ response simply
        # means there are no products to enrich on this page.
        products = []
    print("Products:", products)

    if len(products) > 0:
        try:
            start = time.time()
            product_tags = generate_tags_json(context, products)
            end = time.time()
            print(f"[INFO]: Product tag "
                  f"generation time for page {page_no + 1}: "
                  f"{end - start} seconds")
        except Exception:
            print(f"[ERROR]: Error during generation "
                  f"of product tags -"
                  f" {str(traceback.format_exc())}")
            product_tags = {}

        try:
            start = time.time()
            product_category = generate_category_json(context, products)
            end = time.time()
            print(f"[INFO]: Product category generation time "
                  f"for page {page_no + 1}: "
                  f"{end - start} seconds")
        except Exception:
            print(f"[ERROR]: Error during generation of"
                  f" product category - "
                  f"{str(traceback.format_exc())}")
            product_category = {}

        try:
            start = time.time()
            image_json = gij(page, products,
                             product_tags, output_gcs_bucket)
            end = time.time()
            print(f"[INFO]: Image JSON generation "
                  f"time for page {page_no + 1}:"
                  f" {end - start} seconds")
        except Exception as error:
            print(f"[ERROR]: Error during generation "
                  f"of product image json - "
                  f"{str(traceback.format_exc())} + {error}")
            # Interpolate the exception actually caught by this handler.
            error_ws2_msg = (f"{error_ws2_msg}\n"
                             f"Product image json "
                             f"generation failed - {error}")
            image_json = {}
    else:
        product_tags = {}
        product_category = {}
        image_json = {}

    start = time.time()
    json_output_response = update_output_json(
        json_output_response, faq_json, isq_json, image_json,
        product_category, product_tags, error_ws2_msg)
    end = time.time()
    print(f"[INFO]: Output for page {page_no + 1}: "
          f"updated in {end - start} seconds")
    return json_output_response


def end_to_end_pipeline(input_pdf_uri, output_gcs_bucket, project_id):
    """Run the catalogue enrichment pipeline for a single PDF.

    The PDF is type-checked, parsed, and then enriched with company
    details plus per-page FAQs, ISQs, product tags, categories and image
    data. Both the intermediate parsed JSON and the final response are
    handed to ``create_json_from_dict`` together with
    ``output_gcs_bucket``.

    Args:
        input_pdf_uri: Location of the PDF, expected in the form
            ``gs://<bucket>/<path>/<name>.pdf``.
        output_gcs_bucket: Bucket name forwarded to the parsing, image
            and JSON-writing helpers.
        project_id: Project identifier forwarded to the generation
            helpers.

    Returns:
        The assembled response dict (``pdf_name``, ``pdf_url``,
        ``company_details``, ``products``, ``catalogue_faqs``), or
        ``None`` when the PDF is reported as scanned or when an
        unrecoverable error occurs. A failure on an individual page is
        logged and that page is skipped.
    """
    pdf_folder = "pdf_files"
    os.makedirs(pdf_folder, exist_ok=True)

    uri = input_pdf_uri
    input_gcs_bucket, filename = _split_gcs_uri(uri)
    name = filename.split("/")[-1]

    try:
        pdf_type = check_pdf_type(filename, input_gcs_bucket)
        if pdf_type["scanned"]:
            # Nothing was built for a scanned PDF, so return explicitly
            # instead of referencing an unassigned response.
            print("[INFO]: Please process text PDF's only!")
            return None

        start = time.time()
        pdf_json = parse_pdf(filename, input_gcs_bucket, output_gcs_bucket)
        end = time.time()
        print(f"[INFO]: Parsed PDF successfully in {end - start} seconds")

        pdf_json = get_specific_caption(pdf_json)
        # Strip only a trailing ".pdf"; the rest of the name is kept.
        suffix = ".pdf"
        name_initials = \
            name[:-len(suffix)] if name.endswith(suffix) else name
        json_path = f"./{name_initials}.json"
        create_json_from_dict(json_path, pdf_json, output_gcs_bucket)
        pdf_json = clean_text(pdf_json)

        json_output_response = {
            "pdf_name": name,
            "pdf_url": uri,
            "company_details": {},
            "products": [],
            "catalogue_faqs": [],
        }
        print(json_output_response, "\n")

        start = time.time()
        company_text = get_company_text(pdf_json['pages'])
        if company_text:
            company_details = \
                generate_company_details(company_text, project_id)
            if "company_details" in company_details:
                json_output_response["company_details"] = \
                    company_details["company_details"]
            elif "response_error" in company_details:
                # Keep the error payload so the failure stays visible
                # in the output.
                json_output_response["company_details"] = company_details
            else:
                json_output_response["company_details"] = {}
        end = time.time()
        print(f"[INFO]: Company details "
              f"extraction time: {end - start} seconds")

        for page_no, page in enumerate(pdf_json['pages']):
            try:
                json_output_response = _enrich_page(
                    page_no, page, project_id,
                    output_gcs_bucket, json_output_response)
            except Exception as error:
                # One bad page must not abort the whole document.
                print(f"[ERROR]: Error during generating "
                      f"final json - "
                      f"{str(traceback.format_exc())} - {error}")

        # NOTE: "_ouput" (sic) is kept as-is; consumers of the generated
        # file may depend on this exact name.
        final_json_path = f"{name_initials}_ouput.json"
        start = time.time()
        create_json_from_dict(final_json_path,
                              json_output_response,
                              output_gcs_bucket)
        end = time.time()
        print(f"[INFO]: Final output JSON "
              f"uploaded to GCS in {end - start} seconds")

        return json_output_response
    except Exception as error:
        print(f"[ERROR]: Error for filename: "
              f"{filename} - {str(traceback.format_exc())} - {error}")
        return None