in redash/tasks/queries/maintenance.py [0:0]
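The names below come from module-level imports in this file. A plausible set is sketched here (the redash package paths follow Redash's usual layout; the logger setup and the home of truncate_long_string are assumptions, since their definitions aren't shown in this excerpt):

import time

from rq.timeouts import JobTimeoutException

from redash import models, redis_connection, settings, statsd_client
from redash.utils import truncate_long_string  # assumed location of this helper
from redash.worker import get_job_logger

logger = get_job_logger(__name__)  # assumed logger setup
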
def refresh_schema(data_source_id, max_type_string_length=250):
    ds = models.DataSource.get_by_id(data_source_id)
    logger.info("task=refresh_schema state=start ds_id=%s", ds.id)

lock_key = "data_source:schema:refresh:{}:lock".format(data_source_id)
lock = redis_connection.lock(lock_key, timeout=settings.SCHEMA_REFRESH_TIME_LIMIT)
acquired = lock.acquire(blocking=False)
start_time = time.time()
if acquired:
logger.info("task=refresh_schema state=locked ds_id=%s", ds.id)
        try:
            # Tracks which tables and columns the freshly fetched schema
            # says currently exist
            existing_tables_set = set()
            existing_columns_set = set()

            # Row data that will be written to Postgres
            table_data = {}
            column_data = {}

            new_column_names = {}
            new_column_metadata = {}
            for table in ds.query_runner.get_schema(get_stats=True):
                table_name = table["name"]
                existing_tables_set.add(table_name)

                table_data[table_name] = {
                    "org_id": ds.org_id,
                    "name": table_name,
                    "data_source_id": ds.id,
                    "column_metadata": "metadata" in table,
                    "exists": True,
                }
                new_column_names[table_name] = table["columns"]
                new_column_metadata[table_name] = table.get("metadata", None)

            models.TableMetadata.store(ds, existing_tables_set, table_data)

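            # Re-read the persisted table rows: column rows are keyed by
            # table_id, which only exists once the tables have been stored.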
            all_existing_persisted_tables = models.TableMetadata.query.filter(
                models.TableMetadata.exists.is_(True),
                models.TableMetadata.data_source_id == ds.id,
            ).all()

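            # For each known table, record the columns seen in this refresh,
            # then flag any previously stored columns that have disappeared.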
            for table in all_existing_persisted_tables:
                for i, column in enumerate(new_column_names.get(table.name, [])):
                    existing_columns_set.add(column)
                    column_data[column] = {
                        "org_id": ds.org_id,
                        "table_id": table.id,
                        "name": column,
                        "type": None,
                        "exists": True,
                    }

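                    # A column type is only available when the query runner
                    # returned per-column "metadata" for this table; long type
                    # strings are truncated to max_type_string_length.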
                    if table.column_metadata:
                        column_type = new_column_metadata[table.name][i]["type"]
                        column_type = truncate_long_string(
                            column_type, max_type_string_length
                        )
                        column_data[column]["type"] = column_type

                models.ColumnMetadata.store(table, existing_columns_set, column_data)

                existing_columns_list = list(existing_columns_set)

                # If a column no longer exists, set its 'exists' flag to false.
                models.ColumnMetadata.query.filter(
                    models.ColumnMetadata.exists.is_(True),
                    models.ColumnMetadata.table_id == table.id,
                    ~models.ColumnMetadata.name.in_(existing_columns_list),
                ).update(
                    {"exists": False, "updated_at": models.db.func.now()},
                    synchronize_session="fetch",
                )

                # Clear the set for the next table
                existing_columns_set.clear()

            # If a table did not appear in the get_schema() response above,
            # set its 'exists' flag to false.
            existing_tables_list = list(existing_tables_set)
            models.TableMetadata.query.filter(
                models.TableMetadata.exists.is_(True),
                models.TableMetadata.data_source_id == ds.id,
                ~models.TableMetadata.name.in_(existing_tables_list),
            ).update(
                {"exists": False, "updated_at": models.db.func.now()},
                synchronize_session="fetch",
            )

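            # One commit for the whole refresh, so a failure above leaves the
            # previous metadata intact.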
            models.db.session.commit()

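            # Repopulate the schema cache served to clients; forced=True
            # presumably rebuilds it even if a cached copy is still present.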
logger.info("task=refresh_schema state=caching ds_id=%s", ds.id)
ds.schema_cache.populate(forced=True)
logger.info("task=refresh_schema state=cached ds_id=%s", ds.id)
logger.info(
"task=refresh_schema state=finished ds_id=%s runtime=%.2f",
ds.id,
time.time() - start_time,
)
statsd_client.incr("refresh_schema.success")
        except JobTimeoutException:
            logger.info(
                "task=refresh_schema state=timeout ds_id=%s runtime=%.2f",
                ds.id,
                time.time() - start_time,
            )
            statsd_client.incr("refresh_schema.timeout")
        except Exception:
            logger.warning(
                "Failed refreshing schema for the data source: %s", ds.name, exc_info=True
            )
            statsd_client.incr("refresh_schema.error")
            logger.info(
                "task=refresh_schema state=failed ds_id=%s runtime=%.2f",
                ds.id,
                time.time() - start_time,
            )
        finally:
            lock.release()
            logger.info("task=refresh_schema state=unlocked ds_id=%s", ds.id)
    else:
        logger.info("task=refresh_schema state=alreadylocked ds_id=%s", ds.id)