in glam/api/management/commands/import_glean_aggs.py [0:0]
import re
import tempfile

from django.apps import apps
from django.db import connection
from django.utils import timezone
from google.cloud import storage

# PRODUCT_TO_MODEL, LastUpdated, and log() are defined elsewhere in this app;
# handle() below is a method on the command's Command class.

def handle(self, app_id, bucket, *args, **options):
    self.gcs_client = storage.Client()

    for product in PRODUCT_TO_MODEL.keys():
        model = apps.get_model(PRODUCT_TO_MODEL[product])
        # Find all files in the bucket that match the pattern with the provided app_id.
        # Note the escaped dot: the original pattern's bare "." matched any character.
        pattern = re.compile(f"glam-extract-{product}_glam_{app_id}-\\d+\\.csv")
        blobs = self.gcs_client.list_blobs(bucket)
        blobs = [blob for blob in blobs if pattern.fullmatch(blob.name)]
        for blob in blobs:
            # Create temp table for the data.
            tmp_table = f"tmp_import_glean_{app_id}"
            log(app_id, f"Creating temp table for import: {tmp_table}.")
            with connection.cursor() as cursor:
                cursor.execute(f"DROP TABLE IF EXISTS {tmp_table}")
                cursor.execute(
                    f"CREATE TABLE {tmp_table} (LIKE {model._meta.db_table})"
                )
                # The incoming CSV files don't have an `id` or `app_id` column,
                # so drop both to match the CSV layout.
                cursor.execute(
                    f"ALTER TABLE {tmp_table} DROP COLUMN id, DROP COLUMN app_id"
                )

            # Download the CSV file to the local filesystem.
            fp = tempfile.NamedTemporaryFile()
            log(app_id, f"Copying GCS file {blob.name} to local file {fp.name}.")
            blob.download_to_filename(fp.name)

            # Load the CSV into the temp table & insert data from the temp table
            # into the aggregation tables, using upserts (see the import_file
            # sketch after this listing).
            self.import_file(tmp_table, fp, app_id, product)

            # Drop the temp table and remove the local file.
            log(app_id, "Dropping temp table.")
            with connection.cursor() as cursor:
                cursor.execute(f"DROP TABLE {tmp_table}")
            log(app_id, f"Deleting local file: {fp.name}.")
            fp.close()

        # Once all files for this product are loaded, refresh its materialized view.
        if blobs:
            with connection.cursor() as cursor:
                view = f"view_{model._meta.db_table}"
                log(app_id, f"Refreshing materialized view {view}.")
                cursor.execute(f"REFRESH MATERIALIZED VIEW CONCURRENTLY {view}")
                log(app_id, "Refresh completed.")

    LastUpdated.objects.update_or_create(
        product=f"fenix-{app_id}", defaults={"last_updated": timezone.now()}
    )
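
# import_file() is called above but not shown in this excerpt. Below is a
# minimal sketch of what it plausibly does, assuming psycopg2 under Django:
# bulk-load the CSV into the temp table with COPY, then upsert from the temp
# table into the aggregation table. The column names (`metric`, `total_users`,
# `data`) and the conflict target are illustrative assumptions, not the real
# schema.
def import_file(self, tmp_table, fp, app_id, product):
    model = apps.get_model(PRODUCT_TO_MODEL[product])
    target = model._meta.db_table
    with connection.cursor() as cursor:
        # Stream the CSV straight into the temp table via Postgres COPY
        # (copy_expert is a psycopg2 cursor method, reachable through
        # Django's cursor wrapper).
        with open(fp.name) as csv_file:
            cursor.copy_expert(
                f"COPY {tmp_table} FROM STDIN WITH CSV HEADER", csv_file
            )
        # Upsert: insert new rows, update existing ones on conflict.
        cursor.execute(
            f"""
            INSERT INTO {target} (app_id, metric, total_users, data)
            SELECT %s, metric, total_users, data FROM {tmp_table}
            ON CONFLICT (app_id, metric)
            DO UPDATE SET total_users = EXCLUDED.total_users,
                          data = EXCLUDED.data
            """,
            [app_id],
        )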
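
# A usage sketch for invoking the command programmatically. The two positional
# arguments follow the handle() signature; the add_arguments() definition is
# not shown in this excerpt, and "nightly" / "my-glam-bucket" are placeholder
# values. Equivalent CLI: python manage.py import_glean_aggs nightly my-glam-bucket
from django.core.management import call_command

call_command("import_glean_aggs", "nightly", "my-glam-bucket")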