in src/package/dataplexutils/metadata/wizard.py [0:0]
def _update_table_dataplex_description(self, table_fqn: str, description: str) -> bool:
    """Write ``description`` into the Dataplex "overview" aspect of a table entry.

    The current entry is fetched first so that an existing overview can be
    taken into account. When an overview already exists and
    ``self._client_options._add_ai_warning`` is set, the new content is built
    from the old one:

    * if the old content contains the ``AI_WARNING`` output clause, everything
      before that clause is kept and ``description`` replaces the rest;
    * otherwise ``description`` is appended to the old content on a new line.

    In every other case the overview content is simply ``description``.

    Args:
        table_fqn: Table identifier understood by ``self._split_table_fqn``
            and ``self._get_dataset_location``.
        description: Text to store under the aspect's ``content`` key.

    Returns:
        True if the ``update_entry`` call succeeded, False if it raised
        (the failure is logged, not propagated).

    Raises:
        Exception: Any error raised while reading the current entry is
            logged and re-raised unchanged.
    """
    project_id, dataset_id, table_id = self._split_table_fqn(table_fqn)
    # Use the catalog client registered on this instance rather than
    # constructing a fresh, default-configured one on every call.
    client = self._cloud_clients[constants["CLIENTS"]["DATAPLEX_CATALOG"]]
    location = self._get_dataset_location(table_fqn)
    entry_name = (
        f"projects/{project_id}/locations/{location}"
        "/entryGroups/@bigquery/entries/bigquery.googleapis.com"
        f"/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"
    )
    aspect_type = "projects/dataplex-types/locations/global/aspectTypes/overview"
    overview_key = "dataplex-types.global.overview"

    # Look up the overview aspect currently attached to the entry (if any).
    old_overview = None
    try:
        get_request = dataplex_v1.GetEntryRequest(
            name=entry_name,
            view=dataplex_v1.EntryView.CUSTOM,
            aspect_types=[aspect_type],
        )
        current_entry = client.get_entry(request=get_request)
        for aspect_key in current_entry.aspects:
            existing_aspect = current_entry.aspects[aspect_key]
            # Only the aspect with an empty path is considered; aspects with
            # a non-empty path are skipped.
            if aspect_key.endswith("global.overview") and existing_aspect.path == "":
                logger.info(
                    "Reading existing aspect %s of table %s", aspect_key, table_fqn
                )
                old_overview = dict(existing_aspect.data)
                # .get(): an overview without a "content" field must not
                # abort the whole update with a KeyError.
                logger.info("old_overview: %s", old_overview.get("content"))
    except Exception as e:
        logger.error(f"Exception: {e}.")
        raise

    # Decide what the new overview content should be.
    if old_overview is not None and self._client_options._add_ai_warning:
        old_content = str(old_overview.get("content") or "")
        warning_clause = constants["OUTPUT_CLAUSES"]["AI_WARNING"]
        clause_pos = old_content.find(warning_clause)
        if clause_pos >= 0:
            # Keep the text that precedes the warning clause and replace the
            # clause and everything after it with the new description.
            new_content = old_content[:clause_pos] + description
        elif old_content:
            # No warning clause in the old text: keep it and append.
            new_content = f"{old_content}\n{description}"
        else:
            new_content = description
    else:
        new_content = description
    aspect_content = {"content": new_content}
    logger.info("aspect_content: %s", aspect_content)

    # Build the aspect; its payload has to be a protobuf Struct.
    data_struct = struct_pb2.Struct()
    data_struct.update(aspect_content)
    aspect = dataplex_v1.Aspect()
    aspect.aspect_type = aspect_type
    aspect.data = data_struct

    logger.info(
        "project_id: %s, dataset_id: %s, table_id: %s",
        project_id,
        dataset_id,
        table_id,
    )
    entry = dataplex_v1.Entry()
    entry.name = entry_name
    entry.aspects[overview_key] = aspect

    update_request = dataplex_v1.UpdateEntryRequest(
        entry=entry,
        update_mask=field_mask_pb2.FieldMask(paths=["aspects"]),
    )
    # Best effort: a failed update is reported through the return value.
    try:
        response = client.update_entry(request=update_request)
    except Exception as e:
        logger.error(f"Failed to create aspect: {e}")
        return False
    logger.info("Aspect created: %s", response.name)
    return True