in glam/api/management/commands/import_glean_aggs.py [0:0]
def import_file(self, tmp_table, fp, app_id, product):
model = apps.get_model(PRODUCT_TO_MODEL[product])
csv_columns = [
f.name for f in model._meta.get_fields() if f.name not in ["id", "app_id"]
]
log(app_id, "Importing file into temp table.")
with connection.cursor() as cursor:
with open(fp.name, "r") as tmp_file:
sql = f"""
COPY {tmp_table} ({", ".join(csv_columns)}) FROM STDIN WITH CSV
"""
try:
cursor.copy_expert(sql, tmp_file)
except CharacterNotInRepertoire:
log(
app_id,
" Postgres found an invalid character while importing this file.",
)
log(
app_id,
" Attempting to sanitize file by removing invalid rows...",
)
sanitized_tmp_file = self.sanitize_import_file_quickfix_1804769(
fp.name
)
with open(sanitized_tmp_file, "r") as s_temp_file:
cursor.copy_expert(sql, s_temp_file)
log(app_id, " Inserting data from temp table into aggregation tables.")
with connection.cursor() as cursor:
sql = f"""
INSERT INTO {model._meta.db_table} (app_id, {", ".join(csv_columns)})
SELECT '{app_id}', * from {tmp_table}
ON CONFLICT ON CONSTRAINT {model._meta.constraints[0].name}
DO UPDATE SET
total_users = EXCLUDED.total_users,
histogram = EXCLUDED.histogram,
percentiles = EXCLUDED.percentiles,
total_sample = EXCLUDED.total_sample
"""
cursor.execute(sql)