in src/common/data_mesh/src/data_mesh_types_util.py [0:0]
def get_request_type(spec: Any, **kwargs) -> Any:
    """Returns the request type associated with the given ConfigSpec.

    Dispatches on the runtime type of `spec` to the matching private
    conversion helper and returns whatever that helper returns.

    Args:
        spec: A data_mesh_types dataclass associated with an API request.
        kwargs: Kwargs that are needed to construct specific request types.
            They are forwarded unchanged to the helper for every type except
            Zone and Lake, whose helpers receive only `spec`.
            CatalogTagTemplate: is_publicly_readable (optional)
            CatalogTag:
                template_spec: Spec for the associated template definition.
                template: String representing the full template resource
                    name.
                column (optional): String representing a column being
                    attached.
            DataPolicy: parent_policy, data_policy_type (optional)
            PolicyTag: parent_policy (optional)
            PolicyTaxonomy: activated_policy_types (optional)
            Asset: resource_name
            Zone: none
            Lake: none

    Returns:
        The value returned by the helper that matches the type of `spec`.

    Raises:
        cortex_exc.NotImplementedCError: If `spec` is not an instance of any
            of the data_mesh_types classes handled here.
        Exception: Any error raised by a helper is logged and then re-raised
            unchanged.
    """
    try:
        if isinstance(spec, data_mesh_types.CatalogTagTemplate):
            return _get_catalog_tag_template_as_request_type(spec, **kwargs)
        if isinstance(spec, data_mesh_types.CatalogTag):
            return _get_catalog_tag_as_request_type(spec, **kwargs)
        if isinstance(spec, data_mesh_types.DataPolicy):
            return _get_data_policy_spec_as_request_type(spec, **kwargs)
        if isinstance(spec, data_mesh_types.PolicyTag):
            return _get_policy_tag_spec_as_request_type(spec, **kwargs)
        if isinstance(spec, data_mesh_types.PolicyTaxonomy):
            return _get_policy_taxonomy_spec_as_request_type(spec, **kwargs)
        if isinstance(spec, data_mesh_types.Asset):
            return _get_asset_spec_as_request_type(spec, **kwargs)
        # Zone and Lake helpers take no extra arguments, so kwargs are not
        # forwarded to them.
        if isinstance(spec, data_mesh_types.Zone):
            return _get_zone_spec_as_request_type(spec)
        if isinstance(spec, data_mesh_types.Lake):
            return _get_lake_spec_as_request_type(spec)
    except Exception:
        # Catch Exception rather than using a bare `except:` so that
        # KeyboardInterrupt / SystemExit propagate without being reported
        # as a conversion failure.
        logging.error("Unable to get request type for spec:\n%s", spec)
        raise
    # Reached only when no isinstance check matched; raised outside the try
    # block so the unsupported-type case is not logged as a helper failure.
    raise cortex_exc.NotImplementedCError(f"Type: {type(spec)}")