in amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java [267:360]
/**
 * Reconciles the tables registered on the server for {@code externalCatalog} with the tables the
 * external catalog currently reports.
 *
 * <p>The tables of every database are listed concurrently on {@code tableExplorerExecutors}.
 * Tables reported by the external catalog but not registered on the server are handed to {@code
 * syncTable}; tables registered on the server but no longer reported are handed to {@code
 * disposeTable}.
 *
 * <p>The reconciliation only runs on a complete listing. If listing any database fails, or a
 * listing task cannot even be submitted, the exploration is aborted: an incomplete listing would
 * make the still-existing tables of the skipped database look removed and get them disposed.
 *
 * @param externalCatalog the external catalog to explore
 * @throws RejectedExecutionException if a table-listing task is rejected by the executor
 * @throws java.util.concurrent.CompletionException if listing the tables of a database fails
 */
public void exploreExternalCatalog(ExternalCatalog externalCatalog) {
  final List<CompletableFuture<Set<TableIdentity>>> tableIdentifiersFutures =
      Lists.newArrayList();
  externalCatalog
      .listDatabases()
      .forEach(
          database -> {
            try {
              tableIdentifiersFutures.add(
                  CompletableFuture.supplyAsync(
                          () ->
                              externalCatalog.listTables(database).stream()
                                  .map(TableIdentity::new)
                                  .collect(Collectors.toSet()),
                          tableExplorerExecutors)
                      .exceptionally(
                          ex -> {
                            LOG.error(
                                "TableExplorer list tables in database {} error", database, ex);
                            // Keep the future failed so that join() below aborts the exploration.
                            throw new RuntimeException(ex);
                          }));
            } catch (RejectedExecutionException e) {
              LOG.error(
                  "The queue of table explorer is full, please increase the queue size or thread count.");
              // Do not continue with a partial listing: the tables of this database would be
              // absent from the external set and therefore be disposed further down.
              throw e;
            }
          });

  // Merge the per-database results into a fresh set; join() rethrows any listing failure.
  Set<TableIdentity> tableIdentifiers = Sets.newHashSet();
  for (CompletableFuture<Set<TableIdentity>> future : tableIdentifiersFutures) {
    tableIdentifiers.addAll(future.join());
  }
  LOG.info(
      "Loaded {} tables from external catalog {}.",
      tableIdentifiers.size(),
      externalCatalog.name());

  Map<TableIdentity, ServerTableIdentifier> serverTableIdentifiers =
      getAs(
              TableMetaMapper.class,
              mapper -> mapper.selectTableIdentifiersByCatalog(externalCatalog.name()))
          .stream()
          .collect(Collectors.toMap(TableIdentity::new, tableIdentifier -> tableIdentifier));
  LOG.info(
      "Loaded {} tables from Amoro server catalog {}.",
      serverTableIdentifiers.size(),
      externalCatalog.name());

  final List<CompletableFuture<Void>> taskFutures = Lists.newArrayList();

  // Tables present in the external catalog but unknown to the server: sync them.
  // A rejected or failed task is only logged; the table stays unsynced after this run.
  Sets.difference(tableIdentifiers, serverTableIdentifiers.keySet())
      .forEach(
          tableIdentity -> {
            try {
              taskFutures.add(
                  CompletableFuture.runAsync(
                      () -> {
                        try {
                          syncTable(externalCatalog, tableIdentity);
                        } catch (Exception e) {
                          LOG.error(
                              "TableExplorer sync table {} error", tableIdentity.toString(), e);
                        }
                      },
                      tableExplorerExecutors));
            } catch (RejectedExecutionException e) {
              LOG.error(
                  "The queue of table explorer is full, please increase the queue size or thread count.");
            }
          });

  // Tables known to the server but no longer reported by the external catalog: dispose them.
  // A rejected or failed task is only logged; the table stays registered after this run.
  Sets.difference(serverTableIdentifiers.keySet(), tableIdentifiers)
      .forEach(
          tableIdentity -> {
            try {
              taskFutures.add(
                  CompletableFuture.runAsync(
                      () -> {
                        try {
                          disposeTable(serverTableIdentifiers.get(tableIdentity));
                        } catch (Exception e) {
                          LOG.error(
                              "TableExplorer dispose table {} error",
                              tableIdentity.toString(),
                              e);
                        }
                      },
                      tableExplorerExecutors));
            } catch (RejectedExecutionException e) {
              LOG.error(
                  "The queue of table explorer is full, please increase the queue size or thread count.");
            }
          });

  // Wait until every submitted sync/dispose task has finished before returning.
  taskFutures.forEach(CompletableFuture::join);
}