in adb3client/src/main/java/com/alibaba/cloud/analyticdb/adb3client/impl/ExecutionPool.java [484:529]
/**
 * Walks the entries reported by {@code tableCache.filterKeys(tableSchemaRemainLife)} and,
 * depending on each entry's {@code Cache.ItemState}, either drops the cached schema
 * ({@code EXPIRE}) or submits a {@code MetaAction} to reload it ({@code NEED_REFRESH}).
 *
 * <p>The whole pass is skipped while {@code pendingRefreshTableSchemaActionCount} is non-zero.
 * The counter is incremented for every successfully submitted refresh and decremented when
 * that refresh's future completes, whether it succeeded or failed.
 *
 * <p>No exception escapes this method: failures are logged and swallowed.
 */
private void refreshTableSchema() {
    // Skip this pass while earlier refreshes are still outstanding, so that a slow
    // getTableSchema does not make the same table's TableSchema get refreshed repeatedly.
    if (pendingRefreshTableSchemaActionCount.get() != 0) {
        return;
    }

    // Catch everything: an unexpected failure here must not stop the calling thread's work.
    try {
        tableCache.filterKeys(tableSchemaRemainLife).forEach(entry -> {
            Cache.ItemState itemState = entry.l;
            TableName table = entry.r;
            switch (itemState) {
                case NEED_REFRESH:
                    // A failure for one table is logged and must not abort the remaining entries.
                    try {
                        // NOTE(review): the result of this call is discarded and a separate
                        // MetaAction is submitted right below; confirm both are intended.
                        getOrSubmitTableSchema(table, true);
                        MetaAction refreshAction = new MetaAction(table);
                        if (submit(refreshAction)) {
                            LOGGER.info("refresh tableSchema for {}, because remain lifetime < {} ms", table, tableSchemaRemainLife);
                            // Count the refresh as pending before the completion callback is attached.
                            pendingRefreshTableSchemaActionCount.incrementAndGet();
                            refreshAction.getFuture().whenCompleteAsync((freshSchema, failure) -> {
                                // Release the pending slot first, regardless of the outcome.
                                pendingRefreshTableSchemaActionCount.decrementAndGet();
                                if (failure == null) {
                                    tableCache.put(table, freshSchema);
                                    return;
                                }
                                LOGGER.warn("refreshTableSchema fail", failure);
                                // Evict the entry only when the failure message says the table was not found.
                                String reason = failure.getMessage();
                                if (reason != null && reason.contains("can not found table")) {
                                    tableCache.remove(table);
                                }
                            });
                        }
                    } catch (Exception refreshError) {
                        LOGGER.warn("refreshTableSchema fail", refreshError);
                    }
                    break;
                case EXPIRE:
                    LOGGER.info("remove expire tableSchema for {}", table);
                    tableCache.remove(table);
                    break;
                default:
                    LOGGER.error("undefine item state {}", itemState);
            }
        });
    } catch (Throwable unexpected) {
        LOGGER.warn("refreshTableSchema unexpected fail", unexpected);
    }
}