# packages/constructs/L3/ai/gaia-l3-construct/lib/shared/file-import-batch-job/main.py
def main():
    """Run one file-import batch job: convert the uploaded object, stage it
    in the processing bucket, and index its chunks into the workspace.

    Reads its configuration from module-level constants (WORKSPACE_ID,
    DOCUMENT_ID, INPUT_BUCKET_NAME, INPUT_OBJECT_KEY, PROCESSING_BUCKET_NAME,
    PROCESSING_OBJECT_KEY) — presumably populated from environment variables
    elsewhere in this file.

    Dispatch by extension:
      * ``.txt``  — read as UTF-8, stage, and chunk directly.
      * ``.json`` whose key contains ``frequently-asked-questions`` — import
        each question/answer pair as a redirection QnA document.
      * anything else — extract text via ``S3FileLoader``, stage, and chunk.

    Raises:
        genai_core.types.CommonError: if the workspace or document is missing.
        Exception: any processing failure is re-raised after the document
            status has been set to ``"error"``.
    """
    logger.info("Starting file converter batch job")
    # Lazy %-style args: the message is only formatted if the level is enabled.
    logger.info("Workspace ID: %s", WORKSPACE_ID)
    logger.info("Document ID: %s", DOCUMENT_ID)
    logger.info("Input bucket name: %s", INPUT_BUCKET_NAME)
    logger.info("Input object key: %s", INPUT_OBJECT_KEY)
    logger.info("Output bucket name: %s", PROCESSING_BUCKET_NAME)
    logger.info("Output object key: %s", PROCESSING_OBJECT_KEY)

    workspace = genai_core.workspaces.get_workspace(WORKSPACE_ID)
    if not workspace:
        raise genai_core.types.CommonError(f"Workspace {WORKSPACE_ID} does not exist")

    document = genai_core.documents.get_document(WORKSPACE_ID, DOCUMENT_ID)
    if not document:
        raise genai_core.types.CommonError(
            f"Document {WORKSPACE_ID}/{DOCUMENT_ID} does not exist"
        )

    try:
        extension = os.path.splitext(INPUT_OBJECT_KEY)[-1].lower()
        if extension == ".txt":
            content = _read_input_object()
            _stage_in_processing_bucket(content)
            add_chunks(workspace, document, content)
        elif extension == ".json" and "frequently-asked-questions" in INPUT_OBJECT_KEY:
            _import_faq_json(_read_input_object())
        else:
            # Fall back to the generic loader for every other file type.
            loader = S3FileLoader(INPUT_BUCKET_NAME, INPUT_OBJECT_KEY)
            logger.info("loader: %s", loader)
            docs = loader.load()
            content = docs[0].page_content
            _stage_in_processing_bucket(content)
            add_chunks(workspace, document, content)
    except Exception as error:
        # Mark the document failed before propagating so the status is
        # visible to callers polling it; then re-raise with the original
        # traceback intact (bare `raise`, not `raise error`).
        genai_core.documents.set_status(WORKSPACE_ID, DOCUMENT_ID, "error")
        logger.exception(error)
        raise


def _read_input_object():
    """Fetch the input S3 object and decode its body as UTF-8 text."""
    response = s3_client.get_object(Bucket=INPUT_BUCKET_NAME, Key=INPUT_OBJECT_KEY)
    return response["Body"].read().decode("utf-8")


def _stage_in_processing_bucket(content):
    """Copy *content* to the processing location unless source and
    destination are the exact same object.

    BUG FIX: the original guard used ``and``, which skipped the copy when
    *either* the bucket *or* the key matched — e.g. same key in a different
    bucket was never staged. The only case to skip is when BOTH match
    (writing the object onto itself), so the conditions are joined with
    ``or``.
    """
    if (
        INPUT_BUCKET_NAME != PROCESSING_BUCKET_NAME
        or INPUT_OBJECT_KEY != PROCESSING_OBJECT_KEY
    ):
        s3_client.put_object(
            Bucket=PROCESSING_BUCKET_NAME, Key=PROCESSING_OBJECT_KEY, Body=content
        )


def _import_faq_json(raw_content):
    """Import an FAQ JSON document (question -> answer mapping) as
    redirection QnA documents, one per entry.

    Existing questions (matched case-insensitively against the workspace's
    redirection-text map) are updated in place via their document id; new
    questions create fresh documents. Each document is marked "processed"
    as soon as it is created/updated.
    """
    faq = json.loads(raw_content)
    existing_faq_map = genai_core.documents.get_redirection_text_documents_map(
        WORKSPACE_ID
    )
    for question, response in faq.items():
        kwargs = dict(
            workspace_id=WORKSPACE_ID,
            document_type="qna",
            document_sub_type="redirection",
            title=question,
            content=question,
            content_complement=response,
        )
        existing_id = existing_faq_map.get(question.lower())
        if existing_id:
            # Reuse the existing document id so the entry is updated,
            # not duplicated.
            kwargs["document_id"] = existing_id
        result = genai_core.documents.create_document(**kwargs)
        genai_core.documents.set_status(
            result.get("workspace_id"), result.get("document_id"), "processed"
        )