in src/package/dataplexutils/metadata/wizard.py [0:0]
def _update_column_metadata_as_regenerated(self, table_fqn,column_name):
"""Add stringdocs
Args:
Add stringdocs
Raises:
Add stringdocs
"""
# Create a client
client = self._cloud_clients[constants["CLIENTS"]["DATAPLEX_CATALOG"]]
#client = dataplex_v1.CatalogServiceClient()
logger.info(f"Updating column {column_name} in table {table_fqn} as regenerated")
try:
new_aspect = dataplex_v1.Aspect()
aspect_type = f"""projects/{self._project_id}/locations/global/aspectTypes/{constants["ASPECT_TEMPLATE"]["name"]}"""
aspect_name=f"""{self._project_id}.global.{constants["ASPECT_TEMPLATE"]["name"]}@Schema.{column_name}"""
aspect_types = [aspect_type]
logger.info(f"aspect_type: {aspect_type}")
except Exception as e:
logger.error(f"Failed to create new aspect")
logger.error(f"Exception: {e}.")
raise e
project_id, dataset_id, table_id = self._split_table_fqn(table_fqn)
entry = dataplex_v1.Entry()
entry.name = f"projects/{project_id}/locations/{self._get_dataset_location(table_fqn)}/entryGroups/@bigquery/entries/bigquery.googleapis.com/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"
#entry.aspects[f"""{project_id}.global.{constants["ASPECT_TEMPLATE"]["name"]}"""] = aspect
# Check if the aspect already exists
try:
get_request=dataplex_v1.GetEntryRequest(name=entry.name,view=dataplex_v1.EntryView.CUSTOM,aspect_types=aspect_types)
entry = client.get_entry(request=get_request)
#logger.info(f"Found entry: {entry}")
except Exception as e:
logger.error(f"Exception: {e}.")
raise e
data_struct = struct_pb2.Struct()
try:
for i in entry.aspects:
if i.endswith(f"""global.{constants["ASPECT_TEMPLATE"]["name"]}@Schema.{column_name}""") and entry.aspects[i].path==f"Schema.{column_name}":
logger.info(f"**********Updating new aspect {i} with old_values")
new_aspect.data=entry.aspects[i].data
new_aspect.path=f"Schema.{column_name}"
new_aspect.data.update({
"generation-date" : datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"),
"to-be-regenerated" : "false"
}
)
logger.info(f"entry.aspects[aspect_name].data: {entry.aspects[i].data}")
logger.info(f"new_aspect.data: {new_aspect.data}")
except Exception as e:
logger.error(f"Failed to assign data to new aspect copy")
logger.error(f"Exception: {e}.")
raise e
new_entry=dataplex_v1.Entry()
new_entry.name=entry.name
new_entry.aspects[aspect_name]=new_aspect
# Initialize request argument(s)
request = dataplex_v1.UpdateEntryRequest(
entry=new_entry,
update_mask=field_mask_pb2.FieldMask(paths=["aspects"]),
allow_missing=False,
aspect_keys=[aspect_name]
)
# Make the request
try:
response = client.update_entry(request=request)
print( f"Aspect created: {response.name}")
return True
except Exception as e:
print(f"Failed to create aspect: {e}")
return False
return True