dialogflow-cx/vpc-sc-demo/backend/update_utilities.py (85 lines of code) (raw):
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities module for update_blueprint.py."""
import logging
import flask
import google.cloud.storage as storage # pylint: disable=consider-using-from-import
import requests
import status_utilities as su
from google.oauth2 import credentials
logger = logging.getLogger(__name__)
def update_service_perimeter_status_inplace( # pylint: disable=inconsistent-return-statements
api, restrict_access, service_perimeter_status
):
"""Update security perimeter status dict; this function operates inplace."""
if not restrict_access:
if "restrictedServices" not in service_perimeter_status["status"]:
return flask.Response(status=200)
if api not in service_perimeter_status["status"]["restrictedServices"]:
return flask.Response(status=200)
service_perimeter_status["status"]["restrictedServices"] = [
service
for service in service_perimeter_status["status"]["restrictedServices"]
if service != api
]
else:
if "restrictedServices" not in service_perimeter_status["status"]:
service_perimeter_status["status"]["restrictedServices"] = [api]
elif api in service_perimeter_status["status"]["restrictedServices"]:
return flask.Response(status=200)
else:
service_perimeter_status["status"]["restrictedServices"].append(api)
def get_service_perimeter_data(request):
"""Get data needed for update_security_perimeter."""
data = su.get_token_and_project(request)
if "response" in data:
return {"response": data["response"]}
access_policy_title = request.args["access_policy_title"]
response = su.get_access_policy_name(
data["token"],
access_policy_title,
data["project_id"],
)
if "response" in response:
return {"response": response["response"]}
data["access_policy_name"] = response["access_policy_name"]
data["restrict_access"] = request.get_json(silent=True)["status"]
return data
def update_security_perimeter(request, api):
"""Update security perimeter."""
data = get_service_perimeter_data(request)
if "response" in data:
return data["response"]
token = data["token"]
project_id = data["project_id"]
restrict_access = data["restrict_access"]
access_policy_name = data["access_policy_name"]
service_perimeter_status = su.get_service_perimeter_status(
token, project_id, access_policy_name
)
response = update_service_perimeter_status_inplace(
api, restrict_access, service_perimeter_status
)
if response:
return response
headers = {}
headers["x-goog-user-project"] = project_id
headers["Authorization"] = f"Bearer {token}"
response = su.get_service_perimeter_data_uri(token, project_id, access_policy_name)
if "response" in response:
return response
service_perimeter_data_uri = response["uri"]
result = requests.patch(
service_perimeter_data_uri,
headers=headers,
json=service_perimeter_status,
params={"updateMask": "status.restrictedServices"},
timeout=10,
)
if result.status_code != 200:
logger.info(
" accesscontextmanager API rejected PATCH request: %s", result.text
)
return flask.Response(status=result.status_code, response=result.text)
return flask.Response(status=200)
def get_cert(token, project_id, bucket):
"""Utility method to get cert file from bucket."""
curr_credentials = credentials.Credentials(token)
bucket_obj = storage.Client(
project=project_id, credentials=curr_credentials
).bucket(bucket)
blob = storage.blob.Blob("server.der", bucket_obj)
return blob.download_as_string()