sample-client/cloudfunctions/gcs_create/main.py (67 lines of code) (raw):
# Copyright 2019 Google LLC. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#
# Any software provided by Google hereunder is distributed "AS IS", WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, and is not intended for production use.
import logging
import os
import re
import requests
import urllib.parse
from common_lib import utils
from common_lib.utils import EVENTS_ENDPOINT
from common_lib.utils import RETENTION_RULES_ENDPOINT
# Module-level logger shared by every function in this file.
LOGGER = logging.getLogger('sdrs_cf_gcs_create')
# The log level is read from the 'logLevel' environment variable at import
# time.
# NOTE(review): os.getenv returns None when the variable is unset, which
# setLevel is expected to reject -- confirm the deployment always sets it.
LOGGER.setLevel(os.getenv('logLevel'))
# Pattern (from the 'rpoPattern' environment variable) searched against an
# event's objectId to decide whether it is routed to _process_rpo.
# NOTE(review): re.compile needs a pattern string, so this variable must be
# set as well -- confirm.
RPO_REGEX = re.compile(os.getenv('rpoPattern'))
# Pattern (from the 'deleteMarkerPattern' environment variable) searched
# against an event's objectId to decide whether it is routed to
# _process_delete. handler checks this pattern before RPO_REGEX.
DELETE_REGEX = re.compile(os.getenv('deleteMarkerPattern'))
def handler(event, context):
    """Routes an event to the processor whose pattern matches its objectId.

    The delete-marker pattern is tried before the RPO pattern, and only the
    first matching processor is invoked. Events whose objectId matches neither
    pattern are ignored.

    Args:
        event: Mapping that carries an 'attributes' mapping, which in turn
            carries an 'objectId' entry.
        context: Unused.
    """
    attributes = event['attributes']
    target_id = attributes['objectId']
    # Order is significant: the first pattern that matches wins.
    routes = (
        (DELETE_REGEX, _process_delete),
        (RPO_REGEX, _process_rpo),
    )
    for pattern, processor in routes:
        found = pattern.search(target_id)
        if found:
            processor(found, attributes, target_id)
            break
def _process_delete(re_match, event_attributes, object_id):
    """Makes a request to create an immediate USER type retention job.

    Args:
        re_match: The regex match for the object ID, as passed by handler.
        event_attributes: The 'attributes' mapping of the triggering event.
        object_id: The 'objectId' attribute of the triggering event.
    """
    sdrs_request = utils.parse_delete_request(re_match, event_attributes,
                                              object_id)
    url = '{}/execution'.format(EVENTS_ENDPOINT)
    body = {'target': sdrs_request.data_storage_name,
            'projectId': sdrs_request.project_id,
            'type': 'USER'}
    LOGGER.debug('POST: %s', url)
    LOGGER.debug('Body: %s', body)
    response = requests.post(url, json=body, headers=utils.get_auth_header())
    LOGGER.debug('Response: %s', response.text)
    if not response.ok:
        # Report a failed request at ERROR level, the same way _process_rpo
        # reports unexpected responses, so that it is visible even when debug
        # logging is turned off.
        LOGGER.error('Unexpected response code %s returned: %s',
                     response.status_code, response.text)
def _process_rpo(re_match, event_attributes, object_id):
    """Finds out whether the retention rule needs to be created or updated.

    Looks up the DATASET retention rule for the parsed project and data
    storage name, then updates it (200 response), creates it (404 response),
    or logs an error for any other response.

    Args:
        re_match: The regex match for the object ID, as passed by handler.
        event_attributes: The 'attributes' mapping of the triggering event.
        object_id: The 'objectId' attribute of the triggering event.
    """
    sdrs_request = utils.parse_rpo_request(re_match, event_attributes,
                                           object_id)
    # Check to see if the retention rule already exists
    url = '{}?projectId={}&dataStorageName={}&type=DATASET'.format(
        RETENTION_RULES_ENDPOINT, sdrs_request.project_id,
        urllib.parse.quote_plus(sdrs_request.data_storage_name))
    LOGGER.debug('GET: %s', url)
    response = requests.get(url, headers=utils.get_auth_header())
    if response.status_code == requests.codes.ok:
        # A 200 response means the rule already exists, so update it
        rule_id = response.json().get('ruleId')
        if rule_id is None:
            # Without a rule ID the update URL would end in "/None", so
            # report the malformed response instead of sending that request.
            LOGGER.error('No ruleId found in response: %s', response.text)
            return
        _process_rpo_update(rule_id, sdrs_request.retention_period)
    elif response.status_code == 404:
        # A 404 response means the rule doesn't exist, so create it
        _process_rpo_create(sdrs_request)
    else:
        LOGGER.error('Unexpected response code %s returned: %s',
                     response.status_code, response.text)
def _process_rpo_update(rule_id, retention_period):
    """Makes a request to update an existing retention rule.

    Args:
        rule_id: Identifier of the rule, appended to the retention rules
            endpoint to form the request URL.
        retention_period: Value sent as 'retentionPeriod' in the request body.
    """
    # Build the URL for the PUT to update the retention rule
    url = '{}/{}'.format(RETENTION_RULES_ENDPOINT, rule_id)
    body = {'retentionPeriod': retention_period}
    LOGGER.debug('PUT: %s', url)
    LOGGER.debug('Body: %s', body)
    response = requests.put(url, json=body, headers=utils.get_auth_header())
    LOGGER.debug('Response: %s', response.text)
    if not response.ok:
        # Report a failed update at ERROR level so that it is visible even
        # when debug logging is turned off.
        LOGGER.error('Unexpected response code %s returned: %s',
                     response.status_code, response.text)
def _process_rpo_create(sdrs_request):
    """Makes a request to create a retention rule.

    Args:
        sdrs_request: Parsed request whose data_storage_name, project_id and
            retention_period attributes populate the request body.
    """
    body = {'dataStorageName': sdrs_request.data_storage_name,
            'projectId': sdrs_request.project_id,
            'retentionPeriod': sdrs_request.retention_period,
            'type': 'DATASET'}
    LOGGER.debug('POST: %s', RETENTION_RULES_ENDPOINT)
    LOGGER.debug('Body: %s', body)
    response = requests.post(RETENTION_RULES_ENDPOINT, json=body,
                             headers=utils.get_auth_header())
    LOGGER.debug('Response: %s', response.text)
    if not response.ok:
        # Report a failed creation at ERROR level so that it is visible even
        # when debug logging is turned off.
        LOGGER.error('Unexpected response code %s returned: %s',
                     response.status_code, response.text)