def handle()

in glam/api/management/commands/import_desktop_aggs.py [0:0]


    def handle(self, bucket, *args, **options):
        """Import desktop aggregation files for one channel from a GCS bucket.

        Every blob in ``bucket`` whose name starts with
        ``aggs-desktop-<channel>`` is downloaded to a local temporary file and
        handed to ``self.import_file`` together with a freshly created staging
        table. After all files are processed the materialized view for the
        channel's model is refreshed (only if at least one blob matched), the
        "desktop" ``LastUpdated`` record is touched, and
        ``self.import_revisions`` is run for the channel.

        Args:
            bucket: Passed straight to ``storage.Client.list_blobs``.
            *args: Unused.
            **options: Must contain ``"channel"``; its value is looked up in
                ``CHANNEL_TO_MODEL`` to resolve the target model.
        """
        channel = options["channel"]
        model = apps.get_model(CHANNEL_TO_MODEL[channel])

        self.gcs_client = storage.Client()
        self.bq_client = bigquery.Client()

        # Materialize the matches so we can both iterate them and later check
        # whether anything was imported at all.
        prefix = f"aggs-desktop-{channel}"
        blobs = [
            blob
            for blob in self.gcs_client.list_blobs(bucket)
            if blob.name.startswith(prefix)
        ]

        # The staging table name depends only on the channel, so compute it
        # once rather than on every iteration.
        tmp_table = f"tmp_import_desktop_{channel}"

        for blob in blobs:
            # Create temp table for data. It mirrors the model's table minus
            # the `id` column. Dropping first also cleans up a table left
            # behind by a previous run that failed mid-import.
            log(channel, f"Creating temp table for import: {tmp_table}.")
            with connection.cursor() as cursor:
                cursor.execute(f"DROP TABLE IF EXISTS {tmp_table}")
                cursor.execute(
                    f"CREATE TABLE {tmp_table} (LIKE {model._meta.db_table})"
                )
                cursor.execute(f"ALTER TABLE {tmp_table} DROP COLUMN id")

            # Download CSV file to local filesystem. The context manager
            # guarantees the temporary file is closed (and thereby removed)
            # even if the download or the import raises.
            with tempfile.NamedTemporaryFile() as fp:
                log(
                    channel,
                    f"Copying GCS file {blob.name} to local file {fp.name}.",
                )
                blob.download_to_filename(fp.name)

                # Load CSV into temp table & insert data from temp table into
                # aggregation tables, using upserts.
                self.import_file(tmp_table, fp, model, channel)

                # Drop temp table and remove file.
                log(channel, "Dropping temp table.")
                with connection.cursor() as cursor:
                    cursor.execute(f"DROP TABLE {tmp_table}")
                log(channel, f"Deleting local file: {fp.name}.")

        # Once all files are loaded, refresh the materialized views.
        if blobs:
            view = f"view_{model._meta.db_table}"
            with connection.cursor() as cursor:
                log(channel, f"Refreshing materialized view for {view}")
                cursor.execute(f"REFRESH MATERIALIZED VIEW CONCURRENTLY {view}")
                log(channel, "Refresh completed.")

        # NOTE: the timestamp is bumped even when no blob matched.
        LastUpdated.objects.update_or_create(
            product="desktop", defaults={"last_updated": timezone.now()}
        )

        # Now import the revisions to match any new build IDs we imported.
        self.import_revisions(channel)