gce_rescue/tasks/actions.py (116 lines of code) (raw):
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" List of ordered tasks to be executed when set/reset VM rescue mode. """
from typing import List
import logging
from gce_rescue.gce import Instance
from gce_rescue.tasks.disks import (
take_snapshot,
create_rescue_disk,
restore_original_disk,
attach_disk
)
from gce_rescue.tasks.operations import (
start_instance,
stop_instance
)
from gce_rescue.tasks.metadata import (
set_metadata,
restore_metadata_items
)
from gce_rescue.utils import Tracker
from gce_rescue.config import get_config
_logger = logging.getLogger(__name__)
def _list_tasks(vm: Instance, action: str) -> List:
""" List tasks, by order, per operation
operations (str):
1. set_rescue_mode
2. reset_rescue_mode
"""
all_tasks = {
'set_rescue_mode': [
{
'name': stop_instance,
'args': [{
'vm': vm
}]
},
{
'name': create_rescue_disk,
'args': [{
'vm': vm
}]
},
{
'name': set_metadata,
'args': [{
'vm': vm
}]
},
{
'name': start_instance,
'args': [{
'vm': vm
}]
},
{
'name': attach_disk,
'args': [{
'vm': vm,
'boot': False,
**vm.disks
}],
},
{
'name': restore_metadata_items,
'args': [{
'vm': vm
}],
}
],
'reset_rescue_mode': [
{
'name': stop_instance,
'args': [{
'vm': vm
}]
},
{
'name': restore_original_disk,
'args': [{
'vm': vm
}]
},
{
'name': restore_metadata_items,
'args': [{
'vm': vm,
'remove_rescue_mode': True
}]
},
{
'name': start_instance,
'args': [{
'vm': vm
}]
},
]
}
if action not in all_tasks:
_logger.info(f'Unable to find "{action}".')
raise ValueError()
return all_tasks[action]
def call_tasks(vm: Instance, action: str) -> None:
""" Loop tasks dict and execute """
tasks = _list_tasks(vm = vm, action = action)
async_backup_thread = None
if action == 'set_rescue_mode':
if get_config('skip-snapshot'):
_logger.info(f'Skipping snapshot backup.')
else:
take_snapshot(vm)
async_backup_thread = True
total_tasks = len(tasks)
tracker = Tracker(total_tasks)
tracker.start()
for task in tasks:
execute = task['name']
args = task['args'][0]
execute(**args)
tracker.advance(step = 1)
if async_backup_thread:
_logger.info(f'Waiting for async backup to finish')
take_snapshot(vm, join_snapshot=True)
_logger.info('done.')
tracker.finish()