in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/BigQueryUtils.py [0:0]
def insert_history_row(self, tag_creator_account, tag_invoker_account, job_uuid, table_id, asset_name, tagged_values):
success = True
row = {'event_time': datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f') + ' UTC', 'asset_name': asset_name,
'tag_creator_account': tag_creator_account, 'tag_invoker_account': tag_invoker_account, 'job_uuid': job_uuid}
for tagged_value in tagged_values:
#print('tagged_value: ' + str(tagged_value))
if 'field_value' not in tagged_value:
continue
if tagged_value['field_value'] == '':
continue
if isinstance(tagged_value['field_value'], decimal.Decimal):
row[tagged_value['field_id']] = float(tagged_value['field_value'])
elif isinstance(tagged_value['field_value'], datetime.datetime) or isinstance(tagged_value['field_value'], datetime.date):
row[tagged_value['field_id']] = tagged_value['field_value'].isoformat()
else:
row[tagged_value['field_id']]= json.dumps(tagged_value['field_value'], default=str)
row[tagged_value['field_id']]= tagged_value['field_value']
#print('insert row: ' + str(row))
row_to_insert = [row,]
try:
status = self.client.insert_rows_json(table_id, row_to_insert)
if len(status) > 0:
print('Inserted row into tag history table. Return status: ', status)
else:
print('Inserted row into tag history table.')
except Exception as e:
print('Error while writing to tag history table:', e)
if '404' in str(e):
# table isn't quite ready to be written to
print('Tag history table not ready to be written to. Sleeping for 5 seconds.')
time.sleep(5)
try:
status = self.client.insert_rows_json(table_id, row_to_insert)
print('Retrying insert row into tag history table. Return status: ', status)
except Exception as e:
print('Error occurred while writing to tag history table: {}'.format(e))
success = False
return success