atr/routes/upload.py (110 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import asyncio import logging import pathlib from collections.abc import Sequence import aiofiles import quart import werkzeug.datastructures as datastructures import werkzeug.wrappers.response as response import wtforms import atr.db as db import atr.revision as revision import atr.routes as routes import atr.routes.compose as compose import atr.util as util class SvnImportForm(util.QuartFormTyped): """Form for importing files from SVN into a draft.""" svn_url = wtforms.URLField( "SVN URL", validators=[ wtforms.validators.InputRequired("SVN URL is required."), wtforms.validators.URL(require_tld=False), ], description="The URL to the public SVN directory", ) revision = wtforms.StringField( "Revision", default="HEAD", validators=[], description="Specify an SVN revision number or leave as HEAD for the latest", ) target_subdirectory = wtforms.StringField( "Target subdirectory", validators=[], description="Subdirectory to place imported files, defaulting to the root (optional)", ) submit = wtforms.SubmitField("Queue SVN import task") @routes.committer("/upload/<project_name>/<version_name>", methods=["GET", "POST"]) async def selected(session: routes.CommitterSession, project_name: str, version_name: str) -> response.Response | str: """Show a page to allow the user to add files to a candidate draft.""" await session.check_access(project_name) class AddFilesForm(util.QuartFormTyped): """Form for adding files to a release candidate.""" file_name = wtforms.StringField("File name (optional)") file_data = wtforms.MultipleFileField( "Files", validators=[wtforms.validators.InputRequired("At least one file is required")] ) submit = wtforms.SubmitField("Add files") def validate_file_name(self, field: wtforms.Field) -> bool: if field.data and len(self.file_data.data) > 1: raise wtforms.validators.ValidationError("File name can only be used when uploading a single file") return True form = await AddFilesForm.create_form() if await form.validate_on_submit(): try: file_name = None if isinstance(form.file_name.data, str) and form.file_name.data: file_name = pathlib.Path(form.file_name.data) file_data = form.file_data.data number_of_files = await _upload_files(project_name, version_name, session.uid, file_name, file_data) return await session.redirect( compose.selected, success=f"{number_of_files} file{'' if number_of_files == 1 else 's'} added successfully", project_name=project_name, version_name=version_name, ) except Exception as e: logging.exception("Error adding file:") await quart.flash(f"Error adding file: {e!s}", "error") svn_form = await SvnImportForm.create_form() async with db.session() as data: release = await session.release(project_name, version_name, data=data) user_ssh_keys = await data.ssh_key(asf_uid=session.uid).all() return await quart.render_template( "upload-selected.html", asf_id=session.uid, server_domain=session.app_host, release=release, project_name=project_name, version_name=version_name, form=form, svn_form=svn_form, user_ssh_keys=user_ssh_keys, ) async def _save_file(file: datastructures.FileStorage, target_path: pathlib.Path) -> None: async with aiofiles.open(target_path, "wb") as f: while chunk := await asyncio.to_thread(file.stream.read, 8192): await f.write(chunk) async def _upload_files( project_name: str, version_name: str, asf_uid: str, file_name: pathlib.Path | None, files: Sequence[datastructures.FileStorage], ) -> int: """Process and save the uploaded files into a new draft revision.""" async with revision.create_and_manage(project_name, version_name, asf_uid) as ( new_revision_dir, _new_revision_name, ): def get_target_path(file: datastructures.FileStorage) -> pathlib.Path: # Determine the target path within the new revision directory relative_file_path: pathlib.Path if not file_name: if not file.filename: raise routes.FlashError("No filename provided") # Use the original name relative_file_path = pathlib.Path(file.filename) else: # Use the provided name, relative to its anchor # In other words, ignore the leading "/" relative_file_path = file_name.relative_to(file_name.anchor) # Construct path inside the new revision directory target_path = new_revision_dir / relative_file_path return target_path # Save each uploaded file to the new revision directory for file in files: target_path = get_target_path(file) # Ensure parent directories exist within the new revision target_path.parent.mkdir(parents=True, exist_ok=True) await _save_file(file, target_path) return len(files)