atr/tasks/checks/targz.py (72 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import asyncio
import logging
import tarfile
from typing import Final
import atr.tasks.checks as checks
_LOGGER: Final = logging.getLogger(__name__)
async def integrity(args: checks.FunctionArguments) -> str | None:
    """Verify that every entry of a .tar.gz artifact can be read.

    The archive is walked in a worker thread by ``_integrity_core``. When
    that completes, a success is recorded together with the size it
    reported; any exception raised along the way is recorded as a failure
    carrying the error text.

    Args:
        args: Check arguments; supplies the recorder and the relative path
            that is included in the log message.

    Returns:
        Always ``None``; the outcome is reported through the recorder.
    """
    recorder = await args.recorder()
    archive_path = await recorder.abs_path()
    if not archive_path:
        # The recorder could not provide an artifact path, so there is
        # nothing to check.
        return None

    _LOGGER.info(f"Checking integrity for {archive_path} (rel: {args.primary_rel_path})")

    read_chunk = 4096
    try:
        # tarfile reading is blocking, so it is pushed off the event loop.
        total = await asyncio.to_thread(_integrity_core, str(archive_path), read_chunk)
        await recorder.success("Able to read all entries of the archive using tarfile", {"size": total})
    except Exception as exc:
        await recorder.failure("Unable to read all entries of the archive using tarfile", {"error": str(exc)})
    return None
def root_directory(tgz_path: str) -> str:
    """Return the single top-level path component shared by all archive members.

    The archive is read as a gzip-compressed stream. Members whose final
    path component starts with ``._`` are ignored (metadata convention).
    Leading ``./`` segments are stripped before the first component is
    taken, so ``./pkg/file`` and ``pkg/file`` both yield ``pkg``; the bare
    ``.`` directory entry and empty names are skipped.

    Args:
        tgz_path: Filesystem path of the .tar.gz / .tgz archive.

    Returns:
        The name of the one top-level component found in the archive.

    Raises:
        ValueError: If members live under more than one top-level
            component, or if no usable member is found.
    """
    root = None
    with tarfile.open(tgz_path, mode="r|gz") as tf:
        for member in tf:
            name = member.name
            if name.rsplit("/", 1)[-1].startswith("._"):
                # Metadata convention
                continue
            # Without this, every "./"-prefixed member would report "." as
            # its root and archives with several real roots would pass.
            while name.startswith("./"):
                name = name[2:]
            if name in ("", "."):
                # The archive's own "." entry carries no root information.
                continue
            top = name.split("/", 1)[0]
            if not root:
                root = top
            elif top != root:
                raise ValueError(f"Multiple root directories found: {root}, {top}")
    if not root:
        raise ValueError("No root directory found in archive")
    return root
async def structure(args: checks.FunctionArguments) -> str | None:
    """Check that a .tar.gz artifact unpacks into one suitably named root.

    The expected root name is the artifact's file name with its ``.tar.gz``
    (or otherwise ``.tgz``) suffix removed. The actual root is determined by
    ``root_directory`` in a worker thread. A match is recorded as a success,
    a mismatch as a warning, and any exception as a failure carrying the
    error text.

    Args:
        args: Check arguments; supplies the recorder and the relative path
            that is included in the log message.

    Returns:
        Always ``None``; the outcome is reported through the recorder.
    """
    recorder = await args.recorder()
    archive_path = await recorder.abs_path()
    if not archive_path:
        # The recorder could not provide an artifact path, so there is
        # nothing to check.
        return None

    filename = archive_path.name
    # ".tar.gz" takes priority; any other name only has ".tgz" stripped.
    suffix = ".tar.gz" if filename.endswith(".tar.gz") else ".tgz"
    expected_root = filename.removesuffix(suffix)

    _LOGGER.info(
        f"Checking structure for {archive_path} (expected root: {expected_root}) (rel: {args.primary_rel_path})"
    )

    try:
        # tarfile reading is blocking, so it is pushed off the event loop.
        found_root = await asyncio.to_thread(root_directory, str(archive_path))
        details = {"root": found_root, "expected": expected_root}
        if found_root != expected_root:
            await recorder.warning(
                f"Root directory '{found_root}' does not match expected name '{expected_root}'",
                details,
            )
        else:
            await recorder.success(
                "Archive contains exactly one root directory matching the expected name",
                details,
            )
    except Exception as exc:
        await recorder.failure("Unable to verify archive structure", {"error": str(exc)})
    return None
def _integrity_core(tgz_path: str, chunk_size: int = 4096) -> int:
    """Read a .tar.gz archive end to end and total its members' sizes.

    The archive is opened as a gzip-compressed stream. The recorded size of
    every member is added to the total, and the content of each regular
    file is read through in ``chunk_size`` pieces so that unreadable data
    surfaces as an exception from ``tarfile``.

    Args:
        tgz_path: Filesystem path of the archive.
        chunk_size: Number of bytes requested per read while draining a
            file member.

    Returns:
        The sum of the ``size`` fields of all members in the archive.
    """
    uncompressed_total = 0
    with tarfile.open(tgz_path, mode="r|gz") as archive:
        for entry in archive:
            # Metadata entries are deliberately counted here as well.
            uncompressed_total += entry.size
            if not entry.isfile():
                continue
            stream = archive.extractfile(entry)
            if stream is None:
                continue
            # Drain the member; the data is discarded, only readability matters.
            while stream.read(chunk_size):
                pass
    return uncompressed_total