def handle()

in glam/api/management/commands/import_glean_aggs.py
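
For each Glean product in PRODUCT_TO_MODEL, this command finds the matching glam-extract-*.csv exports in the given GCS bucket, stages each file into a temporary Postgres table shaped like the product's aggregation table, upserts the rows via import_file, refreshes the product's materialized view, and records the import time in LastUpdated.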


    def handle(self, app_id, bucket, *args, **options):
        self.gcs_client = storage.Client()
        for product, model_name in PRODUCT_TO_MODEL.items():
            model = apps.get_model(model_name)
            # Find all files in bucket that match the pattern with provided app_id.
            pattern = re.compile(rf"glam-extract-{product}_glam_{app_id}-\d+\.csv")
            blobs = self.gcs_client.list_blobs(bucket)
            blobs = [blob for blob in blobs if pattern.fullmatch(blob.name)]

            for blob in blobs:
                # Create temp table for data.
                tmp_table = "tmp_import_glean_{}".format(app_id)
                log(app_id, f"Creating temp table for import: {tmp_table}.")
                with connection.cursor() as cursor:
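                    # Table names can't be bound as query parameters,
                    # so identifiers are interpolated into the SQL directly.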
                    cursor.execute(f"DROP TABLE IF EXISTS {tmp_table}")
                    cursor.execute(
                        f"CREATE TABLE {tmp_table} (LIKE {model._meta.db_table})"
                    )
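                    # (LIKE copies column names and types, but not indexes
                    # or column defaults.)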
                    # The incoming CSV files don't have an `id` or `app_id`.
                    cursor.execute(
                        f"ALTER TABLE {tmp_table} DROP COLUMN id, DROP COLUMN app_id"
                    )

                # Download CSV file to local filesystem.
                fp = tempfile.NamedTemporaryFile()
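                # NamedTemporaryFile is deleted from disk when closed below.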
                log(app_id, f"Copying GCS file {blob.name} to local file {fp.name}.")
                blob.download_to_filename(fp.name)

                # Load CSV into temp table & insert data from temp table into
                # aggregation tables, using upserts.
                self.import_file(tmp_table, fp, app_id, product)

                # Drop temp table and remove file.
                log(app_id, "Dropping temp table.")
                with connection.cursor() as cursor:
                    cursor.execute(f"DROP TABLE {tmp_table}")
                log(app_id, f"Deleting local file: {fp.name}.")
                fp.close()

            # Once all files are loaded, refresh the materialized view
            # for this product's aggregation table.
            if blobs:
                with connection.cursor() as cursor:
                    view = f"view_{model._meta.db_table}"
                    log(app_id, f"Refreshing materialized view for {view}")
                    cursor.execute(f"REFRESH MATERIALIZED VIEW CONCURRENTLY {view}")
                    log(app_id, "Refresh completed.")

            LastUpdated.objects.update_or_create(
                product=f"fenix-{app_id}", defaults={"last_updated": timezone.now()}
            )
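
import_file (called above) is not included in this excerpt. A plausible sketch of what it does, based only on the surrounding comments ("load CSV into temp table", "using upserts"): stream the CSV into the temp table with Postgres COPY, then upsert into the real aggregation table. The column list and conflict target below are placeholders, not taken from the actual command:

    def import_file(self, tmp_table, fp, app_id, product):
        # Hypothetical sketch -- the real import_file is elsewhere in this
        # file and is not shown in this excerpt.
        model = apps.get_model(PRODUCT_TO_MODEL[product])
        with connection.cursor() as cursor:
            with open(fp.name) as csv_file:
                # Django's cursor proxies attribute access to psycopg2,
                # so copy_expert is available for a streaming COPY.
                cursor.copy_expert(
                    f"COPY {tmp_table} FROM STDIN WITH CSV HEADER", csv_file
                )
            # Upsert into the real table, supplying the app_id the CSV lacks.
            # "metric"/"data" and the conflict target are illustrative names.
            cursor.execute(
                f"""
                INSERT INTO {model._meta.db_table} (app_id, metric, data)
                SELECT %s, metric, data FROM {tmp_table}
                ON CONFLICT (app_id, metric) DO UPDATE SET data = EXCLUDED.data
                """,
                [app_id],
            )

Since handle() receives app_id and bucket positionally, the command presumably declares them in add_arguments, and the module must import the pieces used above. A sketch of that scaffolding, with the exact import paths, the PRODUCT_TO_MODEL contents, and the log helper all being assumptions:

    import re
    import tempfile

    from django.apps import apps
    from django.core.management.base import BaseCommand
    from django.db import connection
    from django.utils import timezone
    from google.cloud import storage

    from glam.api.models import LastUpdated  # assumed model location

    # Maps a Glean product name to a Django model label; contents illustrative.
    PRODUCT_TO_MODEL = {
        "fenix": "api.FenixAggregation",
    }

    def log(app_id, message):
        # Assumed helper; the excerpt only shows it being called.
        print(f"{app_id}: {message}", flush=True)

    class Command(BaseCommand):
        help = "Imports Glean aggregation CSVs from a GCS bucket."

        def add_arguments(self, parser):
            parser.add_argument("app_id")
            parser.add_argument("bucket")

Invocation would then look like: python manage.py import_glean_aggs <app_id> <bucket>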