in assets/lambda/code/scan/lambda.py [0:0]
def expand_if_large_archive(input_bucket, input_key, download_path, byte_size):
    """Unpack an oversized download with 7za so its members can be checked.

    Nothing happens unless ``byte_size`` is greater than ``MAX_BYTES``.
    Otherwise ``{download_path}/{input_key}`` is extracted into
    ``download_path``, ``delete(download_path, input_key)`` is called, and
    every file found under ``download_path`` is compared against
    ``MAX_BYTES``.

    Failures are not propagated from here: an unexpected 7za exit code or
    any extracted file above ``MAX_BYTES`` is handed to ``report_failure``
    together with the bucket, key and download path.
    """
    # Same predicate as the original, negated, so the small-file path
    # leaves immediately.
    if not (byte_size > MAX_BYTES):
        return

    archive_path = f"{download_path}/{input_key}"
    try:
        # stderr is merged into stdout; the captured output is not used below.
        extraction = subprocess.run(
            ["7za", "x", "-y", archive_path, f"-o{download_path}"],
            stderr=subprocess.STDOUT,
            stdout=subprocess.PIPE,
        )
        # 7-Zip documents exit code 0 as success and 1 as a non-fatal warning.
        if extraction.returncode not in (0, 1):
            raise ArchiveException(
                f"7za exited with unexpected code: {extraction.returncode}."
            )

        # Presumably removes the original archive now that it is unpacked;
        # confirm against delete().
        delete(download_path, input_key)

        # Only bare file names are collected, not their paths.
        oversized = [
            entry
            for parent, _subdirs, entries in os.walk(download_path, topdown=False)
            for entry in entries
            if os.path.getsize(os.path.join(parent, entry)) > MAX_BYTES
        ]
        if oversized:
            raise FileTooBigException(
                f"Archive {input_key} contains files {oversized} "
                f"which are at greater than ClamAV max of {MAX_BYTES} bytes"
            )
    except subprocess.CalledProcessError as err:
        # NOTE(review): subprocess.run above is called without check=True, so
        # it does not raise this itself; the handler only fires if something
        # else in the try body raises it.
        report_failure(input_bucket, input_key, download_path, str(err.stderr))
    except (ArchiveException, FileTooBigException) as err:
        report_failure(input_bucket, input_key, download_path, err.message)