processors/computeengine.py (457 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .base import Processor, NotConfiguredException
import google.auth
from googleapiclient import discovery
import time
import re
class ComputeengineOperationFailed(Exception):
    """Raised when a Compute Engine API call or operation does not succeed."""
class ComputeengineProcessor(Processor):
    """
    Perform actions on Compute Engine instances, disks and snapshots.

    Args:
        project (str, optional): Google Cloud project ID. Defaults to the
            project of the default credentials.
        instance (str): Instance to operate on (required for instances.* modes).
        disk (str, optional): Disk to operate on (required for attach/detach
            and for the snapshot creation modes).
        deviceName (str, optional): Device name to operate on.
        snapshotName (str, optional): Snapshot name for the create modes, or a
            regular expression (matched from the start of the name) for the
            purge modes.
        maxSnapshots (int, optional): Number of newest matching snapshots to
            keep in disks.snapshots.purge and disks.instantSnapshots.purge.
        storageLocations (list, optional): Storage locations for
            disks.snapshots.create.
        snapshotParameters (dict, optional): Additional snapshot configuration
            merged into the snapshot request body.
        attachParameters (dict, optional): Additional parameters merged into
            the attachDisk request body.
        labels (dict, optional): Labels for snapshots and instant snapshots.
        region (str, optional): Google Cloud region.
        zone (str, optional): Google Cloud zone for the instance.
        timeout (int, optional): Seconds to wait for operations. Defaults to 30.
        waitForStopped (bool, optional): In instances.detachDisk, wait for the
            instance to reach TERMINATED first. Defaults to true.
        forceAttach (bool, optional): Force attach in instances.attachDisk.
            Defaults to false.
        mode (str): One of (case-insensitive): instances.get, instances.stop,
            instances.reset, instances.start, instances.detachDisk,
            instances.attachDisk, disks.snapshots.create,
            disks.snapshots.purge, disks.instantSnapshots.create,
            disks.instantSnapshots.purge
    """

    # Prefix stripped from fully qualified resource URLs so that disks can be
    # compared by their relative resource path.
    _COMPUTE_API_PREFIX = 'https://www.googleapis.com/compute/v1/'

    @staticmethod
    def get_default_config_key():
        """Return the default configuration key, 'computeengine'."""
        return 'computeengine'

    def wait_for_operation_done(self,
                                compute_service,
                                operation_name,
                                operation_self_link,
                                project,
                                zone,
                                region,
                                timeout=30,
                                poll_interval=5):
        """Poll a long running operation until it reports status DONE.

        The operations collection to query (global, zonal or regional) is
        chosen from the operation's self link.

        Args:
            compute_service: Compute API client built with discovery.build().
            operation_name: Operation identifier passed as ``operation``.
            operation_self_link (str): Self link of the operation.
            project (str): Project owning the operation.
            zone (str): Zone used for zonal operations.
            region (str): Region used for regional operations.
            timeout (int): Maximum number of seconds to keep polling.
            poll_interval (int): Seconds to sleep between polls.

        Returns:
            The operation resource if it completed without error, the
            operation's ``error`` value if it completed with an error, or
            None if the timeout elapsed before the operation was done.
        """

        def poll():
            if '/global/' in operation_self_link:
                request = compute_service.globalOperations().get(
                    project=project, operation=operation_name)
            elif '/zones/' in operation_self_link:
                request = compute_service.zoneOperations().get(
                    project=project, zone=zone, operation=operation_name)
            else:
                request = compute_service.regionOperations().get(
                    project=project, region=region, operation=operation_name)
            return request.execute()

        start_time = time.monotonic()
        while (time.monotonic() - start_time) < timeout:
            operation = poll()
            if operation.get('status') == 'DONE':
                if 'error' in operation:
                    self.logger.error(
                        'Error while waiting for long running operation %s to complete.'
                        % (operation_name),
                        extra={'error': operation['error']})
                    return operation['error']
                return operation
            time.sleep(poll_interval)

        # Callers treat a timeout as non-fatal, so only make it visible.
        self.logger.warning(
            'Timed out after %s seconds waiting for long running operation %s.'
            % (timeout, operation_name))
        return None

    def get_instance(self, compute_service, project, zone, instance):
        """Fetch an instance resource.

        Returns:
            dict: The instance resource returned by the API.

        Raises:
            ComputeengineOperationFailed: If the response has no ``id`` field.
        """
        get_response = compute_service.instances().get(
            project=project, zone=zone, instance=instance).execute()
        if 'id' not in get_response:
            raise ComputeengineOperationFailed('Failed to get instance: %s' %
                                               str(get_response))
        return get_response

    def _expand_optional_string(self, key):
        """Return the Jinja-expanded config value for key, or None if unset."""
        if key not in self.config:
            return None
        return self._jinja_expand_string(self.config[key])

    def _finish_instance_operation(self, compute_service, response,
                                   failure_message, project, zone, region,
                                   instance, timeout):
        """Wait for an instance operation and return the refreshed instance.

        Raises:
            ComputeengineOperationFailed: If the API response carries no
                operation ``id``; the message is ``failure_message`` followed
                by the response.
        """
        if 'id' not in response:
            raise ComputeengineOperationFailed(
                '%s: %s' % (failure_message, str(response)))
        self.wait_for_operation_done(compute_service, response['id'],
                                     response['selfLink'], project, zone,
                                     region, timeout)
        return self.get_instance(compute_service, project, zone, instance)

    @classmethod
    def _find_attached_disk(cls, inst, device_name, disk):
        """Return the attached-disk entry matching device name or disk path.

        Returns None when the instance has no matching disk. Entries lacking
        a ``source`` or ``deviceName`` field are tolerated.
        """
        for attached_disk in inst.get('disks', []):
            if (device_name is not None and
                    attached_disk.get('deviceName') == device_name):
                return attached_disk
            source = attached_disk.get('source', '').replace(
                cls._COMPUTE_API_PREFIX, '')
            if disk is not None and source == disk:
                return attached_disk
        return None

    def _snapshot_request_body(self, base_body):
        """Merge snapshotParameters and labels from config into base_body."""
        request_body = dict(base_body)
        if 'snapshotParameters' in self.config:
            request_body.update(
                self._jinja_expand_dict_all(self.config['snapshotParameters'],
                                            'snapshot_parameters'))
        if 'labels' in self.config:
            request_body['labels'] = self._jinja_expand_dict_all(
                self.config['labels'], 'labels')
        return request_body

    def _expand_max_snapshots(self):
        """Return the configured maxSnapshots value.

        Raises:
            NotConfiguredException: If maxSnapshots is not configured.
        """
        if 'maxSnapshots' not in self.config:
            raise NotConfiguredException('No maxSnapshots in configuration!')
        return self._jinja_expand_int(self.config['maxSnapshots'],
                                      'max_snapshots')

    def _purge_snapshots(self, list_page, build_delete_request,
                         operations_service, name_pattern, max_snapshots,
                         project, zone, region, timeout):
        """Delete all but the newest max_snapshots matching snapshots.

        Args:
            list_page: Callable taking a page token and returning a list
                request.
            build_delete_request: Callable taking a snapshot resource and
                returning a delete request.
            operations_service: Compute client used to poll delete operations.
            name_pattern (str): Regular expression matched against the start
                of each snapshot name.
            max_snapshots (int): Number of newest matches to keep.

        Returns:
            int: Number of snapshots that matched the pattern, counted before
            any deletion.
        """
        pattern = re.compile(name_pattern)
        matching_snapshots = []
        page_token = None
        while True:
            response = list_page(page_token).execute()
            matching_snapshots.extend(
                snapshot for snapshot in response.get('items', [])
                if pattern.match(snapshot['name']))
            page_token = response.get('nextPageToken')
            if not page_token:
                break

        if len(matching_snapshots) > max_snapshots:
            # Newest first, so everything past max_snapshots is the oldest.
            matching_snapshots.sort(
                key=lambda item: item['creationTimestamp'], reverse=True)
            for snapshot in matching_snapshots[max_snapshots:]:
                self.logger.info('Purging snapshot %s in project %s...' %
                                 (snapshot['name'], project))
                delete_response = build_delete_request(snapshot).execute()
                if 'id' in delete_response:
                    self.wait_for_operation_done(
                        operations_service, delete_response['id'],
                        delete_response['selfLink'], project, zone, region,
                        timeout)
        return len(matching_snapshots)

    def process(self, output_var='computeengine'):
        """Run the configured Compute Engine operation.

        Args:
            output_var (str): Key under which the result is returned.

        Returns:
            dict: ``{output_var: result}``. The result is the instance
            resource for instances.* modes, the value returned by
            wait_for_operation_done() for the create modes, the number of
            matching snapshots for the purge modes, and None for an
            unrecognised mode.

        Raises:
            NotConfiguredException: If a required configuration key is missing.
            ComputeengineOperationFailed: If an API call does not return an
                operation, or the instance does not stop in time.
        """
        if 'mode' not in self.config:
            raise NotConfiguredException(
                'No Compute Engine operation specified.')
        # Modes are documented in camelCase, so match case-insensitively.
        mode = self.config['mode'].lower()
        if mode.startswith('instances.') and 'instance' not in self.config:
            raise NotConfiguredException(
                'No instance specified in configuration.')
        if mode in ('instances.attachdisk', 'instances.detachdisk'):
            if 'disk' not in self.config:
                raise NotConfiguredException(
                    'No disk specified in configuration.')

        credentials, credentials_project_id = google.auth.default()
        if 'project' in self.config:
            project = self._jinja_expand_string(self.config['project'],
                                                'project')
        else:
            project = credentials_project_id
        if not project:
            project = getattr(credentials, 'quota_project_id', None)

        compute_service = discovery.build(
            'compute', 'v1', http=self._get_branded_http(credentials))

        timeout = self._jinja_expand_int(
            self.config['timeout']) if 'timeout' in self.config else 30
        zone = self._expand_optional_string('zone')
        region = self._expand_optional_string('region')
        instance = self._expand_optional_string('instance')
        disk = self._expand_optional_string('disk')
        device_name = self._expand_optional_string('deviceName')
        if disk is not None:
            disk = disk.replace(self._COMPUTE_API_PREFIX, '')

        def current_instance():
            return self.get_instance(compute_service, project, zone, instance)

        def finish(response, failure_message):
            return {
                output_var:
                    self._finish_instance_operation(compute_service, response,
                                                    failure_message, project,
                                                    zone, region, instance,
                                                    timeout)
            }

        if mode == 'instances.get':
            return {output_var: current_instance()}

        if mode == 'instances.stop':
            inst = current_instance()
            if inst['status'] != 'RUNNING':
                return {output_var: inst}
            self.logger.info('Stopping instance %s in zone %s' %
                             (instance, zone))
            stop_response = compute_service.instances().stop(
                project=project, zone=zone, instance=instance).execute()
            return finish(stop_response, 'Failed to stop instance')

        if mode == 'instances.reset':
            # Fetched only to fail early if the instance cannot be retrieved.
            current_instance()
            self.logger.info('Resetting instance %s in zone %s' %
                             (instance, zone))
            reset_response = compute_service.instances().reset(
                project=project, zone=zone, instance=instance).execute()
            return finish(reset_response, 'Failed to reset instance')

        if mode == 'instances.start':
            inst = current_instance()
            if inst['status'] == 'RUNNING':
                return {output_var: inst}
            self.logger.info('Starting instance %s in zone %s' %
                             (instance, zone))
            start_response = compute_service.instances().start(
                project=project, zone=zone, instance=instance).execute()
            return finish(start_response, 'Failed to start instance')

        if mode == 'instances.detachdisk':
            wait_for_stopped = self._jinja_expand_bool(
                self.config['waitForStopped']
            ) if 'waitForStopped' in self.config else True

            # Wait for instance to be stopped if required. The instance is
            # re-fetched on every iteration so a status change is noticed.
            inst = current_instance()
            waited = 0
            while wait_for_stopped and inst['status'] != 'TERMINATED':
                if waited > timeout:
                    raise ComputeengineOperationFailed(
                        'Instance did not stop in time: %s' % str(inst))
                time.sleep(2)
                waited += 2
                inst = current_instance()

            # Check if disk is already detached
            attached_disk = self._find_attached_disk(inst, device_name, disk)
            if attached_disk is None:
                return {output_var: inst}
            if device_name is None:
                device_name = attached_disk['deviceName']

            self.logger.info(
                'Detaching disk %s from instance %s (zone %s)' %
                (disk if disk is not None else device_name, instance, zone))
            detach_response = compute_service.instances().detachDisk(
                project=project,
                zone=zone,
                instance=instance,
                deviceName=device_name).execute()
            return finish(
                detach_response,
                'Failed to detach disk %s from %s' % (disk, instance))

        if mode == 'instances.attachdisk':
            force_attach = self._jinja_expand_bool(
                self.config['forceAttach']
            ) if 'forceAttach' in self.config else False

            # Check if disk is already attached; if so there is nothing to
            # do and the current instance is returned, mirroring detachDisk.
            inst = current_instance()
            if self._find_attached_disk(inst, device_name,
                                        disk) is not None:
                return {output_var: inst}

            request_body = {
                'type': 'PERSISTENT',
                'mode': 'READ_WRITE',
                'source': disk,
                'deviceName': device_name,
                'boot': False,
                'forceAttach': force_attach,
            }
            if 'attachParameters' in self.config:
                request_body.update(
                    self._jinja_expand_dict_all(
                        self.config['attachParameters'], 'attach_parameters'))
            self.logger.info('Attaching disk %s to instance %s (zone %s)' %
                             (disk, instance, zone))
            attach_response = compute_service.instances().attachDisk(
                project=project,
                zone=zone,
                instance=instance,
                forceAttach=force_attach,
                body=request_body).execute()
            return finish(attach_response,
                          'Failed to attach disk %s to %s' % (disk, instance))

        if mode.startswith('disks.snapshots.'):
            if 'snapshotName' not in self.config:
                raise NotConfiguredException(
                    'No snapshotName in configuration!')
            snapshot_name = self._jinja_expand_string(
                self.config['snapshotName'], 'snapshot_name')

            if mode == 'disks.snapshots.create':
                if disk is None:
                    raise NotConfiguredException(
                        'No disk specified in configuration.')
                if 'storageLocations' not in self.config:
                    raise NotConfiguredException(
                        'No storageLocations in configuration!')
                storage_locations = self._jinja_expand_list(
                    self.config['storageLocations'], 'storage_locations')
                request_body = self._snapshot_request_body({
                    'name': snapshot_name,
                    'sourceDisk': disk,
                    'storageLocations': storage_locations,
                })

                # A configured region selects the regional disks API.
                if region is not None:
                    self.logger.info(
                        'Creating snapshot %s of disk %s in %s...' %
                        (snapshot_name, disk, region))
                    snapshot_request = compute_service.regionDisks(
                    ).createSnapshot(project=project,
                                     region=region,
                                     disk=disk,
                                     body=request_body)
                else:
                    self.logger.info(
                        'Creating snapshot %s of disk %s in %s...' %
                        (snapshot_name, disk, zone))
                    snapshot_request = compute_service.disks().createSnapshot(
                        project=project,
                        zone=zone,
                        disk=disk,
                        body=request_body)
                snapshot_response = snapshot_request.execute()
                if 'id' not in snapshot_response:
                    raise ComputeengineOperationFailed(
                        'Failed to create snapshot %s of disk %s: %s' %
                        (snapshot_name, disk, str(snapshot_response)))
                ret = self.wait_for_operation_done(
                    compute_service, snapshot_response['id'],
                    snapshot_response['selfLink'], project, zone, region,
                    timeout)
                return {output_var: ret}

            if mode == 'disks.snapshots.purge':
                max_snapshots = self._expand_max_snapshots()
                matched = self._purge_snapshots(
                    lambda page_token: compute_service.snapshots().list(
                        project=project, pageToken=page_token),
                    lambda snapshot: compute_service.snapshots().delete(
                        project=project, snapshot=snapshot['name']),
                    compute_service, snapshot_name, max_snapshots, project,
                    zone, region, timeout)
                return {output_var: matched}

        if mode.startswith('disks.instantsnapshots.'):
            if 'snapshotName' not in self.config:
                raise NotConfiguredException(
                    'No snapshotName in configuration!')
            snapshot_name = self._jinja_expand_string(
                self.config['snapshotName'], 'snapshot_name')

            # Instant snapshots are accessed through the beta API surface.
            compute_service_beta = discovery.build(
                'compute',
                'beta',
                cache_discovery=False,
                discoveryServiceUrl=
                'https://www.googleapis.com/discovery/v1/apis/compute/beta/rest',
                http=self._get_branded_http(credentials))

            if mode == 'disks.instantsnapshots.create':
                if disk is None:
                    raise NotConfiguredException(
                        'No disk specified in configuration.')
                request_body = self._snapshot_request_body({
                    'name': snapshot_name,
                    'sourceDisk': disk,
                })

                # The disk path decides between the regional and zonal API.
                if '/regions/' in disk:
                    self.logger.info(
                        'Creating instant snapshot %s of disk %s in %s...' %
                        (snapshot_name, disk, region))
                    snapshot_request = compute_service_beta.regionInstantSnapshots(
                    ).insert(project=project, region=region, body=request_body)
                else:
                    self.logger.info(
                        'Creating instant snapshot %s of disk %s in %s...' %
                        (snapshot_name, disk, zone))
                    snapshot_request = compute_service_beta.instantSnapshots(
                    ).insert(project=project, zone=zone, body=request_body)
                snapshot_response = snapshot_request.execute()
                if 'id' not in snapshot_response:
                    raise ComputeengineOperationFailed(
                        'Failed to create instant snapshot %s of disk %s: %s' %
                        (snapshot_name, disk, str(snapshot_response)))
                ret = self.wait_for_operation_done(
                    compute_service_beta, snapshot_response['id'],
                    snapshot_response['selfLink'], project, zone, region,
                    timeout)
                return {output_var: ret}

            if mode == 'disks.instantsnapshots.purge':
                max_snapshots = self._expand_max_snapshots()

                def list_instant_snapshots(page_token):
                    if region is not None:
                        return compute_service_beta.regionInstantSnapshots(
                        ).list(project=project,
                               region=region,
                               pageToken=page_token)
                    return compute_service_beta.instantSnapshots().list(
                        project=project, zone=zone, pageToken=page_token)

                def delete_instant_snapshot(snapshot):
                    if '/regions/' in snapshot['selfLink']:
                        return compute_service_beta.regionInstantSnapshots(
                        ).delete(project=project,
                                 region=region,
                                 instantSnapshot=snapshot['name'])
                    return compute_service_beta.instantSnapshots().delete(
                        project=project,
                        zone=zone,
                        instantSnapshot=snapshot['name'])

                matched = self._purge_snapshots(
                    list_instant_snapshots, delete_instant_snapshot,
                    compute_service_beta, snapshot_name, max_snapshots,
                    project, zone, region, timeout)
                return {output_var: matched}

        # Unrecognised mode: nothing was done.
        return {
            output_var: None,
        }