tools/iam-permissions-copier/resources/base.py (138 lines of code) (raw):
# Copyright 2022 Google LLC
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import os
import click
import uuid
class Resource(object):
    """Base class that adds one IAM role binding to a single resource.

    An instance holds a resource identifier, a role, and the member that
    should receive that role. ``migrate`` snapshots the current policy and
    writes a policy with the extra binding; ``rollback`` writes the snapshot
    back.

    This class does not define ``_client`` or ``delete_test_instance``; both
    are called below, so they are presumably supplied by subclasses.
    """

    # Test fixtures, read from the environment once when the class is created.
    # os.environ.get returns None for an unset variable, so TEST_ORG becomes
    # "organizations/None" and TEST_USER becomes "user:none" in that case.
    TEST_ORG = "organizations/{org}".format(org=os.environ.get("TEST_ORG_ID"))
    TEST_USER = "user:{email}".format(
        email=os.environ.get("TEST_EMAIL")
    ).lower()
    TEST_PROJECT_ID = os.environ.get("TEST_PROJECT_ID")
    TEST_BILLING_ACT_ID = os.environ.get("TEST_BILLING_ACT_ID")

    # Regular expression matched against the resource id. The default
    # _build_resource_path unpacks exactly one capture group from it.
    RESOURCE_ID_PATTERN = ""
    # Permissions that verify_permissions compares with the set returned by
    # _get_policy_permissions.
    REQUIRED_PERMISSIONS = []
    # Not read anywhere in this class.
    ASSET_TYPE = ""

    # Horizontal rule printed around log sections.
    _SEPARATOR = "-" * 20

    def __init__(self, resource_id, role, new_member, dry_run=False):
        """Store the binding to apply and initialise the policy snapshots.

        Args:
            resource_id: Identifier parsed with ``RESOURCE_ID_PATTERN``.
            role: Role to bind.
            new_member: Member that receives ``role``.
            dry_run: The policy is only written when this is exactly
                ``False`` (see ``__update_policy_for_resource``).
        """
        self._role = role
        self._dry_run = dry_run
        self._new_member = new_member
        self._resource_id = resource_id
        # Policy as it was before migrate(); written back by rollback().
        self._prev_policy_snapshot = {}
        # Return value of the last policy write; None until one happens.
        self._updated_policy_snapshot = None

    def migrate(self):
        """Add the role binding for the new member to this resource."""
        self.__update_policy_for_resource(self._build_resource_path())

    def rollback(self):
        """Write the policy snapshot taken by ``migrate`` back to the resource.

        The etag of the policy currently on the resource is copied onto the
        snapshot before it is written.
        """
        # NOTE(review): if migrate() never ran, the snapshot is still the
        # initial {} and the write below sends an otherwise empty policy.
        # Confirm that callers always migrate first.
        resource_path = self._build_resource_path()
        updated_policy = self._get_current_policy(resource_path)

        # Mapping-style policy: the etag is a key.
        if isinstance(updated_policy, dict) and "etag" in updated_policy:
            self._prev_policy_snapshot["etag"] = updated_policy["etag"]

        # Object-style policy: the etag is an attribute. The snapshot stores
        # it either in a ``_properties`` mapping or as a plain attribute.
        if hasattr(updated_policy, "etag"):
            if hasattr(self._prev_policy_snapshot, "_properties"):
                self._prev_policy_snapshot._properties[
                    "etag"
                ] = updated_policy.etag
            elif hasattr(self._prev_policy_snapshot, "etag"):
                self._prev_policy_snapshot.etag = updated_policy.etag

        self._process_updated_iam_policy(
            resource_path, self._prev_policy_snapshot
        )
        click.secho(
            "ROLLED BACK BINDING ON {resource}".format(
                resource=self._resource_id
            ),
            bg="black",
            fg="red",
        )

    def _get_policy_permissions(self):
        """Return the permissions held on the resource.

        The base implementation returns an empty list.
        """
        return []

    def verify_permissions(self):
        """Exit unless the held permissions equal ``REQUIRED_PERMISSIONS``.

        Raises:
            SystemExit: If the two permission sets differ, or if looking them
                up raises an ``Exception``. In the latter case the original
                error is attached as ``__cause__``.
        """
        failure = SystemExit(
            "ERROR: Permissions not enough to modify {resource}".format(
                resource=self._resource_id
            )
        )
        try:
            granted = self._get_policy_permissions()
            verified = set(self.REQUIRED_PERMISSIONS) == set(granted)
        except Exception as err:
            # A failed lookup is still reported as missing permissions, but
            # the cause is chained instead of discarded. KeyboardInterrupt
            # and SystemExit are not caught here and propagate unchanged.
            raise failure from err

        if not verified:
            raise failure

        click.secho(
            "Permissions verified for {resource}".format(
                resource=self._resource_id
            ),
        )

    @staticmethod
    def get_test_instance_name():
        """Return a random name: ``int-test-`` plus hex, cut to 20 chars."""
        return "int-test-{val}".format(val=uuid.uuid4().hex)[:20]

    def rollback_test_instance(self):
        """Delete the test instance and log the deletion."""
        self.delete_test_instance()
        click.secho(
            "DELETED {resource}".format(resource=self._resource_id),
            bg="black",
            fg="red",
        )
        click.secho(self._SEPARATOR)

    @classmethod
    def get_test_instance(cls, resource, role):
        """Log the creation of ``resource`` and wrap it for ``TEST_USER``.

        Returns:
            An instance of ``cls`` for ``resource`` and ``role`` whose new
            member is ``cls.TEST_USER``.
        """
        click.secho(
            "CREATED NEW {resource}".format(resource=resource),
            bg="black",
            fg="green",
        )
        return cls(
            resource,
            role,
            cls.TEST_USER,
        )

    def _parsed_resource_id(self):
        """Match the resource id against ``RESOURCE_ID_PATTERN``.

        Returns:
            The tuple of captured groups.

        Raises:
            RuntimeError: If the pattern does not match the resource id.
        """
        match = re.match(self.RESOURCE_ID_PATTERN, self._resource_id)
        if match is None:
            raise RuntimeError(
                "Unable to parse resource name {name}".format(
                    name=self._resource_id
                )
            )
        return match.groups()

    def _build_resource_path(self):
        """Return the single group captured from the resource id.

        The one-element unpacking raises ``ValueError`` when the pattern does
        not capture exactly one group.
        """
        (resource_path,) = self._parsed_resource_id()
        return resource_path

    def _get_current_policy(self, resource_path=None):
        """Fetch the IAM policy for ``resource_path`` through the client."""
        return self._client().get_iam_policy(
            request={"resource": resource_path}
        )

    def _get_updated_policy(self, resource_path=None):
        """Fetch the current policy and add the new binding to it.

        Returns:
            The fetched policy with a binding of ``self._role`` to
            ``self._new_member`` added.
        """
        policy = self._get_current_policy(resource_path)
        policy.bindings.add(
            role=self._role,
            members=[self._new_member],
        )
        return policy

    def _get_role_bindings(self):
        """Return the bindings of the policy returned by the last write.

        The policy is read by key when it is a dict and by attribute
        otherwise.
        """
        policy = self._updated_policy_snapshot
        if isinstance(policy, dict):
            return policy["bindings"]
        return policy.bindings

    def _process_updated_iam_policy(self, resource_path, new_policy):
        """Write ``new_policy`` to ``resource_path`` and return the result."""
        request = {"resource": resource_path, "policy": new_policy}
        return self._client().set_iam_policy(request=request)

    def __log_pre_update(self):
        """Print the resource, member and role about to be applied."""
        click.secho(self._SEPARATOR)
        click.secho("UPDATING {resource}".format(resource=self._resource_id))
        click.secho(
            "NEW_USER => {user}".format(user=self._new_member),
            bg="black",
            fg="green",
        )
        click.secho(
            "ROLE => {role}".format(role=self._role),
            bg="black",
            fg="green",
        )
        click.secho(self._SEPARATOR)

    def __update_policy_for_resource(self, resource):
        """Snapshot the current policy, then write the updated one.

        Args:
            resource: Resource path passed to the policy read and write calls.
        """
        self.__log_pre_update()
        self._prev_policy_snapshot = self._get_current_policy(resource)
        updated_policy = self._get_updated_policy(resource)
        # Deliberate identity check: only an explicit False triggers the
        # write. Any other value, including None or 0, is treated as a dry
        # run.
        # TODO: decide whether this should become `if not self._dry_run`.
        if self._dry_run is False:
            self._updated_policy_snapshot = self._process_updated_iam_policy(
                resource, updated_policy
            )