in src/package/dataplexutils/metadata/wizard.py [0:0]
def _get_column_comment(self, table_fqn, column_name, comment_number=None):
    """Return the human comments stored on a column's metadata aspect.

    Fetches the Dataplex catalog entry for the BigQuery table, restricted to
    the aspect type named by ``constants["ASPECT_TEMPLATE"]["name"]``, and
    collects the ``"human-comments"`` values from every aspect whose path is
    ``Schema.<column_name>``.

    Args:
        table_fqn: Fully qualified table name; it is split into project,
            dataset and table ids via ``self._split_table_fqn``.
        column_name: Name of the column whose comments are requested.
        comment_number: If ``None`` (default), all comments are returned.
            Otherwise the index of the single comment to return.

    Returns:
        A list of comments. It is empty when no matching aspect carries a
        ``"human-comments"`` field; when ``comment_number`` is given it holds
        one element per matching aspect.

    Raises:
        Exception: Any error raised by ``client.get_entry`` is logged and
            re-raised unchanged.
        IndexError: Presumably raised when ``comment_number`` is outside the
            range of the stored comments (depends on the container type the
            client library returns -- TODO confirm).
    """
    # Reuse the shared catalog client held by the instance; building a new
    # CatalogServiceClient on every call repeats credential/channel setup.
    client = self._cloud_clients[constants["CLIENTS"]["DATAPLEX_CATALOG"]]

    aspect_type_name = constants["ASPECT_TEMPLATE"]["name"]
    aspect_types = [
        f"projects/{self._project_id}/locations/global/aspectTypes/{aspect_type_name}"
    ]

    project_id, dataset_id, table_id = self._split_table_fqn(table_fqn)
    entry_name = (
        f"projects/{project_id}/locations/{self._get_dataset_location(table_fqn)}"
        f"/entryGroups/@bigquery/entries/bigquery.googleapis.com"
        f"/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"
    )
    request = dataplex_v1.GetEntryRequest(
        name=entry_name,
        view=dataplex_v1.EntryView.CUSTOM,
        aspect_types=aspect_types,
    )

    try:
        entry = client.get_entry(request=request)
    except Exception as e:
        logger.error(f"Exception: {e}.")
        # Bare raise keeps the original traceback intact.
        raise

    # Loop-invariant match criteria, computed once.
    aspect_type_suffix = f"aspectTypes/{aspect_type_name}"
    column_path = f"Schema.{column_name}"

    comments = []
    # entry.aspects is iterated by key; the aspect itself is looked up per key.
    for aspect_key in entry.aspects:
        logger.info(f"aspect: {aspect_key}")
        aspect = entry.aspects[aspect_key]
        logger.info(f"aspect.aspect_type: {aspect.aspect_type}")
        logger.info(f"aspect.path: {aspect.path}")

        if not aspect.aspect_type.endswith(aspect_type_suffix):
            continue
        if aspect.path != column_path:
            continue
        if "human-comments" not in aspect.data:
            continue

        column_comments = aspect.data["human-comments"]
        if comment_number is None:
            comments.extend(column_comments)
        else:
            comments.append(column_comments[comment_number])

    logger.info(f"comments: {comments}")
    return comments