atr/revision.py (82 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import contextlib
import datetime
import logging
import pathlib
from collections.abc import AsyncGenerator
from typing import Final
import aiofiles.os
import atr.db as db
import atr.db.models as models
import atr.tasks as tasks
import atr.util as util
_LOGGER: Final = logging.getLogger(__name__)
@contextlib.asynccontextmanager
async def create_and_manage(
project_name: str, version_name: str, asf_uid: str, preview: bool = False, create_directory: bool = True
) -> AsyncGenerator[tuple[pathlib.Path, str]]:
"""Manage the creation and symlinking of a mutable release revision."""
base_dir = util.get_unfinished_dir()
base_release_dir = base_dir / project_name / version_name
new_revision_name = _new_name(asf_uid)
new_revision_dir = base_release_dir / new_revision_name
# Ensure that the base directory for the release exists
await aiofiles.os.makedirs(base_release_dir, exist_ok=True)
# Get the parent revision, if available
parent_revision_id: str | None = None
parent_revision_dir: pathlib.Path | None = None
async with db.session() as data:
release_name = models.release_name(project_name, version_name)
namespace = release_name + (" draft" if (preview is False) else " preview")
release = await data.release(name=release_name, _project=True).get()
if release is not None:
parent_revision_id = release.revision
if parent_revision_id:
parent_revision_dir = base_release_dir / parent_revision_id
try:
# Create the new revision directory
if parent_revision_dir:
_LOGGER.info(f"Creating new revision {new_revision_name} by hard linking from {parent_revision_id}")
await util.create_hard_link_clone(parent_revision_dir, new_revision_dir)
elif create_directory:
_LOGGER.info(f"Creating new empty revision directory {new_revision_name}")
await aiofiles.os.makedirs(new_revision_dir)
else:
_LOGGER.info(f"Creating new empty revision with no directory for {new_revision_name}")
# Yield control to the block within "async with"
yield new_revision_dir, new_revision_name
# If the "with" block completed without error, store the parent link
async with db.session() as data:
async with data.begin():
if parent_revision_id is not None:
_LOGGER.info(f"Storing parent link for {new_revision_name} -> {parent_revision_id}")
data.add(models.TextValue(ns=namespace, key=new_revision_name, value=parent_revision_id))
else:
_LOGGER.info(f"No parent revision for {new_revision_name}")
release = await data.release(name=release_name, _project=True).demand(
RuntimeError("Release does not exist")
)
release.revision = new_revision_name
if preview is False:
# Schedule the checks to be run
await tasks.draft_checks(project_name, version_name, new_revision_name)
except Exception:
_LOGGER.exception(f"Error during revision management for {new_revision_name}")
# Keep this in case we do clean up the new revision directory
raise
finally:
# TODO: It's hard to know whether we should clean up the new revision directory
# Generally we should probably keep it no matter what
# The only exception would be if release.revision was never set
# But if it wasn't, it doesn't matter so much
...
async def latest_info(project_name: str, version_name: str) -> tuple[str | None, str | None, datetime.datetime | None]:
"""Get the name, editor, and timestamp of the latest revision."""
revision_name: str | None = None
editor: str | None = None
timestamp: datetime.datetime | None = None
async with db.session() as data:
release = await data.release(name=models.release_name(project_name, version_name), _project=True).demand(
RuntimeError("Release does not exist")
)
revision_name = release.revision
if not revision_name:
return revision_name, editor, timestamp
parts = revision_name.split("@", 1)
if len(parts) == 2:
editor = parts[0]
dt_obj = datetime.datetime.strptime(parts[1][:-1], "%Y-%m-%dT%H.%M.%S.%f")
timestamp = dt_obj.replace(tzinfo=datetime.UTC)
return revision_name, editor, timestamp
def _new_name(asf_uid: str) -> str:
"""Generate a new revision name with timestamp truncated to milliseconds."""
now_utc = datetime.datetime.now(datetime.UTC)
time_prefix = now_utc.strftime("%Y-%m-%dT%H.%M.%S")
milliseconds = now_utc.microsecond // 1000
timestamp_str = f"{time_prefix}.{milliseconds:03d}Z"
return f"{asf_uid}@{timestamp_str}"