protected KeyedTable createKeyedTable()

in amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/catalog/MixedHiveTables.java [178:293]
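Creates a keyed mixed-format Hive table: it provisions the Iceberg base store and change store, registers (or updates) the corresponding table in the Hive Metastore, and returns the assembled KeyedHiveTable.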


  protected KeyedTable createKeyedTable(
      TableMeta tableMeta,
      Schema schema,
      PrimaryKeySpec primaryKeySpec,
      PartitionSpec partitionSpec) {
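    // Resolve the table identifier, the table/base/change store locations, and default
    // properties from the table meta.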
    boolean allowExistedHiveTable = allowExistedHiveTable(tableMeta);
    TableIdentifier tableIdentifier = TableIdentifier.of(tableMeta.getTableIdentifier());
    String baseLocation = checkLocation(tableMeta, MetaTableProperties.LOCATION_KEY_BASE);
    String changeLocation = checkLocation(tableMeta, MetaTableProperties.LOCATION_KEY_CHANGE);
    String tableLocation = checkLocation(tableMeta, MetaTableProperties.LOCATION_KEY_TABLE);
    fillTableProperties(tableMeta);
    String hiveLocation =
        tableMeta.getProperties().get(HiveTableProperties.BASE_HIVE_LOCATION_ROOT);
    // Default the self-optimizing full trigger interval to 1 day (86,400,000 ms) if unset.
    if (!tableMeta
        .getProperties()
        .containsKey(TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL)) {
      tableMeta.putToProperties(TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL, "86400000");
    }

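    // Build a recoverable Hadoop FileIO that runs file operations under the catalog's
    // authenticated context.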
    AuthenticatedHadoopFileIO fileIO =
        AuthenticatedFileIOs.buildRecoverableHadoopFileIO(
            tableIdentifier,
            tableLocation,
            tableMeta.getProperties(),
            tableMetaStore,
            catalogProperties);
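    // Create the base store as an Iceberg table at the base location and commit a default
    // name mapping so data files without Iceberg field IDs (e.g. written by Hive) are
    // resolved by column name.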
    Table baseIcebergTable =
        tableMetaStore.doAs(
            () -> {
              try {
                Table createTable =
                    tables.create(schema, partitionSpec, tableMeta.getProperties(), baseLocation);
                createTable
                    .updateProperties()
                    .set(
                        org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING,
                        NameMappingParser.toJson(MappingUtil.create(createTable.schema())))
                    .commit();
                return createTable;
              } catch (Exception e) {
                throw new IllegalStateException("Failed to create base table", e);
              }
            });
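    // Wrap the base Iceberg table with mixed-format table operations as the Hive-backed
    // base store.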
    UnkeyedHiveTable baseTable =
        new KeyedHiveTable.HiveBaseInternalTable(
            tableIdentifier,
            MixedFormatCatalogUtil.useMixedTableOperations(
                baseIcebergTable, baseLocation, fileIO, tableMetaStore.getConfiguration()),
            fileIO,
            tableLocation,
            hiveClientPool,
            catalogProperties,
            false);

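    // Create the change store the same way, at the change location.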
    Table changeIcebergTable =
        tableMetaStore.doAs(
            () -> {
              try {
                Table createTable =
                    tables.create(schema, partitionSpec, tableMeta.getProperties(), changeLocation);
                createTable
                    .updateProperties()
                    .set(
                        org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING,
                        NameMappingParser.toJson(MappingUtil.create(createTable.schema())))
                    .commit();
                return createTable;
              } catch (Exception e) {
                throw new IllegalStateException("Failed to create change table", e);
              }
            });
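    // Wrap the change Iceberg table as the keyed table's change store.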
    ChangeTable changeTable =
        new KeyedHiveTable.HiveChangeInternalTable(
            tableIdentifier,
            MixedFormatCatalogUtil.useMixedTableOperations(
                changeIcebergTable, changeLocation, fileIO, tableMetaStore.getConfiguration()),
            fileIO,
            catalogProperties);

    Map<String, String> metaProperties = tableMeta.getProperties();
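    // Synchronize with the Hive Metastore: if reusing an existing Hive table is allowed,
    // merge the mixed-format parameters into it; otherwise create a new Hive table whose
    // storage descriptor points at the Hive location with the configured file format.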
    try {
      hiveClientPool.run(
          client -> {
            if (allowExistedHiveTable) {
              org.apache.hadoop.hive.metastore.api.Table hiveTable =
                  client.getTable(tableIdentifier.getDatabase(), tableIdentifier.getTableName());
              Map<String, String> hiveParameters = hiveTable.getParameters();
              hiveParameters.putAll(constructProperties(primaryKeySpec, tableMeta));
              hiveTable.setParameters(hiveParameters);
              client.alterTable(
                  tableIdentifier.getDatabase(), tableIdentifier.getTableName(), hiveTable);
            } else {
              org.apache.hadoop.hive.metastore.api.Table hiveTable =
                  newHiveTable(tableMeta, schema, partitionSpec);
              hiveTable.setSd(
                  HiveTableUtil.storageDescriptor(
                      schema,
                      partitionSpec,
                      hiveLocation,
                      FileFormat.valueOf(
                          PropertyUtil.propertyAsString(
                                  metaProperties,
                                  TableProperties.DEFAULT_FILE_FORMAT,
                                  TableProperties.DEFAULT_FILE_FORMAT_DEFAULT)
                              .toUpperCase(Locale.ENGLISH))));
              setProToHive(hiveTable, primaryKeySpec, tableMeta);
              client.createTable(hiveTable);
            }
            return null;
          });
    } catch (TException | InterruptedException e) {
      throw new RuntimeException(
          "Failed to create Hive table: " + tableMeta.getTableIdentifier(), e);
    }
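    // Assemble the keyed table from the primary key spec plus the base and change stores.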
    return new KeyedHiveTable(
        tableMeta, tableLocation, primaryKeySpec, hiveClientPool, baseTable, changeTable);
  }
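
Both store creations commit the same default name mapping before the table is handed back. A minimal standalone sketch of that Iceberg pattern, assuming only an existing org.apache.iceberg.Table handle and the same org.apache.iceberg.mapping imports as the surrounding file (the helper name is illustrative, not part of MixedHiveTables):

  // Illustrative helper (not in MixedHiveTables): commits a default name mapping derived
  // from the table's current schema, so data files lacking Iceberg field IDs (for example,
  // files written by Hive) can be read by resolving columns by name.
  static void applyDefaultNameMapping(org.apache.iceberg.Table table) {
    table
        .updateProperties()
        .set(
            org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING,
            NameMappingParser.toJson(MappingUtil.create(table.schema())))
        .commit();
  }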