gce_rescue/tasks/disks.py (144 lines of code) (raw):
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Compilations of all disks tasks related. """
from typing import Dict
import logging
from threading import Thread
import googleapiclient.errors
from gce_rescue.tasks.keeper import wait_for_operation
from gce_rescue.tasks.backup import create_snapshot
from gce_rescue.utils import ThreadHandler as Handler
from googleapiclient.errors import HttpError
_logger = logging.getLogger(__name__)
snapshot_thread = None
def _create_rescue_disk(vm, source_disk: str) -> Dict:
""" Create new temporary rescue disk based on source_disk.
https://cloud.google.com/compute/docs/reference/rest/v1/disks/insert
Returns:
operation-result: Dict
"""
chk_disk_exist = {}
try:
chk_disk_exist = vm.compute.disks().get(
**vm.project_data,
disk = vm.rescue_disk).execute()
except googleapiclient.errors.HttpError as e:
if e.status_code == 404:
_logger.info(f'Creating rescue disk {vm.rescue_disk}...')
else:
raise Exception(e) from e
if 'name' in chk_disk_exist.keys():
if 'users' in chk_disk_exist.keys():
disk_users = chk_disk_exist['users']
raise Exception(
f'Disk {vm.rescue_disk} is currently in use: {disk_users}'
)
_logger.info(f'Disk {vm.rescue_disk} already exist. Skipping...')
return {}
disk_body = {
'name': vm.rescue_disk,
'sourceImage': source_disk,
'type': f'projects/{vm.project}/zones/{vm.zone}/diskTypes/pd-balanced'
}
operation = vm.compute.disks().insert(
**vm.project_data,
body = disk_body).execute()
result = wait_for_operation(vm, oper=operation)
return result
def _set_disk_label(vm, disk_name = str) -> Dict:
""" Set labels.rescue=TS to be able to idenfied the boot disk when restore
the VM to the normal configuration.
https://cloud.google.com/compute/docs/reference/rest/v1/disks/setLabels
Return:
operation-result: Dict
"""
name_filter = f'name={disk_name}'
response = list_disk(
vm,
project_data=vm.project_data,
label_filter=name_filter
)
label_fingerprint = response[0]['labelFingerprint']
request_body = {
'labels': {
'rescue': vm.ts
},
'labelFingerprint': label_fingerprint
}
operation = vm.compute.disks().setLabels(
**vm.project_data,
resource = disk_name,
body = request_body).execute()
return operation
def _delete_rescue_disk(vm, disk_name: str) -> Dict:
"""
Delete rescue disk after resetting the instance to the original configuration.
https://cloud.google.com/compute/docs/reference/rest/v1/disks/delete
Param:
disk_name: str, Name of the disk to be deleted.
Returns:
operation-result: Dict
"""
_logger.info(f'Deleting disk {disk_name}...')
operation = vm.compute.disks().delete(
**vm.project_data,
disk = disk_name).execute()
result = wait_for_operation(vm, oper=operation)
return result
def list_disk(vm, project_data: Dict, label_filter: str) -> Dict:
"""Filter existing disks with labels.rescue=TS
https://cloud.google.com/compute/docs/reference/rest/v1/disks/list
Returns:
result: Dict
"""
result = vm.compute.disks().list(
**project_data,
filter=label_filter).execute()
#TODO: Add validation and throw exception if response has more than 1 disk:
# len(response['items'])
#For test phase - remember to fix that
return result['items']
def attach_disk(
vm,
disk_name: str,
device_name: str,
boot: bool = False
) -> Dict:
"""
Attach disk on the instance. By default it will attach as secundary
https://cloud.google.com/compute/docs/reference/rest/v1/instances/attachDisk
Returns:
operation-result: Dict
"""
if not boot:
request = _set_disk_label(vm, disk_name)
if request['status'] != 'DONE':
_logger.error(f'Unable to set label to disk {disk_name}.')
raise Exception(request)
else:
_logger.info(f'Label configured successfully disk {disk_name}.')
attach_disk_body = {
'boot': boot,
'name': disk_name,
'deviceName': device_name,
'type': 'PERSISTENT',
'source': f'projects/{vm.project}/zones/{vm.zone}/disks/{disk_name}'
}
_logger.info(f'Attaching disk {disk_name}...')
operation = vm.compute.instances().attachDisk(
**vm.project_data,
instance = vm.name,
body = attach_disk_body).execute()
result = wait_for_operation(vm, oper=operation)
return result
def _detach_disk(vm, disk: str) -> Dict:
""" Detach disk from the instance.
https://cloud.google.com/compute/docs/reference/rest/v1/instances/detachDisk
"""
_logger.info(f'Detaching disk {disk} from {vm.name}...')
operation = vm.compute.instances().detachDisk(
**vm.project_data,
instance = vm.name,
deviceName = disk).execute()
result = wait_for_operation(vm, oper=operation)
return result
def take_snapshot(vm, join_snapshot=None) -> None:
global snapshot_thread
if not join_snapshot:
snapshot_thread = Thread(target=create_snapshot, args=(vm,), daemon=True)
snapshot_thread.start()
else:
snapshot_thread.join()
def create_rescue_disk(vm) -> None:
device_name = vm.disks['device_name']
# task1 = multitasks.Handler(
# target = backup,
# kwargs={'vm' : vm}
# )
# task1.start()
task2 = Handler(
target = _create_rescue_disk,
kwargs={'vm': vm, 'source_disk': vm.rescue_source_disk}
)
task2.start()
task2.join()
_detach_disk(vm, disk=device_name)
attach_disk(
vm,
disk_name=vm.rescue_disk,
device_name=vm.rescue_disk,
boot=True
)
def list_snapshot(vm) -> str:
snapshot_name = f"{vm.disks['disk_name']}-{vm.ts}"
try:
result = vm.compute.snapshots().get(
snapshot=snapshot_name,
project=vm.project
).execute()
except HttpError:
_logger.info('Snapshot was not found for VM in active rescue mode')
return ''
return snapshot_name
def restore_original_disk(vm) -> None:
""" Restore tasks to the original disk """
device_name = vm.disks['device_name']
_detach_disk(vm, disk=vm.rescue_disk)
_detach_disk(vm, disk=device_name)
_delete_rescue_disk(vm, disk_name=vm.rescue_disk)
attach_disk(vm, **vm.disks, boot=True)