packages/blueprints/gen-ai-chatbot/static-assets/chatbot-genai-components/backend/python/embedding/loaders/s3.py (78 lines of code) (raw):

"""Loader that turns a single S3 object into ``Document`` instances."""

import logging
import os
import tempfile

# NOTE(review): ``distutils`` was removed from the standard library in
# Python 3.12 (PEP 632) and ``strtobool`` is not used by the code in this
# module. The import is kept so anything re-importing it from here keeps
# working; confirm and drop it before moving to Python >= 3.12.
from distutils.util import strtobool

import boto3
from unstructured.partition.auto import partition
from unstructured.partition.pdf import partition_pdf

from embedding.loaders.base import BaseLoader, Document

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class S3FileLoader(BaseLoader):
    """Loads a document from a file in S3.

    Reference: `langchain_community.document_loaders.S3FileLoader` class
    """

    def __init__(
        self,
        bucket: str,
        key: str,
        mode: str = "single",
        enable_partition_pdf: bool = False,
    ):
        """Store the S3 location and the loading options.

        Args:
            bucket: Name of the S3 bucket holding the object.
            key: Key of the object inside ``bucket``.
            mode: How ``load`` groups the partitioned elements: ``"single"``
                (one document), ``"paged"`` (one document per page number)
                or ``"elements"`` (one document per element).
            enable_partition_pdf: When true, ``.pdf`` files are partitioned
                with ``partition_pdf`` using the ``hi_res`` strategy instead
                of the generic ``partition`` function.
        """
        self.bucket = bucket
        self.key = key
        self.mode = mode
        self.enable_partition_pdf = enable_partition_pdf

    def _get_elements(self) -> list:
        """Download the object to a temporary directory and partition it.

        Returns:
            The list of elements returned by ``partition_pdf`` (PDF files
            with ``enable_partition_pdf`` set) or by ``partition`` (all
            other cases).
        """
        s3 = boto3.client("s3")
        with tempfile.TemporaryDirectory() as temp_dir:
            # NOTE(review): the key is interpolated into the local path as-is,
            # so a key containing ".." segments would resolve outside
            # ``temp_dir`` - confirm that keys come from a trusted source.
            file_path = f"{temp_dir}/{self.key}"
            # Keys may contain "/" separators; create the matching folders.
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            s3.download_file(self.bucket, self.key, file_path)

            # Lower-cased so that "report.PDF" is recognised as a PDF too.
            extension = os.path.splitext(file_path)[1].lower()
            # ``== True`` is kept on purpose: a truthy non-bool value such as
            # the string "false" must not switch on hi-res partitioning.
            hi_res_enabled = self.enable_partition_pdf == True  # noqa: E712

            # Partitioning must finish inside the ``with`` block because the
            # downloaded file is deleted when the block exits.
            if extension == ".pdf" and hi_res_enabled:
                logger.info(
                    "Start partitioning using hi-resolution mode: %s", file_path
                )
                return partition_pdf(
                    filename=file_path,
                    strategy="hi_res",
                    infer_table_structure=True,
                    extract_images_in_pdf=False,
                )

            logger.info("Start partitioning using auto mode: %s", file_path)
            return partition(filename=file_path)

    def _get_metadata(self) -> dict:
        """Return the base metadata: the ``s3://`` URI of the object."""
        return {"source": f"s3://{self.bucket}/{self.key}"}

    def _element_metadata(self, element) -> dict:
        """Return the base metadata merged with the element's own metadata."""
        metadata = self._get_metadata()
        if hasattr(element, "metadata"):
            metadata.update(element.metadata.to_dict())
        return metadata

    def load(self) -> list[Document]:
        """Load file.

        Returns:
            The documents built from the partitioned elements, grouped
            according to ``self.mode``.

        Raises:
            ValueError: If ``self.mode`` is not ``"elements"``, ``"paged"``
                or ``"single"``.
        """
        # Reject an unsupported mode before paying for download/partitioning.
        if self.mode not in ("elements", "paged", "single"):
            raise ValueError(f"mode of {self.mode} not supported.")

        elements = self._get_elements()

        if self.mode == "elements":
            docs: list[Document] = []
            for element in elements:
                metadata = self._element_metadata(element)
                if hasattr(element, "category"):
                    metadata["category"] = element.category
                docs.append(Document(page_content=str(element), metadata=metadata))
            return docs

        if self.mode == "paged":
            text_dict: dict[int, str] = {}
            meta_dict: dict[int, dict] = {}
            for element in elements:
                metadata = self._element_metadata(element)
                # Elements without a page number are grouped under page 1.
                page_number = metadata.get("page_number", 1)
                if page_number not in text_dict:
                    # First element of this page: start its text and metadata.
                    text_dict[page_number] = str(element) + "\n\n"
                    meta_dict[page_number] = metadata
                else:
                    # Later element: append text, let newer metadata win.
                    text_dict[page_number] += str(element) + "\n\n"
                    meta_dict[page_number].update(metadata)
            # Dicts keep insertion order, so pages come out in the order in
            # which they were first encountered.
            return [
                Document(page_content=page_text, metadata=meta_dict[page_number])
                for page_number, page_text in text_dict.items()
            ]

        # Only "single" is left: one document with all elements joined.
        text = "\n\n".join(str(el) for el in elements)
        return [Document(page_content=text, metadata=self._get_metadata())]