# atr/tasks/sbom.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
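
"""Generate a CycloneDX SBOM for a release artifact by extracting it safely and running syft."""
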
import asyncio
import json
import logging
import os
import tarfile
from typing import Any, Final

import aiofiles
import pydantic

import atr.config as config
import atr.tasks.checks as checks
import atr.tasks.checks.targz as targz
import atr.util as util

_CONFIG: Final = config.get()
_LOGGER: Final = logging.getLogger(__name__)


class GenerateCycloneDX(pydantic.BaseModel):
    """Arguments for the task to generate a CycloneDX SBOM."""

    artifact_path: str = pydantic.Field(..., description="Absolute path to the artifact")
output_path: str = pydantic.Field(..., description="Absolute path where the generated SBOM JSON should be written")


class SBOMGenerationError(Exception):
    """Custom exception for SBOM generation failures."""

    def __init__(self, message: str, details: dict[str, Any] | None = None) -> None:
super().__init__(message)
self.details = details or {}


def archive_extract_safe(
    archive_path: str,
    extract_dir: str,
    max_size: int,
    chunk_size: int,
) -> int:
    """Extract a .tar.gz archive while guarding against path traversal and oversized output.

    Returns the total number of bytes extracted.
    """
total_extracted = 0
try:
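        # "r|gz" reads the archive as a sequential stream, so each member is visited exactly once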
with tarfile.open(archive_path, mode="r|gz") as tf:
for member in tf:
                if member.name and member.name.split("/")[-1].startswith("._"):
                    # Skip macOS AppleDouble metadata files ("._*")
                    continue
# Skip anything that's not a file or directory
if not (member.isreg() or member.isdir()):
continue
# Check whether extraction would exceed the size limit
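                # (guards against decompression bombs, where a small archive inflates enormously)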
if member.isreg() and ((total_extracted + member.size) > max_size):
raise SBOMGenerationError(
f"Extraction would exceed maximum size limit of {max_size} bytes",
{"max_size": max_size, "current_size": total_extracted, "file_size": member.size},
)
# Extract directories directly
if member.isdir():
                    # Ensure the resolved path stays inside extract_dir before extracting;
                    # a plain prefix check would also accept siblings such as "extract_dir-evil"
                    target_path = os.path.join(extract_dir, member.name)
                    extract_abs = os.path.abspath(extract_dir)
                    if os.path.commonpath([os.path.abspath(target_path), extract_abs]) != extract_abs:
                        _LOGGER.warning(f"Skipping potentially unsafe path: {member.name}")
                        continue
tf.extract(member, extract_dir, numeric_owner=True)
continue
if member.isreg():
extracted_size = _archive_extract_safe_process_file(
tf, member, extract_dir, total_extracted, max_size, chunk_size
)
total_extracted += extracted_size
# TODO: Add other types here
except tarfile.ReadError as e:
raise SBOMGenerationError(f"Failed to read archive: {e}", {"archive_path": archive_path}) from e
return total_extracted
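

# Usage sketch (hypothetical paths and limits, for illustration only):
#   extracted = archive_extract_safe(
#       "/releases/example-1.0.0.tar.gz", "/tmp/extract", max_size=1 << 30, chunk_size=65536
#   )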


@checks.with_model(GenerateCycloneDX)
async def generate_cyclonedx(args: GenerateCycloneDX) -> str | None:
"""Generate a CycloneDX SBOM for the given artifact and write it to the output path."""
try:
result_data = await _generate_cyclonedx_core(args.artifact_path, args.output_path)
_LOGGER.info(f"Successfully generated CycloneDX SBOM for {args.artifact_path}")
msg = result_data["message"]
if not isinstance(msg, str):
raise SBOMGenerationError(f"Invalid message type: {type(msg)}")
return msg
except SBOMGenerationError as e:
_LOGGER.error(f"SBOM generation failed for {args.artifact_path}: {e.details}")
raise
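

# Example task arguments (hypothetical paths, for illustration only):
#   GenerateCycloneDX(
#       artifact_path="/releases/example-1.0.0.tar.gz",
#       output_path="/releases/example-1.0.0.cdx.json",
#   )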


def _archive_extract_safe_process_file(
tf: tarfile.TarFile,
member: tarfile.TarInfo,
extract_dir: str,
total_extracted: int,
max_size: int,
chunk_size: int,
) -> int:
"""Process a single file member during safe archive extraction."""
    # Ensure the resolved path stays inside extract_dir; a plain prefix check would
    # also accept siblings such as "extract_dir-evil"
    target_path = os.path.join(extract_dir, member.name)
    extract_abs = os.path.abspath(extract_dir)
    if os.path.commonpath([os.path.abspath(target_path), extract_abs]) != extract_abs:
        _LOGGER.warning(f"Skipping potentially unsafe path: {member.name}")
        return 0
os.makedirs(os.path.dirname(target_path), exist_ok=True)
source = tf.extractfile(member)
if source is None:
# Should not happen if member.isreg() is true
_LOGGER.warning(f"Could not extract file object for member: {member.name}")
return 0
extracted_file_size = 0
try:
with open(target_path, "wb") as target:
while chunk := source.read(chunk_size):
target.write(chunk)
extracted_file_size += len(chunk)
# Check size limits during extraction
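                # Defence in depth: re-check the running total as bytes are written,
                # rather than trusting the sizes recorded in the tar headers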
if (total_extracted + extracted_file_size) > max_size:
# Clean up the partial file before raising
target.close()
os.unlink(target_path)
raise SBOMGenerationError(
f"Extraction exceeded maximum size limit of {max_size} bytes",
{"max_size": max_size, "current_size": total_extracted},
)
finally:
source.close()
return extracted_file_size


async def _generate_cyclonedx_core(artifact_path: str, output_path: str) -> dict[str, Any]:
"""Core logic to generate CycloneDX SBOM, raising SBOMGenerationError on failure."""
_LOGGER.info(f"Generating CycloneDX SBOM for {artifact_path} -> {output_path}")
async with util.async_temporary_directory(prefix="cyclonedx_sbom_") as temp_dir:
_LOGGER.info(f"Created temporary directory: {temp_dir}")
# Find and validate the root directory
try:
root_dir = await asyncio.to_thread(targz.root_directory, artifact_path)
except ValueError as e:
raise SBOMGenerationError(f"Archive root directory issue: {e}", {"artifact_path": artifact_path}) from e
except Exception as e:
raise SBOMGenerationError(
f"Failed to determine archive root directory: {e}", {"artifact_path": artifact_path}
) from e
extract_dir = os.path.join(temp_dir, root_dir)
# Extract the archive to the temporary directory
# TODO: Ideally we'd have task dependencies or archive caching
_LOGGER.info(f"Extracting {artifact_path} to {temp_dir}")
extracted_size = await asyncio.to_thread(
archive_extract_safe,
artifact_path,
str(temp_dir),
max_size=_CONFIG.MAX_EXTRACT_SIZE,
chunk_size=_CONFIG.EXTRACT_CHUNK_SIZE,
)
_LOGGER.info(f"Extracted {extracted_size} bytes into {extract_dir}")
# Run syft to generate the CycloneDX SBOM
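        # syft writes the CycloneDX JSON document to stdout, which is captured below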
syft_command = ["syft", extract_dir, "-o", "cyclonedx-json"]
_LOGGER.info(f"Running syft: {' '.join(syft_command)}")
try:
process = await asyncio.create_subprocess_exec(
*syft_command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300)
stdout_str = stdout.decode("utf-8").strip() if stdout else ""
stderr_str = stderr.decode("utf-8").strip() if stderr else ""
if process.returncode != 0:
_LOGGER.error(f"syft command failed with code {process.returncode}")
_LOGGER.error(f"syft stderr: {stderr_str}")
_LOGGER.error(f"syft stdout: {stdout_str[:1000]}...")
raise SBOMGenerationError(
f"syft command failed with code {process.returncode}",
{"returncode": process.returncode, "stderr": stderr_str, "stdout": stdout_str[:1000]},
)
# Parse the JSON output from syft
try:
sbom_data = json.loads(stdout_str)
_LOGGER.info(f"Successfully parsed syft output for {artifact_path}")
# Write the SBOM data to the specified output path
try:
async with aiofiles.open(output_path, "w", encoding="utf-8") as f:
await f.write(json.dumps(sbom_data, indent=2))
_LOGGER.info(f"Successfully wrote SBOM to {output_path}")
except Exception as write_err:
_LOGGER.exception(f"Failed to write SBOM JSON to {output_path}: {write_err}")
raise SBOMGenerationError(f"Failed to write SBOM to {output_path}: {write_err}") from write_err
return {
"message": "Successfully generated and saved CycloneDX SBOM",
"sbom": sbom_data,
"format": "CycloneDX",
"components": len(sbom_data.get("components", [])),
}
except json.JSONDecodeError as e:
_LOGGER.error(f"Failed to parse syft output as JSON: {e}")
raise SBOMGenerationError(
f"Failed to parse syft output: {e}",
{"error": str(e), "syft_output": stdout_str[:1000]},
) from e
        except TimeoutError as e:
            _LOGGER.error("syft command timed out after 5 minutes")
            raise SBOMGenerationError("syft command timed out after 5 minutes") from e
        except FileNotFoundError as e:
            _LOGGER.error("syft command not found. Is it installed and in PATH?")
            raise SBOMGenerationError("syft command not found") from e