dataplex/quickstart/quickstart.py (134 lines of code) (raw):

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START dataplex_quickstart]
import time

from google.cloud import dataplex_v1
from google.protobuf import struct_pb2


# Method to demonstrate lifecycle of different Dataplex resources and their interactions.
# Method creates Aspect Type, Entry Type, Entry Group and Entry, retrieves Entry,
# searches for it and cleans up created resources.
def quickstart(
    project_id: str,
    location: str,
    aspect_type_id: str,
    entry_type_id: str,
    entry_group_id: str,
    entry_id: str,
) -> None:
    """Run the Dataplex Catalog quickstart end to end.

    Creates an Aspect Type and an Entry Type in the "global" location, an Entry
    Group and an Entry in ``location``, reads the Entry back, searches for it,
    and finally deletes the Entry Group, Entry Type and Aspect Type. Progress is
    reported with ``print``.

    Args:
        project_id: ID of the Google Cloud project that owns the resources.
        location: Location in which the Entry Group and Entry are created.
        aspect_type_id: ID to give to the created Aspect Type.
        entry_type_id: ID to give to the created Entry Type.
        entry_group_id: ID to give to the created Entry Group.
        entry_id: ID to give to the created Entry; also used as the Search query.
    """
    # Initialize client that will be used to send requests across threads. This
    # client only needs to be created once, and can be reused for multiple requests.
    # Using the client as a context manager makes sure that any remaining
    # background resources are cleaned up once all requests are completed.
    with dataplex_v1.CatalogServiceClient() as client:
        # 0) Prepare variables used in following steps
        global_parent = f"projects/{project_id}/locations/global"
        specific_location_parent = f"projects/{project_id}/locations/{location}"
        # Full resource names, built once so that every step refers to the same resource
        aspect_type_name = f"{global_parent}/aspectTypes/{aspect_type_id}"
        entry_type_name = f"{global_parent}/entryTypes/{entry_type_id}"
        entry_group_name = f"{specific_location_parent}/entryGroups/{entry_group_id}"
        entry_name = f"{entry_group_name}/entries/{entry_id}"

        # 1) Create Aspect Type that will be attached to Entry Type
        aspect_field = dataplex_v1.AspectType.MetadataTemplate(
            # The name must follow regex ^(([a-zA-Z]{1})([\\w\\-_]{0,62}))$
            # That means name must only contain alphanumeric character or dashes or underscores,
            # start with an alphabet, and must be at most 63 characters.
            name="example_field",
            # Metadata Template is recursive structure,
            # primitive types such as "string" or "integer" indicate leaf node,
            # complex types such as "record" or "array" would require nested Metadata Template
            type="string",
            index=1,
            annotations=dataplex_v1.AspectType.MetadataTemplate.Annotations(
                description="example field to be filled during entry creation"
            ),
            constraints=dataplex_v1.AspectType.MetadataTemplate.Constraints(
                # Specifies if field will be required in Aspect Type.
                required=True
            ),
        )
        aspect_type = dataplex_v1.AspectType(
            description="aspect type for dataplex quickstart",
            metadata_template=dataplex_v1.AspectType.MetadataTemplate(
                name="example_template",
                type="record",
                # Aspect Type fields, that themselves are Metadata Templates.
                record_fields=[aspect_field],
            ),
        )
        aspect_type_create_operation = client.create_aspect_type(
            # Aspect Type is created in "global" location to highlight, that resources from
            # "global" region can be attached to Entry created in specific location
            parent=global_parent,
            aspect_type=aspect_type,
            aspect_type_id=aspect_type_id,
        )
        created_aspect_type = aspect_type_create_operation.result(60)
        print(f"Step 1: Created aspect type -> {created_aspect_type.name}")

        # 2) Create Entry Type, of which type Entry will be created
        entry_type = dataplex_v1.EntryType(
            description="entry type for dataplex quickstart",
            required_aspects=[
                dataplex_v1.EntryType.AspectInfo(
                    # Aspect Type created in step 1
                    type=aspect_type_name
                )
            ],
        )
        entry_type_create_operation = client.create_entry_type(
            # Entry Type is created in "global" location to highlight, that resources from
            # "global" region can be attached to Entry created in specific location
            parent=global_parent,
            entry_type=entry_type,
            entry_type_id=entry_type_id,
        )
        created_entry_type = entry_type_create_operation.result(60)
        print(f"Step 2: Created entry type -> {created_entry_type.name}")

        # 3) Create Entry Group in which Entry will be located
        entry_group = dataplex_v1.EntryGroup(
            description="entry group for dataplex quickstart"
        )
        entry_group_create_operation = client.create_entry_group(
            # Entry Group is created for specific location
            parent=specific_location_parent,
            entry_group=entry_group,
            entry_group_id=entry_group_id,
        )
        created_entry_group = entry_group_create_operation.result(60)
        print(f"Step 3: Created entry group -> {created_entry_group.name}")

        # 4) Create Entry
        # Wait 10 seconds to allow previously created resources to propagate
        time.sleep(10)
        aspect_key = f"{project_id}.global.{aspect_type_id}"
        entry = dataplex_v1.Entry(
            # Entry is an instance of Entry Type created in step 2
            entry_type=entry_type_name,
            entry_source=dataplex_v1.EntrySource(
                description="entry for dataplex quickstart"
            ),
            aspects={
                # Attach Aspect that is an instance of Aspect Type created in step 1
                aspect_key: dataplex_v1.Aspect(
                    aspect_type=aspect_type_name,
                    data=struct_pb2.Struct(
                        fields={
                            "example_field": struct_pb2.Value(
                                string_value="example value for the field"
                            ),
                        }
                    ),
                )
            },
        )
        created_entry = client.create_entry(
            # Entry is created in specific location, but it is still possible to link it with
            # resources (Aspect Type and Entry Type) from "global" location
            parent=entry_group_name,
            entry=entry,
            entry_id=entry_id,
        )
        print(f"Step 4: Created entry -> {created_entry.name}")

        # 5) Retrieve created Entry
        get_entry_request = dataplex_v1.GetEntryRequest(
            name=entry_name,
            view=dataplex_v1.EntryView.FULL,
        )
        retrieved_entry = client.get_entry(request=get_entry_request)
        print(f"Step 5: Retrieved entry -> {retrieved_entry.name}")
        for retrieved_aspect in retrieved_entry.aspects.values():
            print("Retrieved aspect for entry:")
            print(f" * aspect type -> {retrieved_aspect.aspect_type}")
            print(f" * aspect field value -> {retrieved_aspect.data['example_field']}")

        # 6) Use Search capabilities to find Entry
        # Wait 30 seconds to allow resources to propagate to Search
        print("Step 6: Waiting for resources to propagate to Search...")
        time.sleep(30)
        search_entries_request = dataplex_v1.SearchEntriesRequest(
            # Search for the Entry created in step 4 instead of a fixed name, so that
            # the query keeps working when custom IDs are passed to this method
            name=global_parent,
            query=f"name:{entry_id}",
        )
        # Iterate the returned pager through its public interface; it requests
        # further pages as needed instead of exposing only the first response
        search_results = client.search_entries(request=search_entries_request)
        entries_from_search = [result.dataplex_entry for result in search_results]
        print("Entries found in Search:")
        # Please note in output that Entry Group and Entry Type are also represented as Entries
        # (with the default IDs from the main block below their names match the query as well)
        for entry_from_search in entries_from_search:
            print(f" * {entry_from_search.name}")

        # 7) Clean created resources
        # Each delete call starts a long-running operation. Wait for it to finish
        # before moving on, so that dependent resources are removed in order and
        # the success message below is only printed once the cleanup is complete.
        client.delete_entry_group(name=entry_group_name).result(60)
        client.delete_entry_type(name=entry_type_name).result(60)
        client.delete_aspect_type(name=aspect_type_name).result(60)
        print("Step 7: Successfully cleaned up resources")


if __name__ == "__main__":
    # TODO(developer): Replace these variables before running the sample.
    project_id = "MY_PROJECT_ID"
    # Available locations: https://cloud.google.com/dataplex/docs/locations
    location = "MY_LOCATION"
    # Variables below can be replaced with custom values or defaults can be kept
    aspect_type_id = "dataplex-quickstart-aspect-type"
    entry_type_id = "dataplex-quickstart-entry-type"
    entry_group_id = "dataplex-quickstart-entry-group"
    entry_id = "dataplex-quickstart-entry"

    quickstart(
        project_id, location, aspect_type_id, entry_type_id, entry_group_id, entry_id
    )
# [END dataplex_quickstart]