in dbconnectors/BQConnector.py [0:0]
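# Module-level imports this method relies on (not shown in this excerpt):
#   from datetime import datetime, timezone
#   import pandas as pd
#   from google.cloud import bigquery
#   from google.cloud.exceptions import NotFound
# plus the project's get_auth_user helper, imported per the repo's layout.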
def make_audit_entry(self, source_type, user_grouping, model, question, generated_sql, found_in_vector, need_rewrite, failure_step, error_msg, FULL_LOG_TEXT):
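    """Append one audit record for a SQL-generation run to the BigQuery audit
    log table, creating the table on first use. Returns a status string."""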
    auth_user = get_auth_user()
    PROJECT_ID = self.project_id
    table_id = f"{PROJECT_ID}.{self.opendataqna_dataset}.{self.audit_log_table_name}"
    now = datetime.now(timezone.utc)  # BigQuery treats naive TIMESTAMP values as UTC, so be explicit
    client = self.getconn()
    # Build a single-row dataframe holding the audit record.
    df1 = pd.DataFrame([{
        "source_type": source_type,
        "project_id": str(PROJECT_ID),
        "user": str(auth_user),
        "user_grouping": user_grouping,
        "model_used": model,
        "question": question,
        "generated_sql": generated_sql,
        "found_in_vector": found_in_vector,
        "need_rewrite": need_rewrite,
        "failure_step": failure_step,
        "error_msg": error_msg,
        "execution_time": now,
        "full_log": FULL_LOG_TEXT,
    }])
    # Declare column types explicitly: most of these columns arrive as the
    # pandas "object" dtype, so their BigQuery types cannot be auto-detected.
    db_schema = [
        bigquery.SchemaField("source_type", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("project_id", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("user", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("user_grouping", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("model_used", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("question", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("generated_sql", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("found_in_vector", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("need_rewrite", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("failure_step", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("error_msg", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("execution_time", bigquery.enums.SqlTypeNames.TIMESTAMP),
        bigquery.SchemaField("full_log", bigquery.enums.SqlTypeNames.STRING),
    ]
    try:
        client.get_table(table_id)  # Raises NotFound if the table does not exist yet.
        table_exists = True
    except NotFound:
        print("Table {} is not found. Will create this log table".format(table_id))
        table_exists = False

    if table_exists:
        # Stream the row into the existing table.
        errors = client.insert_rows_from_dataframe(table=table_id, dataframe=df1, selected_fields=db_schema)
        # insert_rows_from_dataframe returns one error list per chunk; all-empty means success.
        if not any(errors):
            print("Logged the run")
        else:
            print("Encountered errors while inserting rows: {}".format(errors))
    else:
        # First run: create the table via a load job and wait for it to complete,
        # so a failure surfaces here instead of silently dropping the log entry.
        job_config = bigquery.LoadJobConfig(schema=db_schema, write_disposition="WRITE_TRUNCATE")
        client.load_table_from_dataframe(df1, table_id, job_config=job_config).result()
    return 'Completed the logging step'
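A minimal sketch of how this method might be invoked from an initialized connector; the argument values below are illustrative assumptions, not taken from the repo.

# Hypothetical usage, assuming `connector` is an initialized BQConnector instance;
# all string values here are made up for illustration.
status = connector.make_audit_entry(
    source_type="bigquery",
    user_grouping="retail",
    model="gemini-1.0-pro",
    question="Total sales last month?",
    generated_sql="SELECT SUM(sales) FROM dataset.sales",
    found_in_vector="N",       # STRING column, per the schema above
    need_rewrite="N",
    failure_step="",
    error_msg="",
    FULL_LOG_TEXT="...full pipeline log...",
)
print(status)  # 'Completed the logging step'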