samcli/lib/schemas/schemas_api_caller.py (269 lines of code) (raw):

"""To isolate Schemas API calls""" import json import logging from json import JSONDecodeError from botocore.exceptions import ClientError, EndpointConnectionError from samcli.commands.exceptions import SchemasApiException from samcli.commands.local.cli_common.user_exceptions import ( NotAvailableInRegion, ResourceNotFound, ) from samcli.lib.schemas.schemas_constants import DEFAULT_EVENT_DETAIL_TYPE, DEFAULT_EVENT_SOURCE from samcli.lib.schemas.schemas_directory_hierarchy_builder import get_package_hierarchy, sanitize_name SCHEMAS_NOT_AVAILABLE_IN_REGION_ERROR = ( "EventBridge Schemas are not available in provided region. Please check AWS doc for Schemas supported regions." ) LOG = logging.getLogger(__name__) class SchemasApiCaller: def __init__(self, schemas_client): self._schemas_client = schemas_client def list_registries(self, next_token=None, limit=10): """ Calls schemas service to get list of schema registries. Parameters ---------- next_token: Continuation token limit: Number of items tro fetch Returns ------- List of registries available """ if limit is None: limit = 10 registries = [] try: paginator = self._schemas_client.get_paginator("list_registries") page_iterator = paginator.paginate( PaginationConfig={"StartingToken": next_token, "MaxItems": limit, "PageSize": limit} ) page = None for page in page_iterator: for registry in page["Registries"]: registries.append(registry["RegistryName"]) if not registries: raise ResourceNotFound("No Registries found. This should not be possible, please raise an issue.") next_token = page.get("NextToken", None) return {"registries": registries, "next_token": next_token} except EndpointConnectionError as ex: raise NotAvailableInRegion(SCHEMAS_NOT_AVAILABLE_IN_REGION_ERROR) from ex def list_schemas(self, registry_name, next_token=None, limit=10): """ Calls schemas service to get list of schemas for given registry. Parameters ---------- registry_name: Name of the registry next_token: Continuation token limit: Number of items to fetch Returns ------- List of Schemas available for given registry """ schemas = [] try: paginator = self._schemas_client.get_paginator("list_schemas") page_iterator = paginator.paginate( RegistryName=registry_name, PaginationConfig={"StartingToken": next_token, "MaxItems": limit, "PageSize": limit}, ) page = None for page in page_iterator: for schema in page["Schemas"]: schemas.append(schema["SchemaName"]) if not schemas: raise ResourceNotFound("No Schemas found for registry %s" % registry_name) next_token = page.get("NextToken", None) return {"schemas": schemas, "next_token": next_token} except EndpointConnectionError as ex: raise NotAvailableInRegion(SCHEMAS_NOT_AVAILABLE_IN_REGION_ERROR) from ex def list_schema_versions(self, registry_name, schema_name): """ Calls schemas service to list all schema versions. Parameters ---------- registry_name: Registry name schema_name: Schema Name Returns ------- List of Schema versions """ versions = [] next_token = None try: while True: paginator = self._schemas_client.get_paginator("list_schema_versions") page_iterator = paginator.paginate( RegistryName=registry_name, SchemaName=schema_name, PaginationConfig={"StartingToken": next_token} ) page = None for page in page_iterator: for version in page["SchemaVersions"]: versions.append(version["SchemaVersion"]) next_token = page.get("NextToken") if next_token is None: break except EndpointConnectionError as ex: raise NotAvailableInRegion(SCHEMAS_NOT_AVAILABLE_IN_REGION_ERROR) from ex versions.sort(key=int) return versions def get_latest_schema_version(self, registry_name, schema_name): """ Calls schemas service to get schema latest version. Parameters ---------- registry_name: Registry name schema_name: Schema Name Returns ------- Latest Schema version """ versions = self.list_schema_versions(registry_name, schema_name) return versions[-1] def get_schema_metadata(self, registry_name, schema_name): """ Calls schemas service to get schema metadata. Parameters ---------- registry_name: Registry Name schema_name: Schema Name Returns ------- Schema metadata """ try: describe_schema_response = self._schemas_client.describe_schema( RegistryName=registry_name, SchemaName=schema_name ) except EndpointConnectionError as ex: raise NotAvailableInRegion(SCHEMAS_NOT_AVAILABLE_IN_REGION_ERROR) from ex try: content = json.loads(describe_schema_response["Content"]) schemas = content["components"]["schemas"] # setting default values event_source = DEFAULT_EVENT_SOURCE event_source_detail_type = DEFAULT_EVENT_DETAIL_TYPE schema_root_name = sanitize_name(list(schemas.keys())[0]) schemas_package_hierarchy = get_package_hierarchy(schema_name) if schemas.get("AWSEvent") is not None: aws_event = schemas.get("AWSEvent") if aws_event.get("x-amazon-events-source") is not None: event_source = aws_event.get("x-amazon-events-source") if aws_event.get("x-amazon-events-detail-type") is not None: event_source_detail_type = aws_event.get("x-amazon-events-detail-type") possible_root_schema_name = aws_event["properties"]["detail"]["$ref"] schema_root_name = sanitize_name(possible_root_schema_name[len("#/components/schemas/") :]) return { "event_source": event_source, "event_source_detail_type": event_source_detail_type, "schema_root_name": schema_root_name, "schemas_package_hierarchy": schemas_package_hierarchy, } except JSONDecodeError as ex: raise SchemasApiException( "Parse error reading the content from Schemas response. " "This should not be possible, please raise an issue." ) from ex def download_source_code_binding(self, runtime, registry_name, schema_name, schema_version, download_location): """ Calls schemas service to download code binding for given schema in download_location. Parameters ---------- runtime: Code binding runtime e.g: Java, Python, Go registry_name: Registry Name schema_name: Schema Name schema_version: Schema version for which code binding needs to be downloaded download_location: Location at which code binding should be downloaded """ try: response = self._schemas_client.get_code_binding_source( Language=runtime, RegistryName=registry_name, SchemaName=schema_name, SchemaVersion=schema_version ) except EndpointConnectionError as ex: raise NotAvailableInRegion(SCHEMAS_NOT_AVAILABLE_IN_REGION_ERROR) from ex for data in response["Body"]: download_location.write(data) def put_code_binding(self, runtime, registry_name, schema_name, schema_version): """ Calls schemas service to generate code binding for given schema. Parameters ---------- runtime: Code binding runtime e.g: Java, Python, Go registry_name: Registry Name schema_name: Schema Name schema_version: Schema version for which code binding needs to be generated """ try: self._schemas_client.put_code_binding( Language=runtime, RegistryName=registry_name, SchemaName=schema_name, SchemaVersion=schema_version ) except EndpointConnectionError as ex: raise NotAvailableInRegion( "EventBridge Schemas are not available in provided region. " "Please check AWS doc for Schemas supported regions." ) from ex except ClientError as e: if e.response["Error"]["Code"] != "ConflictException": raise e def poll_for_code_binding_status(self, schemas_runtime, registry_name, schema_name, schema_version): """ Calls schemas service and wait for code binding to be generated. Parameters ---------- schemas_runtime: Code binding runtime e.g: Java, Python, Go registry_name: Registry Name schema_name: Schema Name schema_version: Schema version """ try: waiter = self._schemas_client.get_waiter("code_binding_exists") waiter.wait( Language=schemas_runtime, RegistryName=registry_name, SchemaName=schema_name, SchemaVersion=schema_version, ) except EndpointConnectionError as ex: raise NotAvailableInRegion(SCHEMAS_NOT_AVAILABLE_IN_REGION_ERROR) from ex def discover_schema(self, event_data: str, schema_type: str) -> str: """ Returns a schema based on an event using the DiscoverSchema API Parameters ---------- event_data: A JSON test event as a string schema_type: Type of the schema to generate ("OpenApi3" or "JSONSchemaDraft4") Returns ------- Generated schema JSON as a string """ try: LOG.debug("Discover schema from contents: '%s'.", event_data) schema = self._schemas_client.get_discovered_schema(Events=[event_data], Type=schema_type) return str(schema["Content"]) except EndpointConnectionError as ex: LOG.error("Failure calling get_discovered_schema") raise NotAvailableInRegion(SCHEMAS_NOT_AVAILABLE_IN_REGION_ERROR) from ex def create_schema(self, schema: str, registry_name: str, schema_name: str, schema_type: str): """ Creates a new schema in the specified registry Parameters ---------- schema: Contents for the schema to be created registry_name: The registry the schema will be created in schema_name: The name for the new created schema schema_type: Type of the schema to generate ("OpenApi3" or "JSONSchemaDraft4") """ try: LOG.debug("Creating schema %s on registry %s.", schema_name, registry_name) self._schemas_client.create_schema( Content=schema, RegistryName=registry_name, SchemaName=schema_name, Type=schema_type ) return True except EndpointConnectionError as ex: LOG.error("Failure calling create_schema in registry %s for schema %s", registry_name, schema_name) raise NotAvailableInRegion(SCHEMAS_NOT_AVAILABLE_IN_REGION_ERROR) from ex def update_schema(self, schema: str, registry_name: str, schema_name: str, schema_type: str): """ Updates an existing schema Parameters ---------- schema: Contents for the updated schema registry_name: The registry of the schema that will be updated schema_name: The name of the schema to be updated schema_type: Type of the schema to generate ("OpenApi3" or "JSONSchemaDraft4") """ try: LOG.debug("Updating schema %s on registry %s.", schema_name, registry_name) self._schemas_client.update_schema( Content=schema, RegistryName=registry_name, SchemaName=schema_name, Type=schema_type ) return True except ClientError as ex: error_message: str = ex.response.get("Message", "") # type: ignore if ex.response.get("Code") == "Conflict" and "No change since the previous" in error_message: # Nothing to update LOG.debug("No changes to the schema from the previous version") return True raise ex except EndpointConnectionError as ex: LOG.error("Failure calling update_schema in registry %s for schema %s", registry_name, schema_name) raise NotAvailableInRegion(SCHEMAS_NOT_AVAILABLE_IN_REGION_ERROR) from ex def get_schema(self, registry_name: str, schema_name: str) -> str: """ Gets a schema from the registry Parameters ---------- registry_name: The registry of the schema that will be updated schema_name: The name of the schema to be updated Returns ------- A schema dict """ try: LOG.debug("Describing schema %s on registry %s.", schema_name, registry_name) schema = self._schemas_client.describe_schema(RegistryName=registry_name, SchemaName=schema_name) return str(schema["Content"]) except ClientError as ex: if ex.response.get("Error", {}).get("Code") != "NotFoundException": LOG.error( "%s error calling describe_schema in registry %s for schema %s", ex.response.get("Error", {}).get("Code"), registry_name, schema_name, ) raise ex LOG.debug("Schema %s doesn't exist", schema_name) return "" except EndpointConnectionError as ex: LOG.error("Failure calling describe_schema in registry %s for schema %s", registry_name, schema_name) raise NotAvailableInRegion(SCHEMAS_NOT_AVAILABLE_IN_REGION_ERROR) from ex def check_registry_exists(self, registry_name: str) -> bool: """ Gets a registry with the specified name Parameters ---------- registry_name: The name of the registry to fetch Returns ------- The specified registry, or None if it does not exist """ try: LOG.debug("Describing registry %s.", registry_name) self._schemas_client.describe_registry(RegistryName=registry_name) return True # If it didn't raise an exception, then it exists except ClientError as ex: if ex.response.get("Error", {}).get("Code") != "NotFoundException": LOG.error( "%s error calling describe_registry in registry %s", ex.response.get("Error", {}).get("Code"), registry_name, ) raise ex LOG.debug("Registry %s doesn't exist", registry_name) except EndpointConnectionError as ex: LOG.error("Failure calling describe_registry in registry %s", registry_name) raise NotAvailableInRegion(SCHEMAS_NOT_AVAILABLE_IN_REGION_ERROR) from ex return False def create_registry(self, registry_name: str): """ Creates a new registry with the specified name Parameters ---------- registry_name: The name of the registry to be created """ try: LOG.debug("Creating registry %s.", registry_name) self._schemas_client.create_registry(RegistryName=registry_name) return True except ClientError as ex: if ex.response.get("Error", {}).get("Code") != "ConflictException": LOG.error( "%s error calling create_registry for registry %s", ex.response.get("Error", {}).get("Code"), registry_name, ) raise ex LOG.debug("Registry %s already exists", registry_name) except EndpointConnectionError as ex: LOG.error("Failure calling create_registry for registry %s", registry_name) raise NotAvailableInRegion(SCHEMAS_NOT_AVAILABLE_IN_REGION_ERROR) from ex return False def delete_schema(self, registry_name, schema_name) -> bool: """ Deletes a schema from the EBSR Parameters ---------- registry_name: The registry that contains the schema that will be deleted schema_name: The name of the schema to be deleted """ try: LOG.debug("Deleting schema %s on registry %s.", schema_name, registry_name) self._schemas_client.delete_schema(RegistryName=registry_name, SchemaName=schema_name) return True except ClientError as ex: if ex.response.get("Error", {}).get("Code") != "NotFoundException": LOG.error( "%s error when calling delete_delete schema with %s schema in %s registry", ex.response.get("Error", {}).get("Code"), schema_name, registry_name, ) raise ex LOG.debug("Schema %s doesn't exist so it couldn't be deleted", schema_name) except EndpointConnectionError as ex: LOG.error("Failure calling delete_schema for schema %s in registry %s", schema_name, registry_name) raise NotAvailableInRegion(SCHEMAS_NOT_AVAILABLE_IN_REGION_ERROR) from ex return False def delete_version(self, registry_name, schema_name, schema_version: str): """ Delete a version of a schema Parameters ---------- registry_name: The registry that contains the schema schema_name: The name of the schema schema_version: Version to be deleted """ try: LOG.debug("Deleting version %s of schema %s on registry %s.", schema_version, schema_name, registry_name) self._schemas_client.delete_schema_version( RegistryName=registry_name, SchemaName=schema_name, SchemaVersion=schema_version, ) return True except ClientError as ex: if ex.response.get("Error", {}).get("Code") != "NotFoundException": raise ex LOG.debug("Schema version %s of %s doesn't exist so it couldn't be deleted", schema_version, schema_name) except EndpointConnectionError as ex: LOG.error("Error when calling limit_versions") raise NotAvailableInRegion(SCHEMAS_NOT_AVAILABLE_IN_REGION_ERROR) from ex return False