in skills/contextual-embeddings/contextual-rag-lambda-function/s3_adapter.py [0:0]
def write_output_to_s3(self, bucket_name, file_name, json_data):
"""
Write a JSON object to an S3 bucket
:param bucket_name: Name of the S3 bucket
:param file_name: Name of the file to be created in the bucket
:param json_data: JSON object to be written
:return: True if file was uploaded, else False
"""
try:
# Convert JSON object to string
json_string = json.dumps(json_data)
# Upload the file
response = self.s3_client.put_object(
Bucket=bucket_name,
Key=file_name,
Body=json_string,
ContentType='application/json'
)
# Check if the upload was successful
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
print(f"Successfully uploaded {file_name} to {bucket_name}")
return True
else:
print(f"Failed to upload {file_name} to {bucket_name}")
return False
except ClientError as e:
print(f"Error occurred: {e}")
return False