samcli/commands/sync/sync_context.py (152 lines of code) (raw):
"""
Context object used by sync command
"""
import logging
import threading
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional, cast
import tomlkit
from tomlkit.items import Item
from tomlkit.toml_document import TOMLDocument
from samcli.lib.build.build_graph import DEFAULT_DEPENDENCIES_DIR
from samcli.lib.utils.osutils import rmtree_if_exists
LOG = logging.getLogger(__name__)
DEFAULT_SYNC_STATE_FILE_NAME = "sync.toml"
SYNC_STATE = "sync_state"
RESOURCE_SYNC_STATES = "resource_sync_states"
HASH = "hash"
SYNC_TIME = "sync_time"
DEPENDENCY_LAYER = "dependency_layer"
LATEST_INFRA_SYNC_TIME = "latest_infra_sync_time"
# global lock for writing to file
_lock = threading.Lock()
@dataclass
class ResourceSyncState:
hash_value: str
sync_time: datetime
@dataclass
class SyncState:
dependency_layer: bool
resource_sync_states: Dict[str, ResourceSyncState]
latest_infra_sync_time: Optional[datetime]
def update_resource_sync_state(self, resource_id: str, hash_value: str) -> None:
"""
Updates the sync_state information for the provided resource_id
to be stored in the TOML file.
Parameters
-------
resource_id: str
The resource identifier of the resource
hash_value: str
The logical ID identifier of the resource
"""
self.resource_sync_states[resource_id] = ResourceSyncState(hash_value, datetime.utcnow())
def update_infra_sync_time(self) -> None:
"""
Updates the last infra sync time to be stored in the TOML file.
"""
self.latest_infra_sync_time = datetime.utcnow()
def _sync_state_to_toml_document(sync_state: SyncState) -> TOMLDocument:
"""
Writes the sync state information to the TOML file.
Parameters
-------
sync_state: SyncState
The SyncState to cache the information in the TOML file
Returns
-------
TOMLDocument
Object which will be dumped to the TOML file
"""
sync_state_toml_table = tomlkit.table()
sync_state_toml_table[DEPENDENCY_LAYER] = sync_state.dependency_layer
if sync_state.latest_infra_sync_time:
sync_state_toml_table[LATEST_INFRA_SYNC_TIME] = sync_state.latest_infra_sync_time.isoformat()
resource_sync_states_toml_table = tomlkit.table()
for resource_id in sync_state.resource_sync_states:
resource_sync_state = sync_state.resource_sync_states[resource_id]
resource_sync_state_toml_table = tomlkit.table()
resource_sync_state_toml_table[HASH] = resource_sync_state.hash_value
resource_sync_state_toml_table[SYNC_TIME] = resource_sync_state.sync_time.isoformat()
# For Nested stack resources, replace "/" with "-"
resource_id_toml = resource_id.replace("/", "-")
resource_sync_states_toml_table[resource_id_toml] = resource_sync_state_toml_table
toml_document = tomlkit.document()
toml_document.add((tomlkit.comment("This file is auto generated by SAM CLI sync command")))
toml_document.add(SYNC_STATE, cast(Item, sync_state_toml_table))
toml_document.add(RESOURCE_SYNC_STATES, cast(Item, resource_sync_states_toml_table))
return toml_document
def _toml_document_to_sync_state(toml_document: Dict) -> Optional[SyncState]:
"""
Reads the cached information from the provided toml_document.
Parameters
-------
toml_document: SyncState
The toml document to read the information from
"""
if not toml_document:
return None
sync_state_toml_table = toml_document.get(SYNC_STATE)
resource_sync_states_toml_table = toml_document.get(RESOURCE_SYNC_STATES, {})
# If no info in toml file
if not (sync_state_toml_table or resource_sync_states_toml_table):
return None
resource_sync_states = dict()
if resource_sync_states_toml_table:
for resource_id in resource_sync_states_toml_table:
resource_sync_state_toml_table = resource_sync_states_toml_table.get(resource_id)
resource_sync_state = ResourceSyncState(
resource_sync_state_toml_table.get(HASH),
datetime.fromisoformat(resource_sync_state_toml_table.get(SYNC_TIME)),
)
# For Nested stack resources, replace "-" with "/"
resource_sync_state_resource_id = resource_id.replace("-", "/")
resource_sync_states[resource_sync_state_resource_id] = resource_sync_state
dependency_layer = False
latest_infra_sync_time = None
if sync_state_toml_table:
dependency_layer = sync_state_toml_table.get(DEPENDENCY_LAYER)
latest_infra_sync_time = sync_state_toml_table.get(LATEST_INFRA_SYNC_TIME)
if latest_infra_sync_time:
latest_infra_sync_time = datetime.fromisoformat(str(latest_infra_sync_time))
sync_state = SyncState(dependency_layer, resource_sync_states, latest_infra_sync_time)
return sync_state
class SyncContext:
_current_state: SyncState
_previous_state: Optional[SyncState]
_build_dir: Path
_cache_dir: Path
_file_path: Path
skip_deploy_sync: bool
def __init__(self, dependency_layer: bool, build_dir: str, cache_dir: str, skip_deploy_sync: bool):
self._current_state = SyncState(dependency_layer, dict(), None)
self._previous_state = None
self.skip_deploy_sync = skip_deploy_sync
self._build_dir = Path(build_dir)
self._cache_dir = Path(cache_dir)
self._file_path = Path(build_dir).parent.joinpath(DEFAULT_SYNC_STATE_FILE_NAME)
def __enter__(self) -> "SyncContext":
with _lock:
self._read()
LOG.debug(
"Entering sync context, previous state: %s, current state: %s", self._previous_state, self._current_state
)
# if adl parameter is changed between sam sync runs, cleanup build, cache and dependencies folders
if self._previous_state and self._previous_state.dependency_layer != self._current_state.dependency_layer:
self._cleanup_build_folders()
return self
def __exit__(self, *args) -> None:
with _lock:
self._write()
def update_infra_sync_time(self) -> None:
"""
Updates the last infra sync time and stores it in the TOML file.
"""
with _lock:
LOG.debug("Updating latest_infra_sync_time in sync state")
self._current_state.update_infra_sync_time()
self._write()
def get_latest_infra_sync_time(self) -> Optional[datetime]:
"""
Returns the time last infra sync happened.
Returns
-------
Optional[datetime]
The last infra sync time if it exists
"""
with _lock:
infra_sync_time = self._current_state.latest_infra_sync_time
if not infra_sync_time:
LOG.debug("No record of previous infrastructure sync time found from sync.toml file")
return None
LOG.debug("Latest infra sync happened at %s ", infra_sync_time)
return infra_sync_time
def update_resource_sync_state(self, resource_id: str, hash_value: str) -> None:
"""
Updates the sync_state information for the provided resource_id
to be stored in the TOML file.
Parameters
-------
resource_id: str
The resource identifier of the resource
hash_value: str
The logical ID identifier of the resource
"""
with _lock:
LOG.debug("Updating resource_sync_state for resource %s with hash %s", resource_id, hash_value)
self._current_state.update_resource_sync_state(resource_id, hash_value)
self._write()
def get_resource_latest_sync_hash(self, resource_id: str) -> Optional[str]:
"""
Returns the latest hash from resource_sync_state if this information was
cached for the provided resource_id.
Parameters
-------
resource_id: str
The resource identifier of the resource
Returns
-------
Optional[str]
The hash of the resource stored in resource_sync_state if it exists
"""
with _lock:
resource_sync_state = self._current_state.resource_sync_states.get(resource_id)
if not resource_sync_state:
LOG.debug("No record of latest hash found for resource %s found in sync.toml file", resource_id)
return None
LOG.debug(
"Latest resource_sync_state hash %s found for resource %s", resource_id, resource_sync_state.hash_value
)
return resource_sync_state.hash_value
def _write(self) -> None:
with open(self._file_path, "w+") as file:
file.write(tomlkit.dumps(_sync_state_to_toml_document(self._current_state)))
def _read(self) -> None:
try:
with open(self._file_path) as file:
toml_document = cast(Dict, tomlkit.loads(file.read()))
self._previous_state = _toml_document_to_sync_state(toml_document)
if self._previous_state:
self._current_state.resource_sync_states = self._previous_state.resource_sync_states
self._current_state.latest_infra_sync_time = self._previous_state.latest_infra_sync_time
except OSError:
LOG.debug("Missing previous sync state, will create a new file at the end of this execution")
def _cleanup_build_folders(self) -> None:
"""
Cleans up build, cache and dependencies folders for clean start of the next session
"""
LOG.debug("Cleaning up build directory %s", self._build_dir)
rmtree_if_exists(self._build_dir)
LOG.debug("Cleaning up cache directory %s", self._cache_dir)
rmtree_if_exists(self._cache_dir)
dependencies_dir = Path(DEFAULT_DEPENDENCIES_DIR)
LOG.debug("Cleaning up dependencies directory: %s", dependencies_dir)
rmtree_if_exists(dependencies_dir)