in fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java [1261:1739]
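    /**
     * Check an ALTER TABLE request against the current table meta and, if anything really
     * changed, create a SchemaChangeJobV2 that builds SHADOW indexes, tablets and replicas
     * for every changed materialized index.
     *
     * @param indexSchemaMap index id -> desired (post-alter) schema of that index
     */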
private void createJob(String rawSql, long dbId, OlapTable olapTable, Map<Long, LinkedList<Column>> indexSchemaMap,
Map<String, String> propertyMap, List<Index> indexes) throws UserException {
checkReplicaCount(olapTable);
        // process properties first.
        // the supported properties are: short key count, bloom filter columns/fpp,
        // timeout, storage format, and row-store settings
        // property 1. to specify the short key column count per index.
        // eg.
        // "indexname1#short_key" = "3"
        // "indexname2#short_key" = "4"
        Map<Long, Map<String, String>> indexIdToProperties = new HashMap<>();
        if (!propertyMap.isEmpty()) {
            for (String key : propertyMap.keySet()) {
if (key.endsWith(PropertyAnalyzer.PROPERTIES_SHORT_KEY)) {
// short key
String[] keyArray = key.split("#");
if (keyArray.length != 2 || keyArray[0].isEmpty() || !keyArray[1].equals(
PropertyAnalyzer.PROPERTIES_SHORT_KEY)) {
throw new DdlException("Invalid alter table property: " + key);
}
                    if (!olapTable.hasMaterializedIndex(keyArray[0])) {
                        throw new DdlException("Index[" + keyArray[0] + "] does not exist");
                    }
                    Map<String, String> prop = new HashMap<>();
                    prop.put(PropertyAnalyzer.PROPERTIES_SHORT_KEY, propertyMap.get(key));
                    indexIdToProperties.put(olapTable.getIndexIdByName(keyArray[0]), prop);
}
} // end for property keys
}
        // check whether the index definitions (e.g. bitmap indexes) changed
        Set<Index> newSet = new HashSet<>(indexes);
        Set<Index> oriSet = new HashSet<>(olapTable.getIndexes());
        boolean hasIndexChange = !newSet.equals(oriSet);
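        // NOTE: the set comparison above relies on Index implementing value-based equals()/hashCode()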
// property 2. bloom filter
// eg. "bloom_filter_columns" = "k1,k2", "bloom_filter_fpp" = "0.05"
Set<String> bfColumns = null;
double bfFpp = 0;
try {
bfColumns = PropertyAnalyzer.analyzeBloomFilterColumns(propertyMap,
indexSchemaMap.get(olapTable.getBaseIndexId()), olapTable.getKeysType());
bfFpp = PropertyAnalyzer.analyzeBloomFilterFpp(propertyMap);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
        // check whether the bloom filter setting has changed
boolean hasBfChange = false;
Set<String> oriBfColumns = olapTable.getCopiedBfColumns();
double oriBfFpp = olapTable.getBfFpp();
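        // normalize the four possible combinations of the two properties:
        //   columns yes / fpp no  -> keep the old fpp (or the default if the table had none)
        //   columns yes / fpp yes -> take both as given
        //   columns no  / fpp no  -> inherit the current setting unchanged
        //   columns no  / fpp yes -> fpp-only change; only valid if bf columns already exist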
if (bfColumns != null) {
if (bfFpp == 0) {
// columns: yes, fpp: no
if (bfColumns.equals(oriBfColumns)) {
throw new DdlException("Bloom filter index has no change");
}
if (oriBfColumns == null) {
bfFpp = FeConstants.default_bloom_filter_fpp;
} else {
bfFpp = oriBfFpp;
}
} else {
// columns: yes, fpp: yes
if (bfColumns.equals(oriBfColumns) && bfFpp == oriBfFpp) {
throw new DdlException("Bloom filter index has no change");
}
}
hasBfChange = true;
} else {
if (bfFpp == 0) {
// columns: no, fpp: no
bfFpp = oriBfFpp;
} else {
// columns: no, fpp: yes
if (bfFpp == oriBfFpp) {
throw new DdlException("Bloom filter index has no change");
}
if (oriBfColumns == null) {
throw new DdlException("Bloom filter index has no change");
}
hasBfChange = true;
}
bfColumns = oriBfColumns;
}
if (bfColumns != null && bfColumns.isEmpty()) {
bfColumns = null;
}
if (bfColumns == null) {
bfFpp = 0;
}
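        // after the normalization above, bfColumns == null implies bfFpp == 0, i.e. "no bloom filter"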
Index.checkConflict(newSet, bfColumns);
        // property 3. timeout
        long timeoutSecond = PropertyAnalyzer.analyzeTimeout(propertyMap, Config.alter_table_timeout_second);
        // property 4. storage format
        TStorageFormat storageFormat = PropertyAnalyzer.analyzeStorageFormat(propertyMap);
        // property 5. store_row_column / row_store_columns
        // eg. "store_row_column" = "true"
        // eg. "row_store_columns" = "k1, k2"
List<String> rsColumns = Lists.newArrayList();
boolean storeRowColumn = false;
try {
storeRowColumn = PropertyAnalyzer.analyzeStoreRowColumn(propertyMap);
rsColumns = PropertyAnalyzer.analyzeRowStoreColumns(propertyMap,
olapTable.getColumns().stream().map(Column::getName).collect(Collectors.toList()));
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
boolean hasRowStoreChanged = false;
if (storeRowColumn || rsColumns != null) {
List<String> oriRowStoreColumns = olapTable.getTableProperty().getCopiedRowStoreColumns();
            // null-safe comparison of the requested row-store columns with the current ones
boolean columnsChanged = false;
if (rsColumns == null) {
columnsChanged = oriRowStoreColumns != null;
} else if (oriRowStoreColumns == null) {
columnsChanged = true;
} else {
columnsChanged = !oriRowStoreColumns.equals(rsColumns);
}
            // the row-store columns changed, or store_row_column was just enabled
            // (disabling store_row_column is not supported at present)
            if (columnsChanged || (!olapTable.storeRowColumn() && storeRowColumn)) {
// only support mow and duplicate model
if (!(olapTable.getKeysType() == KeysType.DUP_KEYS
|| olapTable.getEnableUniqueKeyMergeOnWrite())) {
throw new DdlException("`store_row_column` only support duplicate model or mow model");
}
hasRowStoreChanged = true;
}
}
        // begin checking each materialized index
// ATTN: DO NOT change any meta in this loop
long tableId = olapTable.getId();
Map<Long, Short> indexIdToShortKeyColumnCount = Maps.newHashMap();
Map<Long, List<Column>> changedIndexIdToSchema = Maps.newHashMap();
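        // these two maps collect, for each index that really changes, its new short key
        // column count and its new full schema; they drive the shadow index creation below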
for (Long alterIndexId : indexSchemaMap.keySet()) {
// Must get all columns including invisible columns.
// Because in alter process, all columns must be considered.
List<Column> originSchema = olapTable.getSchemaByIndexId(alterIndexId, true);
List<Column> alterSchema = indexSchemaMap.get(alterIndexId);
Set<Column> needAlterColumns = Sets.newHashSet();
// 0. check if unchanged
boolean hasColumnChange = false;
if (alterSchema.size() != originSchema.size()) {
hasColumnChange = true;
} else {
for (int i = 0; i < alterSchema.size(); i++) {
Column alterColumn = alterSchema.get(i);
if (!alterColumn.equals(originSchema.get(i))) {
needAlterColumns.add(alterColumn);
hasColumnChange = true;
                    } else {
                        // the columns compare equal, but make sure the generated column
                        // expression (which equals() may not cover) did not change;
                        // compare null-safely to avoid an NPE when only one side is generated
                        Column oriColumn = originSchema.get(i);
                        boolean oriIsGen = oriColumn.getGeneratedColumnInfo() != null;
                        boolean alterIsGen = alterColumn.getGeneratedColumnInfo() != null;
                        if (oriIsGen != alterIsGen || (oriIsGen && !oriColumn.getGeneratedColumnInfo()
                                .getExprSql().equals(alterColumn.getGeneratedColumnInfo().getExprSql()))) {
                            throw new DdlException("Not supporting alter table modify generated columns.");
                        }
                    }
}
}
            // if any column changed, alter it.
            // otherwise:
            //   no bloom filter change -> no alter needed
            //   bloom filter change    -> check whether this index contains an affected column
boolean needAlter = false;
if (hasColumnChange) {
needAlter = true;
} else if (hasBfChange) {
for (Column alterColumn : alterSchema) {
String columnName = alterColumn.getName();
boolean isOldBfColumn = false;
if (oriBfColumns != null && oriBfColumns.contains(columnName)) {
isOldBfColumn = true;
}
boolean isNewBfColumn = false;
if (bfColumns != null && bfColumns.contains(columnName)) {
isNewBfColumn = true;
}
if (isOldBfColumn != isNewBfColumn) {
// bf column change
needAlter = true;
} else if (isOldBfColumn && isNewBfColumn && oriBfFpp != bfFpp) {
// bf fpp change
needAlter = true;
}
if (needAlter) {
break;
}
}
} else if (hasIndexChange) {
needAlter = true;
} else if (hasRowStoreChanged) {
needAlter = true;
            } else if (storageFormat == TStorageFormat.V2
                    && olapTable.getStorageFormat() != TStorageFormat.V2) {
                needAlter = true;
            }
if (!needAlter) {
if (LOG.isDebugEnabled()) {
LOG.debug("index[{}] is not changed. ignore", alterIndexId);
}
continue;
}
if (LOG.isDebugEnabled()) {
LOG.debug("index[{}] is changed. start checking...", alterIndexId);
}
            // 1. check column order: a) at least one key column (unless duplicate-without-key);
            //    b) value columns must come after key columns
boolean meetValue = false;
boolean hasKey = false;
for (Column column : alterSchema) {
if (column.isKey() && meetValue) {
throw new DdlException(
"Invalid column order. value should be after key. index[" + olapTable.getIndexNameById(
alterIndexId) + "]");
}
if (!column.isKey()) {
meetValue = true;
} else {
hasKey = true;
}
}
if (!hasKey && !olapTable.isDuplicateWithoutKey()) {
throw new DdlException("No key column left. index[" + olapTable.getIndexNameById(alterIndexId) + "]");
}
            // 2. check column compatibility
for (Column alterColumn : alterSchema) {
for (Column oriColumn : originSchema) {
if (alterColumn.nameEquals(oriColumn.getName(), true /* ignore prefix */)) {
if (!alterColumn.equals(oriColumn)) {
                            // 2.1 check that the type change is allowed
oriColumn.checkSchemaChangeAllowed(alterColumn);
}
}
} // end for ori
} // end for alter
// 3. check partition key
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) {
List<Column> partitionColumns = partitionInfo.getPartitionColumns();
for (Column partitionCol : partitionColumns) {
boolean found = false;
for (Column alterColumn : alterSchema) {
if (alterColumn.nameEquals(partitionCol.getName(), true)) {
                            // 3.1 partition column cannot be modified
if (needAlterColumns.contains(alterColumn) && !alterColumn.equals(partitionCol)) {
throw new DdlException(
"Can not modify partition column[" + partitionCol.getName() + "]. index["
+ olapTable.getIndexNameById(alterIndexId) + "]");
}
found = true;
break;
}
} // end for alterColumns
                    if (!found && alterIndexId == olapTable.getBaseIndexId()) {
                        // 3.2 partition column cannot be dropped.
                        // ATTN: the partition columns' order must also remain unchanged; for now
                        // we only allow one partition column, so there is no need to check the order.
                        throw new DdlException(
                                "Partition column[" + partitionCol.getName() + "] cannot be dropped. index["
                                        + olapTable.getIndexNameById(alterIndexId) + "]");
                    }
} // end for partitionColumns
}
// 4. check distribution key:
DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
if (distributionInfo.getType() == DistributionInfoType.HASH) {
List<Column> distributionColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns();
for (Column distributionCol : distributionColumns) {
boolean found = false;
for (Column alterColumn : alterSchema) {
if (alterColumn.nameEquals(distributionCol.getName(), true)) {
                            // 4.1 distribution column cannot be modified
if (needAlterColumns.contains(alterColumn) && !alterColumn.equals(distributionCol)) {
throw new DdlException(
"Can not modify distribution column[" + distributionCol.getName() + "]. index["
+ olapTable.getIndexNameById(alterIndexId) + "]");
}
found = true;
break;
}
} // end for alterColumns
if (!found && alterIndexId == olapTable.getBaseIndexId()) {
                        // 4.2 distribution column cannot be dropped.
throw new DdlException(
"Distribution column[" + distributionCol.getName() + "] cannot be dropped. index["
+ olapTable.getIndexNameById(alterIndexId) + "]");
}
} // end for distributionCols
}
// 5. calc short key
short newShortKeyColumnCount = Env.calcShortKeyColumnCount(alterSchema,
indexIdToProperties.get(alterIndexId), !olapTable.isDuplicateWithoutKey());
if (LOG.isDebugEnabled()) {
LOG.debug("alter index[{}] short key column count: {}", alterIndexId, newShortKeyColumnCount);
}
indexIdToShortKeyColumnCount.put(alterIndexId, newShortKeyColumnCount);
// 6. store the changed columns for edit log
changedIndexIdToSchema.put(alterIndexId, alterSchema);
if (LOG.isDebugEnabled()) {
LOG.debug("schema change[{}-{}-{}] check pass.", dbId, tableId, alterIndexId);
}
} // end for indices
if (changedIndexIdToSchema.isEmpty() && !hasIndexChange && !hasRowStoreChanged) {
throw new DdlException("Nothing is changed. please check your alter stmt.");
}
// create job
long bufferSize = IdGeneratorUtil.getBufferSizeForAlterTable(olapTable, changedIndexIdToSchema.keySet());
IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize);
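        // all ids the job needs (job id, shadow index/tablet/replica ids) come from one
        // pre-sized buffer, presumably so the whole id range is claimed from the generator in one batch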
long jobId = idGeneratorBuffer.getNextId();
SchemaChangeJobV2 schemaChangeJob =
AlterJobV2Factory.createSchemaChangeJobV2(rawSql, jobId, dbId, olapTable.getId(), olapTable.getName(),
timeoutSecond * 1000);
schemaChangeJob.setBloomFilterInfo(hasBfChange, bfColumns, bfFpp);
schemaChangeJob.setAlterIndexInfo(hasIndexChange, indexes);
schemaChangeJob.setStoreRowColumnInfo(hasRowStoreChanged, storeRowColumn, rsColumns);
        // If the storage format is set to TStorageFormat.V2, tablets will be created with
        // preferred_rowset_type set to BETA, for both the base table and rollup indexes.
        if (hasIndexChange) {
            // only V2 supports indexes, so if any index changed, the storage format must be V2
            storageFormat = TStorageFormat.V2;
        }
schemaChangeJob.setStorageFormat(storageFormat);
        // the following operations are done after the 'for indices' check loop above,
        // so that nothing is created unless every index passed its checks
/*
* Create schema change job
* 1. For each index which has been changed, create a SHADOW index,
* and save the mapping of origin index to SHADOW index.
* 2. Create all tablets and replicas of all SHADOW index, add them to tablet inverted index.
         * 3. Change table's state to SCHEMA_CHANGE
*/
for (Map.Entry<Long, List<Column>> entry : changedIndexIdToSchema.entrySet()) {
long originIndexId = entry.getKey();
MaterializedIndexMeta currentIndexMeta = olapTable.getIndexMetaByIndexId(originIndexId);
// 1. get new schema version/schema version hash, short key column count
int currentSchemaVersion = currentIndexMeta.getSchemaVersion();
int newSchemaVersion = currentSchemaVersion + 1;
            // the new index must get a schema hash different from the current one
int currentSchemaHash = currentIndexMeta.getSchemaHash();
int newSchemaHash = Util.generateSchemaHash();
while (currentSchemaHash == newSchemaHash) {
newSchemaHash = Util.generateSchemaHash();
}
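            // (a distinct schema hash lets the BE tell data of the old and new schema apart)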
String newIndexName = SHADOW_NAME_PREFIX + olapTable.getIndexNameById(originIndexId);
short newShortKeyColumnCount = indexIdToShortKeyColumnCount.get(originIndexId);
long shadowIndexId = idGeneratorBuffer.getNextId();
// create SHADOW index for each partition
List<Tablet> addedTablets = Lists.newArrayList();
for (Partition partition : olapTable.getPartitions()) {
long partitionId = partition.getId();
TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
// index state is SHADOW
MaterializedIndex shadowIndex = new MaterializedIndex(shadowIndexId, IndexState.SHADOW);
MaterializedIndex originIndex = partition.getIndex(originIndexId);
ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo().getReplicaAllocation(partitionId);
Short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
for (Tablet originTablet : originIndex.getTablets()) {
TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, partitionId, shadowIndexId,
newSchemaHash, medium);
long originTabletId = originTablet.getId();
long shadowTabletId = idGeneratorBuffer.getNextId();
Tablet shadowTablet = EnvFactory.getInstance().createTablet(shadowTabletId);
shadowIndex.addTablet(shadowTablet, shadowTabletMeta);
addedTablets.add(shadowTablet);
schemaChangeJob.addTabletIdMap(partitionId, shadowIndexId, shadowTabletId, originTabletId);
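                    // the shadow -> origin tablet mapping recorded above lets the alter tasks
                    // know which origin tablet's data to convert into each shadow tablet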
List<Replica> originReplicas = originTablet.getReplicas();
int healthyReplicaNum = 0;
for (Replica originReplica : originReplicas) {
long shadowReplicaId = idGeneratorBuffer.getNextId();
long backendId = originReplica.getBackendId();
if (originReplica.getState() == Replica.ReplicaState.CLONE
|| originReplica.getState() == Replica.ReplicaState.DECOMMISSION
|| originReplica.getState() == ReplicaState.COMPACTION_TOO_SLOW
|| originReplica.getLastFailedVersion() > 0) {
LOG.info("origin replica {} of tablet {} state is {},"
+ " and last failed version is {}, skip creating shadow replica",
originReplica.getId(), originReplica, originReplica.getState(),
originReplica.getLastFailedVersion());
continue;
}
Preconditions.checkState(originReplica.getState() == ReplicaState.NORMAL,
originReplica.getState());
ReplicaContext context = new ReplicaContext();
context.replicaId = shadowReplicaId;
context.backendId = backendId;
context.state = ReplicaState.ALTER;
context.version = Partition.PARTITION_INIT_VERSION;
context.schemaHash = newSchemaHash;
context.dbId = dbId;
context.tableId = tableId;
context.partitionId = partitionId;
context.indexId = shadowIndexId;
context.originReplica = originReplica;
// replica's init state is ALTER, so that tablet report process will ignore its report
Replica shadowReplica = EnvFactory.getInstance().createReplica(context);
shadowTablet.addReplica(shadowReplica);
healthyReplicaNum++;
}
if (healthyReplicaNum < totalReplicaNum / 2 + 1) {
                    /*
                     * TODO(cmy): This is a bad design.
                     * In the schema change job we only send tasks to the shadow replicas that
                     * were actually created, without checking whether a quorum of replicas is
                     * satisfied. If it is not, the job keeps running and only fails once the
                     * entire job is done. So here we check the replica number strictly and
                     * refuse to submit the job if the quorum cannot be met.
                     */
for (Tablet tablet : addedTablets) {
Env.getCurrentInvertedIndex().deleteTablet(tablet.getId());
}
                    throw new DdlException(
                            "tablet " + originTabletId + " has too few healthy replicas: " + healthyReplicaNum);
}
}
schemaChangeJob.addPartitionShadowIndex(partitionId, shadowIndexId, shadowIndex);
} // end for partition
schemaChangeJob.addIndexSchema(shadowIndexId, originIndexId, newIndexName, newSchemaVersion, newSchemaHash,
newShortKeyColumnCount, entry.getValue());
} // end for index
// set table state
olapTable.setState(OlapTableState.SCHEMA_CHANGE);
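        // while the table is in SCHEMA_CHANGE state, new alter operations on it will be rejected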
// 2. add schemaChangeJob
addAlterJobV2(schemaChangeJob);
// 3. write edit log
Env.getCurrentEnv().getEditLog().logAlterJob(schemaChangeJob);
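        // logging the job lets follower FEs and a restarted master replay/recover it from the edit log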
LOG.info("finished to create schema change job: {}", schemaChangeJob.getJobId());
}