tools/dns-sync/dns_sync/audit_log.py (212 lines of code) (raw):

# Copyright 2017 Google Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import datetime import json import logging import time from google.cloud import datastore from googleapiclient import errors from dns_sync import api from dns_sync import auth from dns_sync import config class UTC(datetime.tzinfo): """UTC Time Zone.""" def utcoffset(self, _): return datetime.timedelta(0) def tzname(self, _): return 'UTC' def dst(self, _): return datetime.timedelta(0) UTC_INSTANCE = UTC() def utcnow(): """Current time in UTC. Returns: Current time in UTC. """ return datetime.datetime.now(UTC_INSTANCE) class AuditLogLoop(datastore.Entity): """Manage the audit loop. In order to verify proper functioning of the system we'll constantly create and destroy a coumpute instance the cloud-dns project and check how long it takes for the audit event to be received writing a custom metric for the results. """ KEY = api.CLIENTS.datastore.key('AuditLogState', 'audit_log_state') RESOURCE_NAME = 'dns-sync-test' ZONE = 'us-central1-a' CUSTOM_METRIC_TYPE = 'custom.googleapis.com/audit_event_delay' @classmethod def get_state_entity(cls): """Returns the AuditLoop state entity. Creates the datastore entity if it doesn't exist. Returns: instance of AuditLogLoop """ state = api.CLIENTS.datastore.get(AuditLogLoop.KEY) if state is None: state = AuditLogLoop(None) else: state = AuditLogLoop(state) return state @classmethod def is_audit_log_test_event(cls, message_payload): """True if the supplied event is for the test resource. Args: message_payload: Dictionary populated from audit log message. Returns: Boolean. """ resource_name = message_payload['resource']['name'] if resource_name == AuditLogLoop.RESOURCE_NAME: return True return False @classmethod def get_custom_metric(cls): """Query for and return the Stackdriver audit loop custom metric. Returns: Dictionary representing the custom metric. """ filter_str = 'metric.type=starts_with("{}")'.format( AuditLogLoop.CUSTOM_METRIC_TYPE) descriptors = api.CLIENTS.metrics.projects().metricDescriptors().list( name='projects/' + config.get_project_id(), filter=filter_str).execute() return descriptors.get('metricDescriptors', None) @classmethod def create_custom_metric(cls): """Create Stackdriver audit log custom metric. Returns: Operation for creating the custom metric. """ body = { 'type': AuditLogLoop.CUSTOM_METRIC_TYPE, 'labels': [{ 'key': 'call', 'valueType': 'STRING', 'description': 'the API call the delay is for. (insert/delete)' }], 'metricKind': 'GAUGE', 'valueType': 'INT64', 'unit': 'msecs', 'description': 'ms delay for receiving GCE audit log events.', 'displayName': 'dns sync gce audit log delay ' } operation = api.CLIENTS.metrics.projects().metricDescriptors().create( name='projects/' + config.get_project_id(), body=body).execute() return operation @classmethod def write_custom_metric(cls, call, value): """Writes a value to the audit loop custom metric. Args: call: The call the metric is for. A string like insert or delete. value: Number of milliseconds latency. Returns: An empty dictionary. """ now = utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ') body = { 'timeSeries': [{ 'metric': { 'type': AuditLogLoop.CUSTOM_METRIC_TYPE, 'labels': { 'call': call } }, 'points': [{ 'interval': { 'startTime': now, 'endTime': now }, 'value': { 'int64Value': value } }] }] } response = api.CLIENTS.metrics.projects().timeSeries().create( name='projects/' + config.get_project_id(), body=body).execute() return response @classmethod def get_test_resource(cls): """Return the GCE Instance the audit loop uses. Returns: Dictionary representing the GCE instance, None if not found. Raises: errors.HttpError: On a failed API call. """ try: resource = api.CLIENTS.compute.instances().get( instance=AuditLogLoop.RESOURCE_NAME, project=config.get_project_id(), zone=AuditLogLoop.ZONE).execute() except errors.HttpError as e: if e.resp.status == 404: return None else: raise return resource def __init__(self, entity): """Create the AuditLogLoop instance. Shouldn't be called, use AuditLogLoop.get_entity(). Args: entity: datastore entity holding state. """ if entity: super(AuditLogLoop, self).__init__(entity.key, list(entity.exclude_from_indexes)) self.update(entity) else: super(AuditLogLoop, self).__init__(AuditLogLoop.KEY, [ 'running', 'current_operation', 'last_call', 'last_call_time', 'last_event', 'last_event_time', 'last_call_event_received' ]) self.update({ 'running': False, 'current_operation': None, 'last_call': None, 'last_call_time': None, 'last_event': None, 'last_event_time': None, 'last_call_event_received': False }) def start_test_resource(self): """Call GCE API to start the test resource. Returns: GCE operation. """ operation = api.CLIENTS.compute.instances().start( instance=AuditLogLoop.RESOURCE_NAME, project=config.get_project_id(), zone=AuditLogLoop.ZONE).execute() self.record_call('start', operation) return operation def stop_test_resource(self): """Call GCE API to stop the test resource. Returns: GCE operation. """ operation = api.CLIENTS.compute.instances().stop( instance=AuditLogLoop.RESOURCE_NAME, project=config.get_project_id(), zone=AuditLogLoop.ZONE).execute() self.record_call('stop', operation) return operation def delete_test_resource(self): """Call GCE API to delete the test resource. Returns: GCE operation. Raises: errors.HttpError: On a failed API call. """ request = api.CLIENTS.compute.instances().delete( instance=AuditLogLoop.RESOURCE_NAME, project=config.get_project_id(), zone=AuditLogLoop.ZONE) try: operation = request.execute() except errors.HttpError as e: if e.resp.status == 404: logging.warning('test resource does not exist') return None else: raise self.record_call('delete', operation) return operation def create_test_resource_body(self): """Creates the body of a request to create the test resource. Returns: A dictionary for supplying as the body parameter of a compute.instances.insert call. """ machine_type = ('https://www.googleapis.com/compute/v1/projects/' '{}/zones/{}/machineTypes/f1-micro').format( config.get_project_id(), AuditLogLoop.ZONE) network = ('https://www.googleapis.com/compute/v1/projects/' '{}/global/networks/default').format( config.get_project_id()) image = ('https://www.googleapis.com/compute/v1/projects/' 'ubuntu-os-cloud/global/images/family/ubuntu-1604-lts') body = { 'zone': AuditLogLoop.ZONE, 'machineType': machine_type, 'name': AuditLogLoop.RESOURCE_NAME, 'networkInterfaces': [{ 'network': network }], 'disks': [{ 'type': 'PERSISTENT', 'boot': True, 'autoDelete': True, 'initializeParams': { 'sourceImage': image } }] } return body def insert_test_resource(self): """Call GCE API to create the test resource. Returns: GCE operation. Raises: errors.HttpError: On a failed API call. """ body = self.create_test_resource_body() try: request = api.CLIENTS.compute.instances().insert( project=config.get_project_id(), zone=AuditLogLoop.ZONE, body=body) operation = request.execute() except errors.HttpError as e: if e.resp.status == 409: logging.warning('test resource already exists') return None else: raise self.record_call('insert', operation) return operation def handle_delete_event(self): """Handle the delete event. The resource was successfully stopped, so wait 30 seconds, then start it again. """ if self['running']: time.sleep(30) self.insert_test_resource() def handle_create_event(self): """Handle create event. The resource was successfully created, so wait 30 seconds, then delete it again. """ if self['running']: time.sleep(30) self.delete_test_resource() def record_event(self, message_payload): """"Record the event in our metric.""" self['last_event'] = json.dumps(message_payload) now = utcnow() self['last_event_time'] = now self['last_call_event_received'] = True last_call_time = self['last_call_time'] if last_call_time is not None: audit_log_delay_delta = now - last_call_time audit_log_delay_seconds = audit_log_delay_delta.total_seconds() audit_log_delay_msecs = round(audit_log_delay_seconds * 1000) AuditLogLoop.write_custom_metric(self['last_call'], audit_log_delay_msecs) def record_call(self, call, operation): """Save the call information in the entity fields.""" self['last_call'] = call self['current_operation'] = json.dumps(operation) self['last_call_time'] = utcnow() self['last_call_event_received'] = False def put(self): """Save audit loop state in the datastore.""" api.CLIENTS.datastore.put(self) class StartAuditLogLoop(auth.AdminRequestHandler): """Web handler to start the audit loop.""" def post(self): """HTTP endpoint to start the audit loop.""" StartAuditLogLoop.start_audit_log() self.response.write('Started') @classmethod def start_audit_log(cls): """Start the audit loop.""" # Ensure we have a custom metric descriptor. metrics = AuditLogLoop.get_custom_metric() if metrics is None: AuditLogLoop.create_custom_metric() # Set AuditLogState's running property to True. state = AuditLogLoop.get_state_entity() state['running'] = True # Check if the resource exists instance = AuditLogLoop.get_test_resource() # If not then we create it. if instance is None: state.insert_test_resource() # If it does, toggle it. Stop if started. elif instance['status'] == 'RUNNING': state.stop_test_resource() # Start if stopped. elif instance['status'] == 'TERMINATED': state.start_test_resource() state.put() class StopAuditLogLoop(auth.AdminRequestHandler): """Web handler to stop the audit loop.""" def post(self): """HTTP endpoint to stop the audit loop.""" StopAuditLogLoop.stop_audit_log() self.response.write('Stopped') @classmethod def stop_audit_log(cls): """Stop the audit log loop.""" state = AuditLogLoop.get_state_entity() state['running'] = False # Check if the resource exists. instance = AuditLogLoop.get_test_resource() # If so then we delete it. if instance is not None: state.delete_test_resource() state.put() class GetAuditLogState(auth.AdminRequestHandler): """Web handler to return audit loop state.""" def get(self): """Returns json for audit loop state.""" state = AuditLogLoop.get_state_entity() self.response.content_type = 'application/json' def datetime_handler(x): if isinstance(x, datetime.datetime): return x.isoformat() raise TypeError("Unknown type") self.response.write(json.dumps(state, default=datetime_handler))