public void exploreExternalCatalog()

in amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java [267:360]


  /**
   * Reconciles the tables registered in the Amoro server for {@code externalCatalog} with the
   * tables the external catalog currently reports.
   *
   * <p>The method works in three phases, all of which run their tasks on {@code
   * tableExplorerExecutors}:
   *
   * <ol>
   *   <li>List the tables of every database of the external catalog concurrently and merge them
   *       into one snapshot.
   *   <li>Load the table identifiers the server has stored for the same catalog name.
   *   <li>Call {@code syncTable} for every table that is only in the external snapshot and {@code
   *       disposeTable} for every table that is only known to the server, then wait for all of
   *       these tasks to finish.
   * </ol>
   *
   * <p>The external snapshot must be complete before phase 3 runs: any table missing from it is
   * treated as removed and is disposed. For that reason every failure in phase 1 (a listing error
   * or a rejected task submission) aborts the whole run. Failures of individual sync/dispose tasks
   * are only logged.
   *
   * @param externalCatalog the external catalog to explore
   * @throws RejectedExecutionException if a table-listing task cannot be submitted to the executor
   * @throws java.util.concurrent.CompletionException if listing the tables of any database fails
   */
  public void exploreExternalCatalog(ExternalCatalog externalCatalog) {
    final List<CompletableFuture<Set<TableIdentity>>> tableIdentifiersFutures =
        Lists.newArrayList();
    externalCatalog
        .listDatabases()
        .forEach(
            database -> {
              try {
                tableIdentifiersFutures.add(
                    CompletableFuture.supplyAsync(
                            () ->
                                externalCatalog.listTables(database).stream()
                                    .map(TableIdentity::new)
                                    .collect(Collectors.toSet()),
                            tableExplorerExecutors)
                        .exceptionally(
                            ex -> {
                              LOG.error(
                                  "TableExplorer list tables in database {} error", database, ex);
                              // Keep the future failed so that join() below aborts the run.
                              throw new RuntimeException(ex);
                            }));
              } catch (RejectedExecutionException e) {
                LOG.error(
                    "The queue of table explorer is full, please increase the queue size or thread count.");
                // Do not continue with a partial snapshot: the tables of this database would be
                // missing from it and the dispose phase below would wrongly remove them from the
                // server. Abort exactly like a failed listing does.
                throw e;
              }
            });
    // join() rethrows any listing failure, so the snapshot is either complete or never built.
    // The sets are merged into a fresh set instead of mutating the first listed one.
    Set<TableIdentity> tableIdentifiers =
        tableIdentifiersFutures.stream()
            .map(CompletableFuture::join)
            .flatMap(Set::stream)
            .collect(Collectors.toSet());
    LOG.info(
        "Loaded {} tables from external catalog {}.",
        tableIdentifiers.size(),
        externalCatalog.name());
    Map<TableIdentity, ServerTableIdentifier> serverTableIdentifiers =
        getAs(
                TableMetaMapper.class,
                mapper -> mapper.selectTableIdentifiersByCatalog(externalCatalog.name()))
            .stream()
            .collect(Collectors.toMap(TableIdentity::new, tableIdentifier -> tableIdentifier));
    LOG.info(
        "Loaded {} tables from Amoro server catalog {}.",
        serverTableIdentifiers.size(),
        externalCatalog.name());
    final List<CompletableFuture<Void>> taskFutures = Lists.newArrayList();
    // Tables present in the external catalog but unknown to the server: sync them.
    Sets.difference(tableIdentifiers, serverTableIdentifiers.keySet())
        .forEach(
            tableIdentity -> {
              try {
                taskFutures.add(
                    CompletableFuture.runAsync(
                        () -> {
                          try {
                            syncTable(externalCatalog, tableIdentity);
                          } catch (Exception e) {
                            LOG.error("TableExplorer sync table {} error", tableIdentity, e);
                          }
                        },
                        tableExplorerExecutors));
              } catch (RejectedExecutionException e) {
                // Best effort: a rejected sync only leaves the table unsynced for this run.
                LOG.error(
                    "The queue of table explorer is full, please increase the queue size or thread count.");
              }
            });
    // Tables known to the server but no longer reported by the external catalog: dispose them.
    Sets.difference(serverTableIdentifiers.keySet(), tableIdentifiers)
        .forEach(
            tableIdentity -> {
              try {
                taskFutures.add(
                    CompletableFuture.runAsync(
                        () -> {
                          try {
                            disposeTable(serverTableIdentifiers.get(tableIdentity));
                          } catch (Exception e) {
                            LOG.error("TableExplorer dispose table {} error", tableIdentity, e);
                          }
                        },
                        tableExplorerExecutors));
              } catch (RejectedExecutionException e) {
                // Best effort: a rejected dispose only leaves the table registered for this run.
                LOG.error(
                    "The queue of table explorer is full, please increase the queue size or thread count.");
              }
            });
    // Block until every submitted sync/dispose task has finished.
    taskFutures.forEach(CompletableFuture::join);
  }