in core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java [513:593]
/**
 * Creates a table through the catalog that owns {@code ident} and then records a matching
 * {@link TableEntity} in the entity store.
 *
 * <p>The steps run in this order:
 *
 * <ol>
 *   <li>The supplied properties are validated against the catalog's table property metadata.
 *   <li>A new id is generated and attached to a copy of the properties as a
 *       {@link StringIdentifier}.
 *   <li>The table is created via the catalog's table operations; {@code null} partitions,
 *       distribution, sort orders and indexes are replaced by empty defaults first.
 *   <li>A {@link TableEntity} (with one {@link ColumnEntity} per input column) is built and put
 *       into the store with overwrite enabled.
 * </ol>
 *
 * <p>If putting the entity into the store fails, the failure is logged and the result is built
 * from the catalog table alone, without the entity.
 *
 * <p>{@code IllegalArgumentException}, {@code NoSuchSchemaException} and {@code
 * TableAlreadyExistsException} are the exception classes handed to {@code doWithCatalog};
 * presumably those are the types it surfaces to the caller (confirm against {@code
 * doWithCatalog}).
 *
 * @param ident identifier of the table to create
 * @param columns columns of the table; one column entity is created per element, in order
 * @param comment table comment, passed through to the catalog
 * @param properties table properties as supplied by the caller
 * @param partitions partitioning transforms, or {@code null} for none
 * @param distribution distribution, or {@code null} for none
 * @param sortOrders sort orders, or {@code null} for none
 * @param indexes indexes, or {@code null} for none
 * @return the created table combined with its stored entity, or the created table alone when
 *     storing the entity failed; hidden property names are attached in both cases
 */
private Table internalCreateTable(
    NameIdentifier ident,
    Column[] columns,
    String comment,
    Map<String, String> properties,
    Transform[] partitions,
    Distribution distribution,
    SortOrder[] sortOrders,
    Index[] indexes) {
  NameIdentifier catalogId = getCatalogIdentifier(ident);

  // Validate the caller's properties up front, before asking the catalog to create anything.
  doWithCatalog(
      catalogId,
      catalog ->
          catalog.doWithPropertiesMeta(
              metadata -> {
                validatePropertyForCreate(metadata.tablePropertiesMetadata(), properties);
                return null;
              }),
      IllegalArgumentException.class);

  // The generated id travels to the catalog inside the properties as a StringIdentifier; the
  // specific catalog handles it so that the related TableEntity only becomes visible when the
  // operation succeeded.
  long tableId = idGenerator.nextId();
  Map<String, String> propertiesWithId =
      StringIdentifier.newPropertiesWithId(StringIdentifier.fromId(tableId), properties);

  // Substitute empty defaults for every optional argument the caller left null.
  Transform[] effectivePartitions = partitions == null ? EMPTY_TRANSFORM : partitions;
  Distribution effectiveDistribution = distribution == null ? Distributions.NONE : distribution;
  SortOrder[] effectiveSortOrders = sortOrders == null ? new SortOrder[0] : sortOrders;
  Index[] effectiveIndexes = indexes == null ? Indexes.EMPTY_INDEXES : indexes;

  // The table is intentionally not loaded again after creation (which would pick up values
  // generated by the underlying catalog): some catalogs' APIs are async, so the table may not
  // exist yet right after this call returns.
  Table createdTable =
      doWithCatalog(
          catalogId,
          catalog ->
              catalog.doWithTableOps(
                  tableOps ->
                      tableOps.createTable(
                          ident,
                          columns,
                          comment,
                          propertiesWithId,
                          effectivePartitions,
                          effectiveDistribution,
                          effectiveSortOrders,
                          effectiveIndexes)),
          NoSuchSchemaException.class,
          TableAlreadyExistsException.class);

  AuditInfo auditInfo =
      AuditInfo.builder()
          .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
          .withCreateTime(Instant.now())
          .build();

  // One column entity per input column, keeping the input order as the position and drawing a
  // fresh id for each.
  List<ColumnEntity> columnEntities =
      IntStream.range(0, columns.length)
          .mapToObj(
              position ->
                  ColumnEntity.toColumnEntity(
                      columns[position], position, idGenerator.nextId(), auditInfo))
          .collect(Collectors.toList());

  TableEntity entity =
      TableEntity.builder()
          .withId(tableId)
          .withName(ident.name())
          .withNamespace(ident.namespace())
          .withColumns(columnEntities)
          .withAuditInfo(auditInfo)
          .build();

  // A store failure is logged rather than propagated; the result then omits the entity.
  boolean entityStored = true;
  try {
    store.put(entity, true /* overwrite */);
  } catch (Exception e) {
    LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
    entityStored = false;
  }

  return (entityStored
          ? EntityCombinedTable.of(createdTable, entity)
          : EntityCombinedTable.of(createdTable))
      .withHiddenProperties(
          getHiddenPropertyNames(
              catalogId, HasPropertyMetadata::tablePropertiesMetadata, createdTable.properties()));
}