skills/contextual-embeddings/contextual-rag-lambda-function/s3_adapter.py (39 lines of code) (raw):

import json import boto3 import os from botocore.exceptions import ClientError class S3Adapter: def __init__(self): # Create an S3 client self.s3_client = boto3.client('s3') def write_output_to_s3(self, bucket_name, file_name, json_data): """ Write a JSON object to an S3 bucket :param bucket_name: Name of the S3 bucket :param file_name: Name of the file to be created in the bucket :param json_data: JSON object to be written :return: True if file was uploaded, else False """ try: # Convert JSON object to string json_string = json.dumps(json_data) # Upload the file response = self.s3_client.put_object( Bucket=bucket_name, Key=file_name, Body=json_string, ContentType='application/json' ) # Check if the upload was successful if response['ResponseMetadata']['HTTPStatusCode'] == 200: print(f"Successfully uploaded {file_name} to {bucket_name}") return True else: print(f"Failed to upload {file_name} to {bucket_name}") return False except ClientError as e: print(f"Error occurred: {e}") return False def read_from_s3(self, bucket_name, file_name): """ Write a JSON object to an S3 bucket :param bucket_name: Name of the S3 bucket :param file_name: Name of the file to be created in the bucket :return: True if file was uploaded, else False """ try: # Get the object from S3 response = self.s3_client.get_object(Bucket=bucket_name, Key=file_name) # Read the content of the file return json.loads(response['Body'].read().decode('utf-8')) except ClientError as e: print(f"Error reading file from S3: {str(e)}") def parse_s3_path(self, s3_path): # Remove 's3://' prefix if present s3_path = s3_path.replace('s3://', '') # Split the path into bucket and key parts = s3_path.split('/', 1) if len(parts) != 2: raise ValueError("Invalid S3 path format") bucket_name = parts[0] file_key = parts[1] return bucket_name, file_key