in source/lambda/wf_publish_topic_model/lambda_function.py [0:0]
def is_file_available(event, source_prefix, file_to_extract):
    """Download the Comprehend topic-modeling output archive and extract one file from it.

    Looks up ``event[source_prefix]["OutputDataConfig"]["S3Uri"]`` for the S3 location
    of the job's output tarball, downloads it into the Lambda /tmp scratch space,
    validates that the archive contains exactly the two expected Comprehend output
    files (doc-topics.csv and doc-terms.csv), and extracts ``file_to_extract``.

    :param event: Lambda event dict; must contain ``source_prefix`` as a key whose
        value carries the Comprehend job's ``OutputDataConfig.S3Uri``.
    :param source_prefix: key under which the ingestion source info is stored in the event.
    :param file_to_extract: name of the member file to pull out of the archive.
    :return: True when the archive was downloaded, validated and extracted;
        False when the downloaded object is not a tar archive.
    :raises IngestionSourcePrefixMissingError: if ``source_prefix`` is absent from the event.
    :raises IncorrectTarFileException: if the archive does not contain exactly the
        two expected Comprehend output files.
    """
    if not event.get(source_prefix, None):
        logger.error("Ingestion source prefix information not available in event to process data")
        raise IngestionSourcePrefixMissingError(
            "Ingestion source prefix information not available in event to process data"
        )

    s3_uri_parse = urlparse(event[source_prefix]["OutputDataConfig"]["S3Uri"])
    bucket = s3_uri_parse.netloc
    key = s3_uri_parse.path.lstrip("/")
    logger.debug("Bucket is " + bucket + " and key is " + key)
    file_name = os.path.basename(key)
    logger.debug("File name is " + file_name)
    try:
        """
        Lambda functions provide a /tmp directory to store temporary files.
        This is not the same /tmp as on a conventional unix OR linux
        system. Hence suppressing the rule
        """
        # NOTE: assumes TMP_DIR carries a trailing separator — matches existing usage.
        s3.download_file(bucket, key, TMP_DIR + file_name)
        logger.debug(file_name + " downloaded from S3 bucket")
        if not tarfile.is_tarfile(TMP_DIR + file_name):
            return False
        # This archive is generated by AWS Comprehend Topic Modeling job and stored in an S3 bucket.
        # The bucket permissions only allow the comprehend job and lambda function to read/ write from it.
        # Context manager guarantees the handle is closed even when validation raises.
        with tarfile.open(TMP_DIR + file_name) as archive_file:
            file_list = archive_file.getnames()
            logger.debug(f"File list length is {len(file_list)} and files in the archive {file_list}")
            # Raise when EITHER the member count is wrong OR the expected files are
            # missing (the original `and` let malformed archives through).
            if len(file_list) != 2 or not ("doc-topics.csv" in file_list and "doc-terms.csv" in file_list):
                raise IncorrectTarFileException(
                    "Either number of files in the archive are not 2 or file names are not as expected in the archive. May not be a valid archive"
                )
            archive_file.extractall(TMP_DIR, member_file_to_extract(archive_file, file_to_extract))
        logger.debug(f"Extraction complete. Files in the directory are {os.listdir(TMP_DIR)}")
        return True
    except Exception as e:
        # Fixed f-string: original logged a literal "${...}" (JS template syntax).
        logger.error(f"Error occurred when processing topics: {str(e)}")
        raise e