# def is_file_available()
#
# in source/lambda/wf_publish_topic_model/lambda_function.py [0:0]


def is_file_available(event, source_prefix, file_to_extract):
    """Download the archive referenced by the event and extract a file from it.

    The S3 location is read from
    ``event[source_prefix]["OutputDataConfig"]["S3Uri"]``. The object is
    downloaded to ``TMP_DIR``, checked to be a tar archive, and the members
    returned by ``member_file_to_extract(archive, file_to_extract)`` are
    extracted into ``TMP_DIR``.

    Args:
        event: Mapping that holds, under ``source_prefix``, a dict with an
            ``OutputDataConfig.S3Uri`` entry.
        source_prefix: Key in ``event`` identifying the ingestion source.
        file_to_extract: Passed to ``member_file_to_extract`` to choose which
            archive members are extracted (presumably a file name inside the
            archive; verify against that helper).

    Returns:
        True once the archive has been downloaded and extracted.

    Raises:
        IngestionSourcePrefixMissingError: ``event`` has no (truthy) entry for
            ``source_prefix``.
        IncorrectTarFileException: the downloaded object is not a tar archive,
            or the archive contents fail the sanity check below.
        Exception: any error from the S3 download or the tar extraction is
            logged and re-raised unchanged.
    """
    if not event.get(source_prefix, None):
        logger.error("Ingestion source prefix information not available in event to process data")
        raise IngestionSourcePrefixMissingError(
            "Ingestion source prefix information not available in event to process data"
        )

    s3_uri_parse = urlparse(event[source_prefix]["OutputDataConfig"]["S3Uri"])
    bucket = s3_uri_parse.netloc
    key = s3_uri_parse.path.lstrip("/")
    logger.debug("Bucket is " + bucket + " and key is " + key)
    file_name = os.path.basename(key)
    logger.debug("File name is " + file_name)
    local_path = TMP_DIR + file_name

    try:
        # Lambda functions provide a /tmp directory to store temporary files.
        # This is not the same /tmp as on a conventional unix OR linux
        # system. Hence suppressing the rule
        s3.download_file(bucket, key, local_path)
        logger.debug(file_name + " downloaded from S3 bucket")

        # Previously a non-tar download fell through to ``archive_file.close()``
        # on an unbound name and surfaced as UnboundLocalError; fail with the
        # domain exception instead so the cause is clear in the logs.
        if not tarfile.is_tarfile(local_path):
            raise IncorrectTarFileException(f"{file_name} downloaded from S3 bucket is not a valid tar archive")

        # This archive is generated by AWS Comprehend Topic Modeling job and stored in an S3 bucket.
        # The bucket permissions only allow the comprehend job and lambda function to read/ write from it
        # The ``with`` block closes the archive even when validation or extraction raises.
        with tarfile.open(local_path) as archive_file:
            file_list = archive_file.getnames()
            logger.debug(f"File list length is {len(file_list)} and files in the archive {file_list}")
            # NOTE(review): the message below says "either ... or ...", but this
            # condition only rejects an archive when BOTH checks fail (``and``).
            # It is intentionally left as-is: switching to ``or`` would reject
            # every archive unless "doc-terms.csv" is really the member name
            # (Comprehend documents its output as topic-terms.csv and
            # doc-topics.csv) -- confirm the expected names before tightening.
            if len(file_list) != 2 and not ("doc-topics.csv" in file_list and "doc-terms.csv" in file_list):
                raise IncorrectTarFileException(
                    "Either number of files in the archive are not 2 or file names are not as expected in the archive. May not be a valid archive"
                )
            archive_file.extractall(TMP_DIR, member_file_to_extract(archive_file, file_to_extract))

        logger.debug(f"Extraction complete. Files in the directory are {os.listdir(TMP_DIR)}")
        return True
    except Exception as e:
        # Log at the boundary, then re-raise the original exception untouched.
        logger.error(f"Error occured when processing topics: {str(e)}")
        raise