atr/tasks/svn.py (109 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import asyncio
import logging
import pathlib
from typing import Any, Final
import aiofiles.os
import aioshutil
import pydantic
import atr.revision as revision
import atr.tasks.checks as checks
# Module-level logger, named after this module, shared by every task helper below
_LOGGER: Final = logging.getLogger(__name__)
class SvnImport(pydantic.BaseModel):
    """Arguments for the task to import files from SVN."""

    # Source URL handed to ``svn export``
    svn_url: str
    # Value passed to the ``-r`` option of ``svn export``
    revision: str
    # Optional path, relative to the new revision directory, that receives the
    # exported files; when empty or None the revision directory itself is used
    target_subdirectory: str | None
    # Project, version and user id forwarded to ``revision.create_and_manage``
    project_name: str
    version_name: str
    asf_uid: str
class SvnImportError(Exception):
"""Custom exception for SVN import failures."""
def __init__(self, message: str, details: dict[str, Any] | None = None) -> None:
super().__init__(message)
self.details = details or {}
@checks.with_model(SvnImport)
async def import_files(args: SvnImport) -> str | None:
    """Import files from SVN into a draft release candidate revision.

    Thin task entry point: delegates to ``_import_files_core`` and logs any
    failure before re-raising it unchanged.

    Args:
        args: The SVN import arguments (presumably validated against
            ``SvnImport`` by ``checks.with_model`` - confirm in that module).

    Returns:
        The success message produced by ``_import_files_core``.

    Raises:
        SvnImportError: Re-raised after logging when the import fails in a
            way the core logic recognised.
        Exception: Any other error, re-raised after logging with traceback.
    """
    try:
        return await _import_files_core(args)
    except SvnImportError as e:
        # Include the message itself: several failure modes (timeout, missing
        # svn binary, path validation) carry no details dict, so logging only
        # the details would produce an empty, useless log line
        _LOGGER.error(f"SVN import failed: {e} (details: {e.details})")
        raise
    except Exception:
        _LOGGER.exception("Unexpected error during SVN import task")
        raise
async def _import_files_core(args: SvnImport) -> str:
    """Export files from SVN and place them in a newly created revision.

    The export is written to a temporary directory inside the new revision
    directory and its top-level entries are then moved to the final target
    (the revision directory itself, or ``args.target_subdirectory`` below it).

    Args:
        args: The SVN import arguments.

    Returns:
        A human-readable success message naming the new revision.

    Raises:
        SvnImportError: If the target subdirectory escapes the revision
            directory, if the ``svn export`` step fails, or if any exported
            item could not be moved into place.
    """
    _LOGGER.info(f"Starting SVN import for {args.project_name}-{args.version_name}")
    # We have to use a temporary directory otherwise SVN thinks it's a pegged revision
    temp_export_dir_name = ".svn-export.tmp"

    async with revision.create_and_manage(args.project_name, args.version_name, args.asf_uid) as (
        new_revision_dir,
        new_revision_name,
    ):
        _LOGGER.debug(f"Created revision directory: {new_revision_dir}")

        final_target_path = new_revision_dir
        if args.target_subdirectory:
            final_target_path = new_revision_dir / args.target_subdirectory
            # Validate that final_target_path is a subdirectory of new_revision_dir.
            # Both sides are resolved first because is_relative_to is a purely
            # lexical comparison: without resolving, "../.." would pass the check
            if not final_target_path.resolve().is_relative_to(new_revision_dir.resolve()):
                raise SvnImportError(
                    f"Target subdirectory {args.target_subdirectory} is not a subdirectory of {new_revision_dir}"
                )
        # exist_ok makes this a no-op when the target is the revision directory itself
        await aiofiles.os.makedirs(final_target_path, exist_ok=True)

        temp_export_path = new_revision_dir / temp_export_dir_name

        svn_command = [
            "svn",
            "export",
            "--non-interactive",
            "--trust-server-cert-failures",
            "unknown-ca,cn-mismatch",
            "-r",
            args.revision,
            # End of options: the URL and path can never be parsed as flags
            "--",
            args.svn_url,
            str(temp_export_path),
        ]

        await _import_files_core_run_svn_export(svn_command, temp_export_path)

        # Move files from temp export path to final target path
        # We only have to do this to avoid the SVN pegged revision issue
        _LOGGER.info(f"Moving exported files from {temp_export_path} to {final_target_path}")
        failed_moves: dict[str, str] = {}
        for item_name in await aiofiles.os.listdir(temp_export_path):
            source_item = temp_export_path / item_name
            destination_item = final_target_path / item_name
            try:
                await aioshutil.move(str(source_item), str(destination_item))
            except FileExistsError:
                _LOGGER.warning(f"Item {destination_item} already exists, skipping move for {item_name}")
            except Exception as move_err:
                _LOGGER.error(f"Error moving {source_item} to {destination_item}: {move_err}")
                failed_moves[item_name] = str(move_err)

        # Skipped or failed items are still inside the temporary directory, so a
        # plain rmdir would die with "directory not empty"; remove the whole tree
        await aioshutil.rmtree(temp_export_path)
        _LOGGER.info(f"Removed temporary export directory: {temp_export_path}")

        if failed_moves:
            # Surface move failures explicitly instead of reporting success for
            # a revision that is missing part of the export
            raise SvnImportError(
                "Failed to move one or more exported items into the revision",
                details={"failed_moves": failed_moves},
            )

    return f"Successfully imported files from SVN into revision {new_revision_name}"
async def _import_files_core_run_svn_export(svn_command: list[str], temp_export_path: pathlib.Path) -> None:
    """Execute the svn export command and handle errors.

    Args:
        svn_command: Full argument vector, executed without a shell.
        temp_export_path: Destination of the export. Not read by this
            function (the path is already part of ``svn_command``); kept so
            the caller's signature stays unchanged.

    Raises:
        SvnImportError: If svn exits non-zero (``details`` then holds the
            return code and captured output), times out, is not installed,
            or the subprocess fails in any other way.
    """
    _LOGGER.info(f"Executing SVN command: {' '.join(svn_command)}")
    timeout_seconds = 600

    process = None
    try:
        process = await asyncio.create_subprocess_exec(
            *svn_command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout_seconds)

        stdout_str = stdout.decode("utf-8", errors="ignore").strip() if stdout else ""
        stderr_str = stderr.decode("utf-8", errors="ignore").strip() if stderr else ""

        if process.returncode != 0:
            _LOGGER.error(f"SVN export failed with code {process.returncode}")
            _LOGGER.error(f"SVN stderr: {stderr_str}")
            _LOGGER.error(f"SVN stdout: {stdout_str[:1000]}")
            raise SvnImportError(
                f"SVN export failed with code {process.returncode}",
                details={"returncode": process.returncode, "stderr": stderr_str, "stdout": stdout_str[:1000]},
            )

        _LOGGER.info("SVN export to temporary directory successful")
        if stdout_str:
            _LOGGER.debug(f"SVN stdout: {stdout_str}")
        if stderr_str:
            _LOGGER.warning(f"SVN stderr: {stderr_str}")

    except SvnImportError:
        # Already a well-formed failure (non-zero exit). Without this clause the
        # generic handler below would re-wrap it and drop the details dict
        raise
    except asyncio.TimeoutError as e:
        # asyncio.TimeoutError is only an alias of the builtin TimeoutError from
        # Python 3.11; naming it explicitly also covers 3.10
        _LOGGER.error("SVN export command timed out after %d seconds", timeout_seconds)
        raise SvnImportError("SVN export command timed out") from e
    except FileNotFoundError as e:
        _LOGGER.error("svn command not found. Is it installed and in PATH?")
        raise SvnImportError("svn command not found") from e
    except Exception as e:
        _LOGGER.exception("Unexpected error during SVN export subprocess execution")
        raise SvnImportError(f"Unexpected error during SVN export: {e}") from e
    finally:
        # wait_for only cancels communicate(); the child keeps running unless it
        # is killed and reaped here (also covers task cancellation)
        if process is not None and process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                # The process exited between the check and the kill
                pass
            await process.wait()