in fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java [2428:3303]
private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserException {
String tableName = stmt.getTableName();
if (LOG.isDebugEnabled()) {
LOG.debug("begin create olap table: {}", tableName);
}
String tableShowName = tableName;
if (stmt.isTemp()) {
tableName = Util.generateTempTableInnerName(tableName);
}
boolean tableHasExist = false;
BinlogConfig dbBinlogConfig;
db.readLock();
try {
dbBinlogConfig = new BinlogConfig(db.getBinlogConfig());
} finally {
db.readUnlock();
}
BinlogConfig createTableBinlogConfig = new BinlogConfig(dbBinlogConfig);
createTableBinlogConfig.mergeFromProperties(stmt.getProperties());
if (dbBinlogConfig.isEnable() && !createTableBinlogConfig.isEnable() && !stmt.isTemp()) {
throw new DdlException("Cannot create table with binlog disabled when database binlog enable");
}
if (stmt.isTemp() && createTableBinlogConfig.isEnable()) {
throw new DdlException("Cannot create temporary table with binlog enable");
}
stmt.getProperties().putAll(createTableBinlogConfig.toProperties());
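// Note: table-level binlog properties override the database-level defaults, and
// the merged result is written back into the statement's properties so later
// consumers see the effective config. Illustrative DDL fragment (the
// "binlog.enable" key also appears later in this method; the value is hypothetical):
//   PROPERTIES ("binlog.enable" = "true")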
// get keys type
KeysDesc keysDesc = stmt.getKeysDesc();
Preconditions.checkNotNull(keysDesc);
KeysType keysType = keysDesc.getKeysType();
int keysColumnSize = keysDesc.keysColumnSize();
boolean isKeysRequired = !(keysType == KeysType.DUP_KEYS && keysColumnSize == 0);
// create columns
List<Column> baseSchema = stmt.getColumns();
validateColumns(baseSchema, isKeysRequired);
checkAutoIncColumns(baseSchema, keysType);
// analyze replica allocation
ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(stmt.getProperties(), "");
if (replicaAlloc.isNotSet()) {
replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
}
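// Illustrative DDL fragment (standard replication property; the tag name and
// replica count are hypothetical):
//   PROPERTIES ("replication_allocation" = "tag.location.default: 3")
// When the property is absent, the default allocation above is used.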
long bufferSize = IdGeneratorUtil.getBufferSizeForCreateTable(stmt, replicaAlloc);
IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize);
// create partition info
PartitionDesc partitionDesc = stmt.getPartitionDesc();
ConnectContext ctx = ConnectContext.get();
Env env = Env.getCurrentEnv();
// check legality of partition exprs.
if (ctx != null && env != null && partitionDesc != null && partitionDesc.getPartitionExprs() != null) {
checkLegalityofPartitionExprs(stmt, partitionDesc);
}
PartitionInfo partitionInfo = null;
Map<String, Long> partitionNameToId = Maps.newHashMap();
if (partitionDesc != null) {
for (SinglePartitionDesc desc : partitionDesc.getSinglePartitionDescs()) {
// check the nullability of each partition item against the partition columns.
checkPartitionNullity(baseSchema, partitionDesc, desc);
long partitionId = idGeneratorBuffer.getNextId();
partitionNameToId.put(desc.getPartitionName(), partitionId);
}
partitionInfo = partitionDesc.toPartitionInfo(baseSchema, partitionNameToId, false);
} else {
if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(stmt.getProperties())) {
throw new DdlException("Only support dynamic partition properties on range partition table");
}
long partitionId = idGeneratorBuffer.getNextId();
// use the table's display name as the single partition name
partitionNameToId.put(Util.getTempTableDisplayName(tableName), partitionId);
partitionInfo = new SinglePartitionInfo();
}
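// At this point every partition, including the single implicit partition of an
// unpartitioned table (named after the table), has an id pre-allocated from the
// id generator buffer.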
// create distribution info
DistributionDesc distributionDesc = stmt.getDistributionDesc();
Preconditions.checkNotNull(distributionDesc);
DistributionInfo defaultDistributionInfo = distributionDesc.toDistributionInfo(baseSchema);
if (defaultDistributionInfo instanceof HashDistributionInfo
&& ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns()
.stream().anyMatch(column -> column.getType().isVariantType())) {
throw new DdlException("Hash distribution info should not contain variant columns");
}
// calc short key column count
short shortKeyColumnCount = Env.calcShortKeyColumnCount(baseSchema, stmt.getProperties(), isKeysRequired);
if (LOG.isDebugEnabled()) {
LOG.debug("create table[{}] short key column count: {}", tableName, shortKeyColumnCount);
}
// create table
long tableId = idGeneratorBuffer.getNextId();
TableType tableType = OlapTableFactory.getTableType(stmt);
OlapTable olapTable = (OlapTable) new OlapTableFactory()
.init(tableType, stmt.isTemp())
.withTableId(tableId)
.withTableName(tableName)
.withSchema(baseSchema)
.withKeysType(keysType)
.withPartitionInfo(partitionInfo)
.withDistributionInfo(defaultDistributionInfo)
.withExtraParams(stmt)
.build();
olapTable.setComment(stmt.getComment());
// set base index id
long baseIndexId = idGeneratorBuffer.getNextId();
olapTable.setBaseIndexId(baseIndexId);
// set base index info on the table;
// this must be done before creating partitions.
Map<String, String> properties = stmt.getProperties();
if (stmt.isTemp()) {
properties.put("binlog.enable", "false");
}
short minLoadReplicaNum = -1;
try {
minLoadReplicaNum = PropertyAnalyzer.analyzeMinLoadReplicaNum(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
if (minLoadReplicaNum > replicaAlloc.getTotalReplicaNum()) {
throw new DdlException("Failed to check min load replica num [" + minLoadReplicaNum + "] <= "
+ "default replica num [" + replicaAlloc.getTotalReplicaNum() + "]");
}
olapTable.setMinLoadReplicaNum(minLoadReplicaNum);
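// Illustrative DDL fragment (key assumed to match the analyzer above; value
// hypothetical): PROPERTIES ("min_load_replica_num" = "1"). Roughly, a load may
// succeed once this many replicas of each tablet commit, so it must not exceed
// the total replica num checked above.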
// get use light schema change
Boolean enableLightSchemaChange;
try {
enableLightSchemaChange = PropertyAnalyzer.analyzeUseLightSchemaChange(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
// use light schema change optimization
olapTable.setEnableLightSchemaChange(enableLightSchemaChange);
// if light schema change is disabled, reject variant columns, which rely on it
if (!enableLightSchemaChange) {
for (Column column : baseSchema) {
if (column.getType().isVariantType()) {
throw new DdlException("Variant type rely on light schema change, "
+ " please use light_schema_change = true.");
}
}
}
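// Roughly speaking, light schema change keeps per-column unique ids in FE
// metadata (see initSchemaColumnUniqueId below) so add/drop column can avoid
// rewriting data; the variant type depends on this, hence the check above.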
boolean disableAutoCompaction = false;
try {
disableAutoCompaction = PropertyAnalyzer.analyzeDisableAutoCompaction(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
// set disable auto compaction
olapTable.setDisableAutoCompaction(disableAutoCompaction);
// set compaction policy
String compactionPolicy = PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY;
try {
compactionPolicy = PropertyAnalyzer.analyzeCompactionPolicy(properties, olapTable.getKeysType());
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
olapTable.setCompactionPolicy(compactionPolicy);
if (!compactionPolicy.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)
&& (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)
|| properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)
|| properties
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)
|| properties
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)
|| properties
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))) {
throw new DdlException("only time series compaction policy support for time series config");
}
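// Illustrative DDL fragment of a consistent combination (keys assumed to match
// the PropertyAnalyzer constants above; values hypothetical):
//   PROPERTIES ("compaction_policy" = "time_series",
//               "time_series_compaction_goal_size_mbytes" = "1024")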
// set time series compaction goal size
long timeSeriesCompactionGoalSizeMbytes
= PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE;
try {
timeSeriesCompactionGoalSizeMbytes = PropertyAnalyzer
.analyzeTimeSeriesCompactionGoalSizeMbytes(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
olapTable.setTimeSeriesCompactionGoalSizeMbytes(timeSeriesCompactionGoalSizeMbytes);
// set time series compaction file count threshold
long timeSeriesCompactionFileCountThreshold
= PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE;
try {
timeSeriesCompactionFileCountThreshold = PropertyAnalyzer
.analyzeTimeSeriesCompactionFileCountThreshold(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
olapTable.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionFileCountThreshold);
// set time series compaction time threshold
long timeSeriesCompactionTimeThresholdSeconds
= PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE;
try {
timeSeriesCompactionTimeThresholdSeconds = PropertyAnalyzer
.analyzeTimeSeriesCompactionTimeThresholdSeconds(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
olapTable.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionTimeThresholdSeconds);
// set time series compaction empty rowsets threshold
long timeSeriesCompactionEmptyRowsetsThreshold
= PropertyAnalyzer.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE;
try {
timeSeriesCompactionEmptyRowsetsThreshold = PropertyAnalyzer
.analyzeTimeSeriesCompactionEmptyRowsetsThreshold(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
olapTable.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionEmptyRowsetsThreshold);
// set time series compaction level threshold
long timeSeriesCompactionLevelThreshold
= PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE;
try {
timeSeriesCompactionLevelThreshold = PropertyAnalyzer
.analyzeTimeSeriesCompactionLevelThreshold(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
olapTable.setTimeSeriesCompactionLevelThreshold(timeSeriesCompactionLevelThreshold);
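// Each time series compaction knob above follows the same pattern: fall back to
// the PropertyAnalyzer default when the property is absent, and rewrap any
// AnalysisException as a DdlException so CREATE TABLE fails uniformly.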
boolean variantEnableFlattenNested = false;
try {
variantEnableFlattenNested = PropertyAnalyzer.analyzeVariantFlattenNested(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
olapTable.setVariantEnableFlattenNested(variantEnableFlattenNested);
// get storage format
TStorageFormat storageFormat = TStorageFormat.V2; // default is segment v2
try {
storageFormat = PropertyAnalyzer.analyzeStorageFormat(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
olapTable.setStorageFormat(storageFormat);
TInvertedIndexFileStorageFormat invertedIndexFileStorageFormat;
try {
invertedIndexFileStorageFormat = PropertyAnalyzer.analyzeInvertedIndexFileStorageFormat(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
olapTable.setInvertedIndexFileStorageFormat(invertedIndexFileStorageFormat);
// get compression type
TCompressionType compressionType = TCompressionType.LZ4;
try {
compressionType = PropertyAnalyzer.analyzeCompressionType(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
olapTable.setCompressionType(compressionType);
// get row_store_page_size
long rowStorePageSize = PropertyAnalyzer.ROW_STORE_PAGE_SIZE_DEFAULT_VALUE;
try {
rowStorePageSize = PropertyAnalyzer.analyzeRowStorePageSize(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
olapTable.setRowStorePageSize(rowStorePageSize);
long storagePageSize = PropertyAnalyzer.STORAGE_PAGE_SIZE_DEFAULT_VALUE;
try {
storagePageSize = PropertyAnalyzer.analyzeStoragePageSize(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
olapTable.setStoragePageSize(storagePageSize);
// check data sort properties
int keyColumnSize = CollectionUtils.isEmpty(keysDesc.getClusterKeysColumnNames()) ? keysDesc.keysColumnSize() :
keysDesc.getClusterKeysColumnNames().size();
DataSortInfo dataSortInfo = PropertyAnalyzer.analyzeDataSortInfo(properties, keysType,
keyColumnSize, storageFormat);
olapTable.setDataSortInfo(dataSortInfo);
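// Note: when cluster keys are specified, the sort key count is taken from the
// cluster key columns rather than from the primary key columns.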
boolean enableUniqueKeyMergeOnWrite = false;
if (keysType == KeysType.UNIQUE_KEYS) {
try {
enableUniqueKeyMergeOnWrite = PropertyAnalyzer.analyzeUniqueKeyMergeOnWrite(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
if (enableUniqueKeyMergeOnWrite && !enableLightSchemaChange && !CollectionUtils.isEmpty(
keysDesc.getClusterKeysColumnNames())) {
throw new DdlException(
"Unique merge-on-write tables with cluster keys require light schema change to be enabled.");
}
}
olapTable.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite);
if (keysType == KeysType.UNIQUE_KEYS && enableUniqueKeyMergeOnWrite) {
try {
// don't store this property, check and remove it from `properties`
PropertyAnalyzer.analyzeUniqueKeySkipBitmapColumn(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
}
boolean enableDeleteOnDeletePredicate = false;
try {
enableDeleteOnDeletePredicate = PropertyAnalyzer.analyzeEnableDeleteOnDeletePredicate(properties,
enableUniqueKeyMergeOnWrite);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
if (enableDeleteOnDeletePredicate && !enableUniqueKeyMergeOnWrite) {
throw new DdlException(PropertyAnalyzer.PROPERTIES_ENABLE_MOW_LIGHT_DELETE
+ " property is only supported for unique merge-on-write table");
}
olapTable.setEnableMowLightDelete(enableDeleteOnDeletePredicate);
boolean enableSingleReplicaCompaction = false;
try {
enableSingleReplicaCompaction = PropertyAnalyzer.analyzeEnableSingleReplicaCompaction(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
if (enableUniqueKeyMergeOnWrite && enableSingleReplicaCompaction) {
throw new DdlException(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION
+ " property is not supported for merge-on-write table");
}
olapTable.setEnableSingleReplicaCompaction(enableSingleReplicaCompaction);
if (Config.isCloudMode() && ((CloudEnv) env).getEnableStorageVault()) {
// <storageVaultName, storageVaultId>
Pair<String, String> storageVaultInfoPair = PropertyAnalyzer.analyzeStorageVault(properties, db);
// Check if user has storage vault usage privilege
if (ctx != null && !env.getAccessManager()
.checkStorageVaultPriv(ctx.getCurrentUserIdentity(),
storageVaultInfoPair.first, PrivPredicate.USAGE)) {
throw new DdlException("USAGE denied to user '" + ctx.getQualifiedUser()
+ "'@'" + ctx.getRemoteIP()
+ "' for storage vault '" + storageVaultInfoPair.first + "'");
}
Preconditions.checkArgument(StringUtils.isNumeric(storageVaultInfoPair.second),
"Invalid storage vault id :%s", storageVaultInfoPair.second);
olapTable.setStorageVaultId(storageVaultInfoPair.second);
}
// check `update on current_timestamp`
if (!enableUniqueKeyMergeOnWrite) {
for (Column column : baseSchema) {
if (column.hasOnUpdateDefaultValue()) {
throw new DdlException("'ON UPDATE CURRENT_TIMESTAMP' is only supportted"
+ " in unique table with merge-on-write enabled.");
}
}
}
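// Summary of the merge-on-write (MoW) constraints enforced above: MoW with
// cluster keys requires light schema change, enable_mow_light_delete and
// 'ON UPDATE CURRENT_TIMESTAMP' require MoW, and single replica compaction is
// incompatible with MoW.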
// analyze bloom filter columns
Set<String> bfColumns = null;
double bfFpp = 0;
try {
bfColumns = PropertyAnalyzer.analyzeBloomFilterColumns(properties, baseSchema, keysType);
if (bfColumns != null && bfColumns.isEmpty()) {
bfColumns = null;
}
bfFpp = PropertyAnalyzer.analyzeBloomFilterFpp(properties);
if (bfColumns != null && bfFpp == 0) {
bfFpp = FeConstants.default_bloom_filter_fpp;
} else if (bfColumns == null) {
bfFpp = 0;
}
olapTable.setBloomFilterInfo(bfColumns, bfFpp);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
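// Illustrative DDL fragment (standard bloom filter properties; the columns and
// fpp are hypothetical):
//   PROPERTIES ("bloom_filter_columns" = "k1,k2", "bloom_filter_fpp" = "0.01")
// If the columns are set but no fpp is given, bfFpp falls back to
// FeConstants.default_bloom_filter_fpp above.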
Index.checkConflict(stmt.getIndexes(), bfColumns);
olapTable.setReplicationAllocation(replicaAlloc);
// set auto bucket
boolean isAutoBucket = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_AUTO_BUCKET,
false);
olapTable.setIsAutoBucket(isAutoBucket);
// set estimate partition size
if (isAutoBucket) {
String estimatePartitionSize = PropertyAnalyzer.analyzeEstimatePartitionSize(properties);
olapTable.setEstimatePartitionSize(estimatePartitionSize);
}
// set in memory
boolean isInMemory = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_INMEMORY,
false);
if (isInMemory) {
throw new AnalysisException("Not support set 'in_memory'='true' now!");
}
olapTable.setIsInMemory(false);
boolean isBeingSynced = PropertyAnalyzer.analyzeIsBeingSynced(properties, false);
olapTable.setIsBeingSynced(isBeingSynced);
if (isBeingSynced) {
// erase colocate table, storage policy
olapTable.ignoreInvalidPropertiesWhenSynced(properties);
// remark auto bucket
if (isAutoBucket) {
olapTable.markAutoBucket();
}
}
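// A "being synced" table is typically the target of cross-cluster replication;
// properties that only make sense on the source cluster (e.g. colocate group,
// storage policy) are dropped, and an auto-bucket table is explicitly marked.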
// analyze row store columns
try {
boolean storeRowColumn = PropertyAnalyzer.analyzeStoreRowColumn(properties);
if (storeRowColumn && !enableLightSchemaChange) {
throw new DdlException(
"Row store columns rely on light schema change; enable light schema change first");
}
olapTable.setStoreRowColumn(storeRowColumn);
List<String> rowStoreColumns = PropertyAnalyzer.analyzeRowStoreColumns(properties,
baseSchema.stream().map(Column::getName).collect(Collectors.toList()));
if (rowStoreColumns != null && rowStoreColumns.isEmpty()) {
rowStoreColumns = null;
}
olapTable.setRowStoreColumns(rowStoreColumns);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
// set skip inverted index on load
boolean skipWriteIndexOnLoad = PropertyAnalyzer.analyzeSkipWriteIndexOnLoad(properties);
olapTable.setSkipWriteIndexOnLoad(skipWriteIndexOnLoad);
boolean isMutable = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_MUTABLE, true);
Long ttlSeconds = PropertyAnalyzer.analyzeTTL(properties);
olapTable.setTTLSeconds(ttlSeconds);
// set storage policy
String storagePolicy = PropertyAnalyzer.analyzeStoragePolicy(properties);
Env.getCurrentEnv().getPolicyMgr().checkStoragePolicyExist(storagePolicy);
if (olapTable.getEnableUniqueKeyMergeOnWrite()
&& !Strings.isNullOrEmpty(storagePolicy)) {
throw new AnalysisException(
"Cannot create a UNIQUE KEY table with merge-on-write enabled"
+ " and storage policy(" + storagePolicy + ")");
}
// If the table has no storage policy but some partitions have their own,
// the partition policies could be erased by the call below, so we only set
// the table-level policy when it is non-empty.
if (!Strings.isNullOrEmpty(storagePolicy)) {
olapTable.setStoragePolicy(storagePolicy);
}
TTabletType tabletType;
try {
tabletType = PropertyAnalyzer.analyzeTabletType(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
// set binlog config
try {
Map<String, String> binlogConfigMap = PropertyAnalyzer.analyzeBinlogConfig(properties);
if (binlogConfigMap != null) {
BinlogConfig binlogConfig = new BinlogConfig();
binlogConfig.mergeFromProperties(binlogConfigMap);
olapTable.setBinlogConfig(binlogConfig);
}
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
BinlogConfig binlogConfigForTask = new BinlogConfig(olapTable.getBinlogConfig());
if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
// if this is an unpartitioned table, we should analyze the data property and
// replication num here; if this is a partitioned table, these properties were
// already analyzed in the RangePartitionDesc analyze phase.
// use the table's display name as the single partition name
long partitionId = partitionNameToId.get(Util.getTempTableDisplayName(tableName));
DataProperty dataProperty = null;
try {
dataProperty = PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(),
new DataProperty(DataProperty.DEFAULT_STORAGE_MEDIUM));
olapTable.setStorageMedium(dataProperty.getStorageMedium());
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
Preconditions.checkNotNull(dataProperty);
partitionInfo.setDataProperty(partitionId, dataProperty);
partitionInfo.setReplicaAllocation(partitionId, replicaAlloc);
partitionInfo.setIsInMemory(partitionId, isInMemory);
partitionInfo.setTabletType(partitionId, tabletType);
partitionInfo.setIsMutable(partitionId, isMutable);
if (isBeingSynced) {
partitionInfo.refreshTableStoragePolicy("");
}
}
// check colocation properties
try {
String colocateGroup = PropertyAnalyzer.analyzeColocate(properties);
if (colocateGroup != null) {
if (defaultDistributionInfo.getType() == DistributionInfoType.RANDOM) {
throw new AnalysisException("Random distribution for colocate table is unsupported");
}
if (isAutoBucket) {
throw new AnalysisException("Auto buckets for colocate table is unsupported");
}
String fullGroupName = GroupId.getFullGroupName(db.getId(), colocateGroup);
ColocateGroupSchema groupSchema = Env.getCurrentColocateIndex().getGroupSchema(fullGroupName);
if (groupSchema != null) {
// group already exist, check if this table can be added to this group
groupSchema.checkColocateSchema(olapTable);
groupSchema.checkDynamicPartition(properties, olapTable.getDefaultDistributionInfo());
}
// add table to this group, if group does not exist, create a new one
Env.getCurrentColocateIndex()
.addTableToGroup(db.getId(), olapTable, fullGroupName, null /* generate group id inside */);
olapTable.setColocateGroup(colocateGroup);
}
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
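// Illustrative DDL fragment (standard colocation property; the group name is
// hypothetical): PROPERTIES ("colocate_with" = "group1"). All tables in a group
// must share a compatible distribution, which checkColocateSchema verifies above.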
// get base index storage type. default is COLUMN
TStorageType baseIndexStorageType = null;
try {
baseIndexStorageType = PropertyAnalyzer.analyzeStorageType(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
Preconditions.checkNotNull(baseIndexStorageType);
// set base index meta
int schemaVersion = 0;
try {
schemaVersion = PropertyAnalyzer.analyzeSchemaVersion(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
int schemaHash = Util.generateSchemaHash();
olapTable.setIndexMeta(baseIndexId, tableName, baseSchema, schemaVersion, schemaHash, shortKeyColumnCount,
baseIndexStorageType, keysType, olapTable.getIndexes());
for (AlterClause alterClause : stmt.getRollupAlterClauseList()) {
if (olapTable.isDuplicateWithoutKey()) {
throw new DdlException("Duplicate table without keys do not support add rollup!");
}
AddRollupClause addRollupClause = (AddRollupClause) alterClause;
Long baseRollupIndex = olapTable.getIndexIdByName(tableName);
// get storage type for rollup index
TStorageType rollupIndexStorageType = null;
try {
rollupIndexStorageType = PropertyAnalyzer.analyzeStorageType(addRollupClause.getProperties());
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
Preconditions.checkNotNull(rollupIndexStorageType);
// set rollup index meta to olap table
List<Column> rollupColumns = Env.getCurrentEnv().getMaterializedViewHandler()
.checkAndPrepareMaterializedView(addRollupClause, olapTable, baseRollupIndex, false);
short rollupShortKeyColumnCount = Env.calcShortKeyColumnCount(rollupColumns, alterClause.getProperties(),
true/*isKeysRequired*/);
int rollupSchemaHash = Util.generateSchemaHash();
long rollupIndexId = idGeneratorBuffer.getNextId();
olapTable.setIndexMeta(rollupIndexId, addRollupClause.getRollupName(), rollupColumns, schemaVersion,
rollupSchemaHash, rollupShortKeyColumnCount, rollupIndexStorageType, keysType, null);
}
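// Rollups declared inline in CREATE TABLE are materialized here: each gets its
// own index id, short key count and storage type, while sharing the base
// index's schema version.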
// analyze sequence map column
String sequenceMapCol = null;
try {
sequenceMapCol = PropertyAnalyzer.analyzeSequenceMapCol(properties, olapTable.getKeysType());
if (sequenceMapCol != null) {
Column col = olapTable.getColumn(sequenceMapCol);
if (col == null) {
throw new DdlException("The specified sequence column[" + sequenceMapCol + "] not exists");
}
if (!col.getType().isFixedPointType() && !col.getType().isDateType()) {
throw new DdlException("Sequence type only support integer types and date types");
}
olapTable.setSequenceMapCol(col.getName());
olapTable.setSequenceInfo(col.getType(), col);
}
} catch (Exception e) {
throw new DdlException(e.getMessage());
}
// analyze sequence type
Type sequenceColType = null;
try {
sequenceColType = PropertyAnalyzer.analyzeSequenceType(properties, olapTable.getKeysType());
if (sequenceMapCol != null && sequenceColType != null) {
throw new DdlException("The sequence_col and sequence_type cannot be set at the same time");
}
if (sequenceColType != null) {
olapTable.setSequenceInfo(sequenceColType, null);
}
} catch (Exception e) {
throw new DdlException(e.getMessage());
}
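// Illustrative DDL fragment (standard sequence property; the column name is
// hypothetical): PROPERTIES ("function_column.sequence_col" = "update_time").
// sequence_col maps an existing column as the sequence, while sequence_type
// creates a hidden sequence column; as checked above, the two are mutually
// exclusive.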
try {
int groupCommitIntervalMs = PropertyAnalyzer.analyzeGroupCommitIntervalMs(properties);
olapTable.setGroupCommitIntervalMs(groupCommitIntervalMs);
} catch (Exception e) {
throw new DdlException(e.getMessage());
}
try {
int groupCommitDataBytes = PropertyAnalyzer.analyzeGroupCommitDataBytes(properties);
olapTable.setGroupCommitDataBytes(groupCommitDataBytes);
} catch (Exception e) {
throw new DdlException(e.getMessage());
}
olapTable.initSchemaColumnUniqueId();
olapTable.initAutoIncrementGenerator(db.getId());
olapTable.rebuildFullSchema();
// analyze version info
Long versionInfo = null;
try {
versionInfo = PropertyAnalyzer.analyzeVersionInfo(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
Preconditions.checkNotNull(versionInfo);
// a set recording every new tablet created during table creation;
// if any step fails, it is used to clean up those tablets
Set<Long> tabletIdSet = new HashSet<>();
// create partition
boolean hadLogEditCreateTable = false;
try {
if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
if (properties != null && !properties.isEmpty()) {
// by this point all properties should have been consumed; anything left is unknown
throw new DdlException("Unknown properties: " + properties);
}
// this is a 1-level partitioned table
// use table name as partition name
DistributionInfo partitionDistributionInfo = distributionDesc.toDistributionInfo(baseSchema);
String partitionName = tableName;
if (stmt.isTemp()) {
partitionName = Util.getTempTableDisplayName(tableName);
}
long partitionId = partitionNameToId.get(partitionName);
// check that the replica quota will not be exceeded once this operation completes
long indexNum = olapTable.getIndexIdToMeta().size();
long bucketNum = partitionDistributionInfo.getBucketNum();
long replicaNum = partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum();
long totalReplicaNum = indexNum * bucketNum * replicaNum;
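// e.g. a table with 2 indexes (base + one rollup), 10 buckets and 3 replicas
// would add 2 * 10 * 3 = 60 replicas against the database quota.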
if (Config.isNotCloudMode() && totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) {
throw new DdlException(
"Database " + db.getFullName() + " create unpartitioned table " + tableShowName
+ " adding " + totalReplicaNum + " replicas exceeds quota["
+ db.getReplicaQuota() + "]");
}
beforeCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList(), true);
Partition partition = createPartitionWithIndices(db.getId(), olapTable,
partitionId, partitionName,
olapTable.getIndexIdToMeta(), partitionDistributionInfo,
partitionInfo.getDataProperty(partitionId),
partitionInfo.getReplicaAllocation(partitionId), versionInfo, bfColumns, tabletIdSet,
isInMemory, tabletType,
storagePolicy,
idGeneratorBuffer,
binlogConfigForTask,
partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified());
afterCreatePartitions(db.getId(), olapTable.getId(), null,
olapTable.getIndexIdList(), true);
olapTable.addPartition(partition);
} else if (partitionInfo.getType() == PartitionType.RANGE
|| partitionInfo.getType() == PartitionType.LIST) {
try {
DataProperty dataProperty = PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(),
new DataProperty(DataProperty.DEFAULT_STORAGE_MEDIUM));
// analyzeDataProperty above also removes its entries from stmt.getProperties();
// everything left over, except dynamic partition properties, is unknown
Map<String, String> propertiesCheck = new HashMap<>(properties);
propertiesCheck.entrySet().removeIf(entry -> entry.getKey().contains("dynamic_partition"));
if (!propertiesCheck.isEmpty()) {
throw new DdlException("Unknown properties: " + propertiesCheck);
}
olapTable.setStorageMedium(dataProperty.getStorageMedium());
if (partitionInfo.getType() == PartitionType.RANGE) {
DynamicPartitionUtil.checkDynamicPartitionPropertyKeysValid(properties);
DynamicPartitionUtil.checkAndSetDynamicPartitionProperty(olapTable, properties, db);
} else if (partitionInfo.getType() == PartitionType.LIST) {
if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(properties)) {
throw new DdlException(
"Dynamic partition properties are only supported on range partition tables");
}
}
// check that the dynamic partition interval matches the auto range partition interval
DynamicPartitionProperty dynamicProperty = olapTable.getTableProperty()
.getDynamicPartitionProperty();
if (dynamicProperty.isExist() && dynamicProperty.getEnable()
&& partitionDesc.isAutoCreatePartitions()) {
String dynamicUnit = dynamicProperty.getTimeUnit();
ArrayList<Expr> autoExprs = partitionDesc.getPartitionExprs();
// check that the intervals match; a mismatch raises an AnalysisException
DynamicPartitionUtil.partitionIntervalCompatible(dynamicUnit, autoExprs);
}
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
// check that the replica quota will not be exceeded once this operation completes
long totalReplicaNum = 0;
for (Map.Entry<String, Long> entry : partitionNameToId.entrySet()) {
long indexNum = olapTable.getIndexIdToMeta().size();
long bucketNum = defaultDistributionInfo.getBucketNum();
long replicaNum = partitionInfo.getReplicaAllocation(entry.getValue()).getTotalReplicaNum();
totalReplicaNum += indexNum * bucketNum * replicaNum;
}
if (Config.isNotCloudMode() && totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) {
throw new DdlException(
"Database " + db.getFullName() + " create table " + tableShowName + " adding "
+ totalReplicaNum + " replicas exceeds quota[" + db.getReplicaQuota() + "]");
}
beforeCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList(), true);
// this is a 2-level partitioned table
for (Map.Entry<String, Long> entry : partitionNameToId.entrySet()) {
DataProperty dataProperty = partitionInfo.getDataProperty(entry.getValue());
DistributionInfo partitionDistributionInfo = distributionDesc.toDistributionInfo(baseSchema);
// use the partition's storage policy if it exists.
String partitionStoragePolicy = partitionInfo.getStoragePolicy(entry.getValue());
if (olapTable.getEnableUniqueKeyMergeOnWrite()
&& !Strings.isNullOrEmpty(partitionStoragePolicy)) {
throw new AnalysisException(
"Cannot create a UNIQUE KEY table with merge-on-write enabled"
+ " and storage policy(" + partitionStoragePolicy + ")");
}
// The table's storage policy has higher priority than the partition's policy,
// so we directly use the table's policy when it is set; otherwise we use the
// partition's policy.
if (!storagePolicy.isEmpty()) {
partitionStoragePolicy = storagePolicy;
}
Env.getCurrentEnv().getPolicyMgr().checkStoragePolicyExist(partitionStoragePolicy);
Partition partition = createPartitionWithIndices(db.getId(),
olapTable, entry.getValue(),
entry.getKey(), olapTable.getIndexIdToMeta(), partitionDistributionInfo,
dataProperty, partitionInfo.getReplicaAllocation(entry.getValue()),
versionInfo, bfColumns, tabletIdSet, isInMemory,
partitionInfo.getTabletType(entry.getValue()),
partitionStoragePolicy, idGeneratorBuffer,
binlogConfigForTask,
dataProperty.isStorageMediumSpecified());
olapTable.addPartition(partition);
olapTable.getPartitionInfo().getDataProperty(partition.getId())
.setStoragePolicy(partitionStoragePolicy);
}
afterCreatePartitions(db.getId(), olapTable.getId(), null,
olapTable.getIndexIdList(), true);
} else {
throw new DdlException("Unsupported partition method: " + partitionInfo.getType().name());
}
Pair<Boolean, Boolean> result = db.createTableWithLock(olapTable, false, stmt.isSetIfNotExists());
if (!result.first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableShowName);
}
if (result.second) {
// the table already existed (allowed by IF NOT EXISTS): record that fact
// and roll back the in-memory side effects of this attempt
tableHasExist = true;
if (Env.getCurrentColocateIndex().isColocateTable(tableId)) {
// if this is a colocate table, its table id was already added to the
// colocate group, so remove the tableId here
Env.getCurrentColocateIndex().removeTable(tableId);
}
for (Long tabletId : tabletIdSet) {
Env.getCurrentInvertedIndex().deleteTablet(tabletId);
}
LOG.info("duplicate create table[{};{}], skip next steps", tableName, tableId);
} else {
// since the table did not exist, db.createTableWithLock has written an edit log entry.
hadLogEditCreateTable = true;
// the colocate index entry was already added to memory; it only needs to be persisted here
if (Env.getCurrentColocateIndex().isColocateTable(tableId)) {
GroupId groupId = Env.getCurrentColocateIndex().getGroup(tableId);
Map<Tag, List<List<Long>>> backendsPerBucketSeq = Env.getCurrentColocateIndex()
.getBackendsPerBucketSeq(groupId);
ColocatePersistInfo info = ColocatePersistInfo.createForAddTable(groupId, tableId,
backendsPerBucketSeq);
Env.getCurrentEnv().getEditLog().logColocateAddTable(info);
}
LOG.info("successfully create table[{};{}]", tableName, tableId);
Env.getCurrentEnv().getDynamicPartitionScheduler()
.executeDynamicPartitionFirstTime(db.getId(), olapTable.getId());
// register or remove table from DynamicPartition after table created
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable, false);
Env.getCurrentEnv().getDynamicPartitionScheduler()
.createOrUpdateRuntimeInfo(tableId, DynamicPartitionScheduler.LAST_UPDATE_TIME,
TimeUtils.getCurrentFormatTime());
}
if (DebugPointUtil.isEnable("FE.createOlapTable.exception")) {
LOG.info("debug point FE.createOlapTable.exception, throw e");
throw new DdlException("debug point FE.createOlapTable.exception");
}
} catch (DdlException e) {
LOG.warn("create table failed {} - {}", tabletIdSet, e.getMessage());
for (Long tabletId : tabletIdSet) {
Env.getCurrentInvertedIndex().deleteTablet(tabletId);
}
// writing DropTableInfo to the edit log will eventually delete the colocate group,
// but follower FEs may need to wait up to 30s (the recycle bin mgr runs every 30s).
if (Env.getCurrentColocateIndex().isColocateTable(tableId)) {
Env.getCurrentColocateIndex().removeTable(tableId);
}
try {
dropTable(db, tableId, true, false, 0L);
if (hadLogEditCreateTable) {
DropInfo info = new DropInfo(db.getId(), tableId, olapTable.getName(), false, true, 0L);
Env.getCurrentEnv().getEditLog().logDropTable(info);
}
} catch (Exception ex) {
LOG.warn("drop table", ex);
}
throw e;
}
if (olapTable instanceof MTMV) {
try {
Env.getCurrentEnv().getMtmvService().createMTMV((MTMV) olapTable);
} catch (Throwable t) {
LOG.warn("create mv failed, start rollback, error msg: " + t.getMessage());
try {
DropMTMVInfo dropMTMVInfo = new DropMTMVInfo(
new TableNameInfo(olapTable.getDatabase().getFullName(), olapTable.getName()), true);
Env.getCurrentEnv().dropTable(dropMTMVInfo.translateToLegacyStmt());
} catch (Throwable throwable) {
LOG.warn("rollback mv failed, please drop mv by manual, error msg: " + t.getMessage());
}
throw t;
}
}
return tableHasExist;
}