def chunk_document_multimodal()

in components/llm_service/src/services/query/data_source.py
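
The method relies on module-level imports that can be inferred from its body; the exact import lines below are assumptions (Logger, gcs_helper, GENIE_FOLDER_MARKER, and get_file_hash are project-internal helpers):

  import os
  import tempfile
  import traceback
  from urllib.parse import unquote

  from pdf2image import convert_from_path
  from pypdf import PdfReader  # may also be: from PyPDF2 import PdfReader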


  def chunk_document_multimodal(self,
                                doc_name: str,
                                doc_url: str,
                                doc_filepath: str
                                ) -> list[object]:
    """
    Process a file document into multimodal chunks (b64 and text) for embeddings

    Args:
       doc_name: file name of document
       doc_url: remote url of document
       doc_filepath: local file path of document
    Returns:
       list of dicts, each representing a chunk of the document, with
       properties for image b64 data, image URL, and chunk text;
       an empty list if the document could not be processed
    """
    Logger.info(f"generating index data for {doc_name}")

    # Confirm that this is a valid file type
    allowed_image_types = ["png", "jpeg", "jpg", "bmp", "gif"]
    try:
      doc_extension = doc_name.split(".")[-1].lower()
      if doc_extension not in ["pdf", "txt"] + allowed_image_types:
        raise ValueError(f"{doc_name} must be a PDF, TXT, "
                         f"PNG, JPG, BMP, or GIF")
      # TODO: Insert elif statements to check for additional types of
      # videos (AVI, MP4, MOV, etc), and audio (MP3, WAV, etc)
    except ValueError as e:
      Logger.error(f"error reading doc {doc_name}: {e}")
      # An unsupported file type cannot be chunked; return an empty list
      # instead of falling through to the processing code below
      return []

    doc_chunks = []
    try:
      # Get bucket name & the doc file path within bucket
      if doc_url.startswith("https://storage.googleapis.com/"):
        bucket_parts = unquote(
          doc_url.split("https://storage.googleapis.com/")[1]).split("/")
      elif doc_url.startswith("gs://"):
        bucket_parts = unquote(doc_url.split("gs://")[1]).split("/")
      else:
        raise ValueError(f"Invalid Doc URL: {doc_url}")

      bucket_name = bucket_parts[0]
      filepath_in_bucket = "/".join(bucket_parts[1:])
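      # Example (values illustrative): for doc_url
      # "gs://my-bucket/reports/q3.pdf", bucket_name is "my-bucket"
      # and filepath_in_bucket is "reports/q3.pdf"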

      if filepath_in_bucket.startswith(GENIE_FOLDER_MARKER):
        # This file was created by genie as a chunk of another file
        # and should not be processed again
        return []

      # Determine bucket folder for document chunks that require storage
      # The folder is marked as a genie folder and uses a hash of the
      # document
      chunk_bucket_folder = (f"{GENIE_FOLDER_MARKER}/"
                             f"{get_file_hash(doc_filepath)}")

      # If doc is a PDF, convert it to a PNG image per page
      if doc_extension == "pdf":

        # Convert each PDF page to an in-memory PIL image. (Writing pages
        # into a TemporaryDirectory that closes before the images are saved
        # below would delete their backing files.)
        png_array = convert_from_path(doc_filepath)

        # Open PDF and iterate over pages
        with open(doc_filepath, "rb") as f:
          reader = PdfReader(f)
          num_pages = len(reader.pages)
          Logger.info(f"Reading pdf doc {doc_name} with {num_pages} pages")
          for i in range(num_pages):
            # Create a pdf file for the page and chunk into contextual_text
            pdf_doc = self.create_pdf_page(reader.pages[i], doc_filepath, i)
            contextual_text = self.extract_contextual_text(
                pdf_doc["filename"], pdf_doc["filepath"], doc_url)

            # Take PNG version of page and convert to b64
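            # Swap the trailing ".pdf" for ".png" in the page's file path,
            # e.g. ".../page_3.pdf" -> ".../page_3.png" (path illustrative)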
            png_doc_filepath = ".png".join(
                pdf_doc["filepath"].rsplit(".pdf", 1))
            png_array[i].save(png_doc_filepath, format="png")
            png_b64 = self.extract_b64(png_doc_filepath)

            # Upload to Google Cloud Bucket and return gs URL
            png_url = gcs_helper.upload_to_gcs(self.storage_client,
                                               bucket_name,
                                               png_doc_filepath,
                                               chunk_bucket_folder)

            # Clean up temp files
            os.remove(pdf_doc["filepath"])
            os.remove(png_doc_filepath)

            # Push chunk object into chunk array
            chunk_obj = {
              "image": png_b64,
              "image_url": png_url,
              "text": contextual_text
            }
            doc_chunks.append(chunk_obj)
      elif doc_extension in allowed_image_types:
        # TODO: Convert image file into something text readable (pdf, html, ext)
        # So that we can extract text chunks

        # Get text associated with the document
        contextual_text = self.extract_contextual_text(
            doc_name, doc_filepath, doc_url)

        # Get b64 for the document
        image_b64 = self.extract_b64(doc_filepath)

        # Push chunk object into chunk array
        chunk_obj = {
          "image": image_b64,
          "image_url": doc_url,
          "text": contextual_text
        }
        doc_chunks.append(chunk_obj)

      elif doc_extension == "txt":
        # Chunk text in document
        text_chunks = self.chunk_document(doc_name,
                                          doc_url,
                                          doc_filepath)
        for text_chunk in text_chunks:
          # TODO: Consider all characters in text_chunk, not just the
          # first 1024. As of Nov 2024, the multimodalembedding@001 API
          # throws an error if the text input argument is >1024 characters.
          text_chunk = text_chunk[0:1024]
          # Push chunk object into chunk array
          chunk_obj = {
            "image": None,
            "image_url": None,
            "text": text_chunk,
          }
          doc_chunks.append(chunk_obj)

      # TODO: Insert elif statements to chunk additional types of
      # videos (AVI, MP4, MOV, etc), and audio (MP3, WAV, etc)
      # - For images, set "image" and "text" fields of chunk_obj
      # - For video and audio, set "timestamp_start" and "timestamp_stop"
      # fields of chunk_obj

    except Exception as e:
      Logger.error(f"error processing doc {doc_name}: {e}")
      Logger.error(traceback.format_exc())

    # Return list of chunk data
    return doc_chunks
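
A minimal usage sketch (the DataSource class name, its constructor argument, and the file paths are assumptions for illustration; only chunk_document_multimodal and the chunk dict shape come from the code above):

  from google.cloud import storage

  storage_client = storage.Client()
  data_source = DataSource(storage_client)  # assumed constructor signature

  doc_name = "q3_report.pdf"                        # illustrative
  doc_url = "gs://my-bucket/reports/q3_report.pdf"  # illustrative
  doc_filepath = "/tmp/q3_report.pdf"               # illustrative local copy

  chunks = data_source.chunk_document_multimodal(
      doc_name, doc_url, doc_filepath)
  for chunk in chunks:
    # Each chunk dict has "image" (b64 or None), "image_url", and "text"
    print(chunk["image_url"], (chunk["text"] or "")[:80])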
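
The 1024-character truncation in the txt branch exists because of the multimodalembedding@001 text limit; a sketch of embedding one chunk with the Vertex AI SDK (this snippet is not part of data_source.py, and the project/location values are placeholders):

  import base64

  import vertexai
  from vertexai.vision_models import Image as VMImage
  from vertexai.vision_models import MultiModalEmbeddingModel

  vertexai.init(project="my-project", location="us-central1")  # placeholders
  model = MultiModalEmbeddingModel.from_pretrained("multimodalembedding@001")

  chunk = chunks[0]  # a chunk from the usage sketch above
  # Decode the b64 image (if present) and embed it with its contextual text,
  # truncated to the 1024-character limit noted above
  embedding = model.get_embeddings(
      image=(VMImage(image_bytes=base64.b64decode(chunk["image"]))
             if chunk["image"] else None),
      contextual_text=(chunk["text"] or "")[:1024],
  )
  if embedding.image_embedding:
    print(len(embedding.image_embedding))  # 1408 dimensions by default
  print(len(embedding.text_embedding))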