in src/package/dataplexutils/metadata/wizard.py [0:0]
def _create_aspect_type(self, aspect_type_id: str):
"""Creates a new aspect type in Dataplex catalog.
Args:
aspect_type_id (str): The ID to use for the new aspect type
Raises:
Exception: If there is an error creating the aspect type
"""
# Create a client
client = self._cloud_clients[constants["CLIENTS"]["DATAPLEX_CATALOG"]]
# Initialize request argument(s)
aspect_type = dataplex_v1.AspectType()
full_metadata_template = {
"type_": constants["ASPECT_TEMPLATE"]["type_"],
"name": constants["ASPECT_TEMPLATE"]["name"],
"record_fields": constants["record_fields"]
}
import json
print("Will deploy following template:")
print(json.dumps(full_metadata_template))
metadata_template = dataplex_v1.AspectType.MetadataTemplate(full_metadata_template)
print("Will deploy following template:" + str(metadata_template))
aspect_type.metadata_template = metadata_template
aspect_type.display_name = constants["ASPECT_TEMPLATE"]["display_name"]
request = dataplex_v1.CreateAspectTypeRequest(
parent=f"projects/{self._project_id}/locations/global",
aspect_type_id = aspect_type_id,
aspect_type=aspect_type,
)
# Make the request
try:
operation = client.create_aspect_type(request=request)
except Exception as e:
logger.error(f"Failed to create aspect type: {e}")
raise e