in glam/api/management/commands/import_desktop_aggs.py [0:0]
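
# Excerpt of the management command's handle method. Assumes the module-level
# imports this code relies on: tempfile; apps, connection, and timezone from
# Django; storage and bigquery from google.cloud; plus the module's own
# CHANNEL_TO_MODEL, log, and LastUpdated.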
def handle(self, bucket, *args, **options):
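    # Resolve the channel name to its aggregation model; CHANNEL_TO_MODEL is
    # assumed to map channel names to "app_label.ModelName" strings, as
    # apps.get_model expects.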
    channel = options["channel"]
    model = apps.get_model(CHANNEL_TO_MODEL[channel])

    self.gcs_client = storage.Client()
    self.bq_client = bigquery.Client()
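    # (self.bq_client is not used in this excerpt; presumably the
    # import_file/import_revisions helpers use it.)

    # List the bucket and keep only this channel's export files, which are
    # named with an "aggs-desktop-<channel>" prefix.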
    blobs = self.gcs_client.list_blobs(bucket)
    blobs = list(
        filter(lambda b: b.name.startswith(f"aggs-desktop-{channel}"), blobs)
    )
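
    # Each export file is staged through a temp table so the final write into
    # the aggregation table can happen as upserts (see import_file below).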
    for blob in blobs:
        # Create temp table for data.
        tmp_table = f"tmp_import_desktop_{channel}"
        log(channel, f"Creating temp table for import: {tmp_table}.")
        with connection.cursor() as cursor:
            cursor.execute(f"DROP TABLE IF EXISTS {tmp_table}")
            cursor.execute(
                f"CREATE TABLE {tmp_table} (LIKE {model._meta.db_table})"
            )
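            # The export CSV presumably has no id column, so drop the copied
            # serial id to keep the temp table's columns aligned with the file.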
cursor.execute(f"ALTER TABLE {tmp_table} DROP COLUMN id")

        # Download CSV file to local filesystem.
        fp = tempfile.NamedTemporaryFile()
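        # NamedTemporaryFile removes the file on close, so fp stays open until
        # import_file has consumed it; the fp.close() below is what actually
        # deletes the local file.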
log(channel, f"Copying GCS file {blob.name} to local file {fp.name}.")
blob.download_to_filename(fp.name)

        # Load CSV into temp table & insert data from temp table into
        # aggregation tables, using upserts.
        self.import_file(tmp_table, fp, model, channel)

        # Drop temp table and remove file.
        log(channel, "Dropping temp table.")
        with connection.cursor() as cursor:
            cursor.execute(f"DROP TABLE {tmp_table}")
        log(channel, f"Deleting local file: {fp.name}.")
        fp.close()

    # Once all files are loaded, refresh the materialized view.
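    # CONCURRENTLY lets reads continue during the refresh; note that
    # PostgreSQL requires a unique index on the view for a concurrent refresh.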
    if blobs:
        with connection.cursor() as cursor:
            view = f"view_{model._meta.db_table}"
            log(channel, f"Refreshing materialized view for {view}")
            cursor.execute(f"REFRESH MATERIALIZED VIEW CONCURRENTLY {view}")
            log(channel, "Refresh completed.")

        LastUpdated.objects.update_or_create(
            product="desktop", defaults={"last_updated": timezone.now()}
        )

    # Now import the revisions to match any new build IDs we imported.
    self.import_revisions(channel)
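
# For context, import_file is not shown in this excerpt. Below is a minimal,
# hypothetical sketch of the "load CSV, then upsert" step it performs,
# assuming PostgreSQL COPY plus an ON CONFLICT upsert; the column names
# (metric, build_id, data) and the conflict key are placeholders, not the
# project's actual schema.
def import_file(self, tmp_table, fp, model, channel):
    table = model._meta.db_table
    with connection.cursor() as cursor, open(fp.name) as csv_file:
        # Bulk-load the CSV into the temp table.
        cursor.copy_expert(f"COPY {tmp_table} FROM STDIN WITH CSV", csv_file)
        # Upsert the staged rows into the real aggregation table.
        cursor.execute(
            f"INSERT INTO {table} (metric, build_id, data) "
            f"SELECT metric, build_id, data FROM {tmp_table} "
            "ON CONFLICT (metric, build_id) "
            "DO UPDATE SET data = EXCLUDED.data"
        )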