def refresh_schema()

in redash/tasks/queries/maintenance.py

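refresh_schema pulls the live schema for a single data source, upserts
TableMetadata/ColumnMetadata rows, flags tables and columns that have
disappeared, and then repopulates the cached schema.

The listing omits the module's imports. A plausible minimal set for the
names it uses, based on the public redash package layout (the fork's
actual imports may differ; truncate_long_string is assumed to be a small
string-capping helper defined elsewhere in the codebase, and
settings.SCHEMA_REFRESH_TIME_LIMIT a configured time limit in seconds):

import logging
import time

from rq.timeouts import JobTimeoutException

from redash import models, redis_connection, settings, statsd_client

logger = logging.getLogger(__name__)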

def refresh_schema(data_source_id, max_type_string_length=250):
    ds = models.DataSource.get_by_id(data_source_id)
    logger.info("task=refresh_schema state=start ds_id=%s", ds.id)
    lock_key = "data_source:schema:refresh:{}:lock".format(data_source_id)
    lock = redis_connection.lock(lock_key, timeout=settings.SCHEMA_REFRESH_TIME_LIMIT)
    acquired = lock.acquire(blocking=False)
    start_time = time.time()

    if acquired:
        logger.info("task=refresh_schema state=locked ds_id=%s", ds.id)
        try:
            # Track which tables and columns currently exist in the
            # freshly fetched schema
            existing_tables_set = set()
            existing_columns_set = set()

            # Stores data that will be inserted into postgres
            table_data = {}
            column_data = {}

            new_column_names = {}
            new_column_metadata = {}

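            # Fetch the live schema from the data source's query runner;
            # get_stats=True asks runners that support it to include
            # per-table "metadata" (e.g. column types).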
            for table in ds.query_runner.get_schema(get_stats=True):
                table_name = table["name"]
                existing_tables_set.add(table_name)

                table_data[table_name] = {
                    "org_id": ds.org_id,
                    "name": table_name,
                    "data_source_id": ds.id,
                    "column_metadata": "metadata" in table,
                    "exists": True,
                }
                new_column_names[table_name] = table["columns"]
                new_column_metadata[table_name] = table.get("metadata")

            models.TableMetadata.store(ds, existing_tables_set, table_data)

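            # Re-read the persisted table rows so each table name can be
            # resolved to its primary key (table.id) for the column rows.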
            all_existing_persisted_tables = models.TableMetadata.query.filter(
                models.TableMetadata.exists.is_(True),
                models.TableMetadata.data_source_id == ds.id,
            ).all()

            for table in all_existing_persisted_tables:
                for i, column in enumerate(new_column_names.get(table.name, [])):
                    existing_columns_set.add(column)
                    column_data[column] = {
                        "org_id": ds.org_id,
                        "table_id": table.id,
                        "name": column,
                        "type": None,
                        "exists": True,
                    }

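                    # When the runner supplied metadata, also record the
                    # column type, truncated to fit the persisted column.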
                    if table.column_metadata:
                        column_type = new_column_metadata[table.name][i]["type"]
                        column_type = truncate_long_string(
                            column_type, max_type_string_length
                        )
                        column_data[column]["type"] = column_type

                models.ColumnMetadata.store(table, existing_columns_set, column_data)

                existing_columns_list = list(existing_columns_set)

                # If a column no longer exists, set its 'exists' flag to false.
                models.ColumnMetadata.query.filter(
                    models.ColumnMetadata.exists.is_(True),
                    models.ColumnMetadata.table_id == table.id,
                    ~models.ColumnMetadata.name.in_(existing_columns_list),
                ).update(
                    {"exists": False, "updated_at": models.db.func.now()},
                    synchronize_session="fetch",
                )

                # Clear the set for the next round
                existing_columns_set.clear()

            # If a table no longer appears in the get_schema() response
            # above, set its 'exists' flag to false.
            existing_tables_list = list(existing_tables_set)
            models.TableMetadata.query.filter(
                models.TableMetadata.exists.is_(True),
                models.TableMetadata.data_source_id == ds.id,
                ~models.TableMetadata.name.in_(existing_tables_list),
            ).update(
                {"exists": False, "updated_at": models.db.func.now()},
                synchronize_session="fetch",
            )

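            # Persist all table and column changes in a single transaction.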
            models.db.session.commit()

            logger.info("task=refresh_schema state=caching ds_id=%s", ds.id)
            ds.schema_cache.populate(forced=True)
            logger.info("task=refresh_schema state=cached ds_id=%s", ds.id)

            logger.info(
                "task=refresh_schema state=finished ds_id=%s runtime=%.2f",
                ds.id,
                time.time() - start_time,
            )
            statsd_client.incr("refresh_schema.success")
        except JobTimeoutException:
            logger.info(
                "task=refresh_schema state=timeout ds_id=%s runtime=%.2f",
                ds.id,
                time.time() - start_time,
            )
            statsd_client.incr("refresh_schema.timeout")
        except Exception:
            logger.warning(
                "Failed refreshing schema for the data source: %s", ds.name, exc_info=1
            )
            statsd_client.incr("refresh_schema.error")
            logger.info(
                "task=refresh_schema state=failed ds_id=%s runtime=%.2f",
                ds.id,
                time.time() - start_time,
            )
        finally:
            lock.release()
            logger.info("task=refresh_schema state=unlocked ds_id=%s", ds.id)
    else:
        logger.info("task=refresh_schema state=alreadylocked ds_id=%s", ds.id)