in client/python/cli/command/catalog_roles.py [0:0]
def execute(self, api: PolarisDefaultApi) -> None:
if self.catalog_roles_subcommand == Subcommands.CREATE:
request = CreateCatalogRoleRequest(
catalog_role=CatalogRole(
name=self.catalog_role_name,
properties=self.properties
)
)
api.create_catalog_role(self.catalog_name, request)
elif self.catalog_roles_subcommand == Subcommands.DELETE:
api.delete_catalog_role(self.catalog_name, self.catalog_role_name)
elif self.catalog_roles_subcommand == Subcommands.GET:
print(api.get_catalog_role(self.catalog_name, self.catalog_role_name).to_json())
elif self.catalog_roles_subcommand == Subcommands.LIST:
if self.principal_role_name:
for catalog_role in api.list_catalog_roles_for_principal_role(
self.principal_role_name, self.catalog_name).roles:
print(catalog_role.to_json())
else:
for catalog_role in api.list_catalog_roles(self.catalog_name).roles:
print(catalog_role.to_json())
elif self.catalog_roles_subcommand == Subcommands.UPDATE:
catalog_role = api.get_catalog_role(self.catalog_name, self.catalog_role_name)
new_properties = catalog_role.properties or {}
# Add or update all entries specified in set_properties
if self.set_properties:
new_properties = {**new_properties, **self.set_properties}
# Remove all keys specified in remove_properties
if self.remove_properties:
for to_remove in self.remove_properties:
new_properties.pop(to_remove, None)
request = UpdateCatalogRoleRequest(
current_entity_version=catalog_role.entity_version,
properties=new_properties
)
api.update_catalog_role(self.catalog_name, self.catalog_role_name, request)
elif self.catalog_roles_subcommand == Subcommands.GRANT:
request = GrantCatalogRoleRequest(
catalog_role=CatalogRole(
name=self.catalog_role_name
),
properties=self.properties
)
api.assign_catalog_role_to_principal_role(self.principal_role_name, self.catalog_name, request)
elif self.catalog_roles_subcommand == Subcommands.REVOKE:
api.revoke_catalog_role_from_principal_role(
self.principal_role_name, self.catalog_name, self.catalog_role_name)
else:
raise Exception(f'{self.catalog_roles_subcommand} is not supported in the CLI')