# 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/DataCatalogController.py
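# Context (assumed, not shown in this excerpt): the surrounding DataCatalogController class provides
# self.client (a Data Catalog client), self.gcs_client (a Cloud Storage client), the template_project /
# template_region / template_id attributes, and the module-level names this method relies on
# (the Data Catalog library's datacatalog and types modules, pyarrow.parquet as parquet, pandas as pd,
# os, datetime, constants, log_error, and the bq helper module).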
def apply_entry_config(self, fields, uri, job_uuid, config_uuid, template_uuid, tag_history):

    print('** apply_entry_config **')
    op_status = constants.SUCCESS
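
    # uri arrives as a (bucket_name, object_path) pair identifying the GCS file to catalog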
    bucket_name, filename = uri
    bucket = self.gcs_client.get_bucket(bucket_name)
    blob = bucket.get_blob(filename)

    entry_group_short_name = bucket_name.replace('-', '_')
    entry_group_full_name = 'projects/' + self.template_project + '/locations/' + self.template_region + '/entryGroups/' + entry_group_short_name
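    # Data Catalog entry group IDs may only contain letters, numbers, and underscores,
    # hence the hyphen-to-underscore conversion of the bucket name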

    # create the entry group if it doesn't already exist
    is_entry_group = self.entry_group_exists(entry_group_full_name)
    print('is_entry_group: ', is_entry_group)

    if not is_entry_group:
        self.create_entry_group(entry_group_short_name)

    # generate the entry id: replace '/' with '_' and drop the file extension from the name
    entry_id = filename.split('.')[0].replace('/', '_')
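    # e.g. a (hypothetical) object path 'sales/orders.parquet' would yield entry_id 'sales_orders'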

    try:
        entry_name = entry_group_full_name + '/entries/' + entry_id
        print('Info: entry_name: ', entry_name)

        entry = self.client.get_entry(name=entry_name)
        print('Info: entry already exists: ', entry.name)

    except Exception as e:
        msg = 'Entry does not exist {}'.format(entry_name)
        log_error(msg, e, job_uuid)
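
        # the entry was not found, so build it from the GCS object and its parquet schema;
        # the entry creation, tagging, and tag history steps below only run for new entries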

        # populate the entry
        entry = datacatalog.Entry()
        entry.name = filename
        entry.display_name = entry_id
        entry.type_ = 'FILESET'
        entry.gcs_fileset_spec.file_patterns = ['gs://' + bucket_name + '/' + filename]
        entry.fully_qualified_name = 'gs://' + bucket_name + '/' + filename
        entry.source_system_timestamps.create_time = datetime.utcnow()
        entry.source_system_timestamps.update_time = datetime.utcnow()
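        # the entry is a single-file FILESET whose source-system timestamps are set to the time of cataloging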

        # get the file's schema:
        # download the file to App Engine's tmp directory
        tmp_file = '/tmp/' + entry_id
        blob.download_to_filename(filename=tmp_file)
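        # note: /tmp is backed by memory in this runtime, which is why the file is removed as soon as possible below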

        # validate that it's a parquet file
        try:
            parquet.ParquetFile(tmp_file)
        except Exception as e:
            # not a parquet file, ignore it
            msg = 'Error: {} is not a parquet file, ignoring it'.format(filename)
            log_error(msg, e, job_uuid)
            op_status = constants.ERROR
            return op_status
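
        # read only the parquet footer to get column names and types, without loading the data itself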
        schema = parquet.read_schema(tmp_file, memory_map=True)
        df = pd.DataFrame(({"column": name, "datatype": str(pa_dtype)} for name, pa_dtype in zip(schema.names, schema.types)))
        df = df.reindex(columns=["column", "datatype"], fill_value=pd.NA)
        #print('df: ', df)

        for index, row in df.iterrows():
            entry.schema.columns.append(
                types.ColumnSchema(
                    column=row['column'],
                    type_=row['datatype'],
                    description=None,
                    mode=None
                )
            )

        # create the entry
        #print('entry request: ', entry)
        created_entry = self.client.create_entry(parent=entry_group_full_name, entry_id=entry_id, entry=entry)
        print('Info: created entry: ', created_entry.name)

        # get the number of rows in the file
        num_rows = parquet.ParquetFile(tmp_file).metadata.num_rows
        #print('num_rows: ', num_rows)

        # delete the tmp file ASAP to free up memory
        os.remove(tmp_file)

        # create the file metadata tag
        template_path = self.client.tag_template_path(self.template_project, self.template_region, self.template_id)
        tag = datacatalog.Tag()
        tag.template = template_path
        for field in fields:

            if field['field_id'] == 'name':
                string_field = datacatalog.TagField()
                string_field.string_value = filename
                tag.fields['name'] = string_field
                field['field_value'] = filename # field_value is used by the BQ exporter

            elif field['field_id'] == 'bucket':
                string_field = datacatalog.TagField()
                string_field.string_value = bucket_name
                tag.fields['bucket'] = string_field
                field['field_value'] = bucket_name # field_value is used by the BQ exporter

            elif field['field_id'] == 'path':
                string_field = datacatalog.TagField()
                string_field.string_value = 'gs://' + bucket_name + '/' + filename
                tag.fields['path'] = string_field
                field['field_value'] = 'gs://' + bucket_name + '/' + filename # field_value is used by the BQ exporter

            elif field['field_id'] == 'type':
                enum_field = datacatalog.TagField()
                enum_field.enum_value.display_name = 'PARQUET' # hardcode the file type for now
                tag.fields['type'] = enum_field
                field['field_value'] = 'PARQUET' # field_value is used by the BQ exporter

            elif field['field_id'] == 'size':
                double_field = datacatalog.TagField()
                double_field.double_value = blob.size
                tag.fields['size'] = double_field
                field['field_value'] = blob.size # field_value is used by the BQ exporter

            elif field['field_id'] == 'num_rows':
                double_field = datacatalog.TagField()
                double_field.double_value = num_rows
                tag.fields['num_rows'] = double_field
                field['field_value'] = num_rows # field_value is used by the BQ exporter

            elif field['field_id'] == 'created_time':
                datetime_field = datacatalog.TagField()
                datetime_field.timestamp_value = blob.time_created
                tag.fields['created_time'] = datetime_field
                field['field_value'] = blob.time_created # field_value is used by the BQ exporter

            elif field['field_id'] == 'updated_time':
                datetime_field = datacatalog.TagField()
                datetime_field.timestamp_value = blob.updated
                tag.fields['updated_time'] = datetime_field
                field['field_value'] = blob.updated # field_value is used by the BQ exporter

            elif field['field_id'] == 'storage_class':
                string_field = datacatalog.TagField()
                string_field.string_value = blob.storage_class
                tag.fields['storage_class'] = string_field
                field['field_value'] = blob.storage_class # field_value is used by the BQ exporter

            elif field['field_id'] == 'content_encoding' and blob.content_encoding:
                string_field = datacatalog.TagField()
                string_field.string_value = blob.content_encoding
                tag.fields['content_encoding'] = string_field
                field['field_value'] = blob.content_encoding # field_value is used by the BQ exporter

            elif field['field_id'] == 'content_language' and blob.content_language:
                string_field = datacatalog.TagField()
                string_field.string_value = blob.content_language
                tag.fields['content_language'] = string_field
                field['field_value'] = blob.content_language # field_value is used by the BQ exporter

            elif field['field_id'] == 'media_link':
                string_field = datacatalog.TagField()
                string_field.string_value = blob.media_link
                tag.fields['media_link'] = string_field
                field['field_value'] = blob.media_link # field_value is used by the BQ exporter

        #print('tag request: ', tag)
        created_tag = self.client.create_tag(parent=entry_name, tag=tag)
        #print('created_tag: ', created_tag)

        if tag_history:
            bqu = bq.BigQueryUtils(self.credentials, BIGQUERY_REGION)
            template_fields = self.get_template()
            bqu.copy_tag(self.tag_creator_account, self.tag_invoker_account, job_uuid, self.template_id, template_fields, '/'.join(uri), None, fields)

    return op_status
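

# A minimal usage sketch (hypothetical values; the actual callers and constructor arguments are defined
# elsewhere in the tag engine and are not shown in this excerpt):
#
#   controller = DataCatalogController(...)  # constructed as elsewhere in this file
#   fields = [{'field_id': 'name'}, {'field_id': 'size'}, {'field_id': 'num_rows'}]
#   status = controller.apply_entry_config(
#       fields=fields,
#       uri=('example-bucket', 'sales/orders.parquet'),   # (bucket name, object path)
#       job_uuid='job-123', config_uuid='config-456', template_uuid='template-789',
#       tag_history=False)
#   # status is constants.SUCCESS or constants.ERROR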