private void createJob()

in fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java [1261:1739]


    /**
     * Validates an ALTER TABLE schema change against {@code olapTable} and, if anything actually changes,
     * creates, registers and edit-logs a {@link SchemaChangeJobV2}.
     *
     * <p>Processing order:
     * <ol>
     *   <li>Parse properties from {@code propertyMap}: per-index short key column count
     *       ({@code "<indexName>#short_key"}), bloom filter columns/fpp, timeout, storage format and
     *       row store settings ({@code store_row_column}, {@code row_store_columns}).</li>
     *   <li>For every index in {@code indexSchemaMap}, decide whether it must be rebuilt and validate the
     *       new schema (key/value order, type compatibility, partition and distribution columns).
     *       No table metadata is modified in this phase.</li>
     *   <li>For each changed index, create a SHADOW index with new tablets/replicas in every partition.</li>
     *   <li>Set the table state to {@code SCHEMA_CHANGE}, register the job and write it to the edit log.</li>
     * </ol>
     *
     * @param rawSql         original ALTER statement text, stored on the created job
     * @param dbId           id of the database that owns {@code olapTable}
     * @param olapTable      the table being altered
     * @param indexSchemaMap requested schema for each index, keyed by index id
     * @param propertyMap    ALTER properties (short key, bloom filter, timeout, storage format, row store)
     * @param indexes        desired set of table indexes after the change; compared with
     *                       {@code olapTable.getIndexes()} to detect an index change
     * @throws UserException e.g. a {@link DdlException} when the request is invalid, changes nothing,
     *                       or a tablet lacks a quorum of healthy replicas to build shadow replicas on
     */
    private void createJob(String rawSql, long dbId, OlapTable olapTable, Map<Long, LinkedList<Column>> indexSchemaMap,
                           Map<String, String> propertyMap, List<Index> indexes) throws UserException {
        checkReplicaCount(olapTable);

        // process properties first
        // for now. properties has 3 options
        // property 1. to specify short key column count.
        // eg.
        //     "indexname1#short_key" = "3"
        //     "indexname2#short_key" = "4"
        Map<Long, Map<String, String>> indexIdToProperties = new HashMap<Long, Map<String, String>>();
        for (Map.Entry<String, String> property : propertyMap.entrySet()) {
            String key = property.getKey();
            if (!key.endsWith(PropertyAnalyzer.PROPERTIES_SHORT_KEY)) {
                continue;
            }
            // short key: "<indexName>#short_key"
            String[] keyArray = key.split("#");
            if (keyArray.length != 2 || keyArray[0].isEmpty() || !keyArray[1].equals(
                    PropertyAnalyzer.PROPERTIES_SHORT_KEY)) {
                throw new DdlException("Invalid alter table property: " + key);
            }

            if (!olapTable.hasMaterializedIndex(keyArray[0])) {
                throw new DdlException("Index[" + keyArray[0] + "] does not exist");
            }

            HashMap<String, String> prop = new HashMap<String, String>();
            prop.put(PropertyAnalyzer.PROPERTIES_SHORT_KEY, property.getValue());
            indexIdToProperties.put(olapTable.getIndexIdByName(keyArray[0]), prop);
        } // end for property keys

        // for bitmapIndex: any difference between the requested and current index sets is an index change
        boolean hasIndexChange = false;
        Set<Index> newSet = new HashSet<>(indexes);
        Set<Index> oriSet = new HashSet<>(olapTable.getIndexes());
        if (!newSet.equals(oriSet)) {
            hasIndexChange = true;
        }

        // property 2. bloom filter
        // eg. "bloom_filter_columns" = "k1,k2", "bloom_filter_fpp" = "0.05"
        Set<String> bfColumns = null;
        double bfFpp = 0;
        try {
            bfColumns = PropertyAnalyzer.analyzeBloomFilterColumns(propertyMap,
                    indexSchemaMap.get(olapTable.getBaseIndexId()), olapTable.getKeysType());
            bfFpp = PropertyAnalyzer.analyzeBloomFilterFpp(propertyMap);
        } catch (AnalysisException e) {
            throw new DdlException(e.getMessage());
        }

        // check bloom filter has change
        boolean hasBfChange = false;
        Set<String> oriBfColumns = olapTable.getCopiedBfColumns();
        double oriBfFpp = olapTable.getBfFpp();
        if (bfColumns != null) {
            if (bfFpp == 0) {
                // columns: yes, fpp: no
                if (bfColumns.equals(oriBfColumns)) {
                    throw new DdlException("Bloom filter index has no change");
                }

                if (oriBfColumns == null) {
                    bfFpp = FeConstants.default_bloom_filter_fpp;
                } else {
                    bfFpp = oriBfFpp;
                }
            } else {
                // columns: yes, fpp: yes
                if (bfColumns.equals(oriBfColumns) && bfFpp == oriBfFpp) {
                    throw new DdlException("Bloom filter index has no change");
                }
            }

            hasBfChange = true;
        } else {
            if (bfFpp == 0) {
                // columns: no, fpp: no
                bfFpp = oriBfFpp;
            } else {
                // columns: no, fpp: yes
                if (bfFpp == oriBfFpp) {
                    throw new DdlException("Bloom filter index has no change");
                }
                if (oriBfColumns == null) {
                    throw new DdlException("Bloom filter index has no change");
                }

                hasBfChange = true;
            }

            bfColumns = oriBfColumns;
        }

        // normalize: an empty column set means "no bloom filter", which also implies fpp 0
        if (bfColumns != null && bfColumns.isEmpty()) {
            bfColumns = null;
        }
        if (bfColumns == null) {
            bfFpp = 0;
        }

        Index.checkConflict(newSet, bfColumns);

        // property 3: timeout
        long timeoutSecond = PropertyAnalyzer.analyzeTimeout(propertyMap, Config.alter_table_timeout_second);

        TStorageFormat storageFormat = PropertyAnalyzer.analyzeStorageFormat(propertyMap);

        // property store_row_column && row_store_columns
        // eg. "store_row_column" = "true"
        // eg. "row_store_columns" = "k1, k2"
        List<String> rsColumns = Lists.newArrayList();
        boolean storeRowColumn = false;
        try {
            storeRowColumn = PropertyAnalyzer.analyzeStoreRowColumn(propertyMap);
            rsColumns = PropertyAnalyzer.analyzeRowStoreColumns(propertyMap,
                        olapTable.getColumns().stream().map(Column::getName).collect(Collectors.toList()));
        } catch (AnalysisException e) {
            throw new DdlException(e.getMessage());
        }

        boolean hasRowStoreChanged = false;
        if (storeRowColumn || rsColumns != null) {
            List<String> oriRowStoreColumns = olapTable.getTableProperty().getCopiedRowStoreColumns();
            // correct the comparison logic for null and empty list
            boolean columnsChanged = false;
            if (rsColumns == null) {
                columnsChanged = oriRowStoreColumns != null;
            } else if (oriRowStoreColumns == null) {
                columnsChanged = true;
            } else {
                columnsChanged = !oriRowStoreColumns.equals(rsColumns);
            }
            // partial row store columns changed, or store_row_column enabled, not supported to disable at present
            if (columnsChanged || (!olapTable.storeRowColumn() && storeRowColumn)) {
                // only support mow and duplicate model
                if (!(olapTable.getKeysType() == KeysType.DUP_KEYS
                        || olapTable.getEnableUniqueKeyMergeOnWrite())) {
                    throw new DdlException("`store_row_column` only support duplicate model or mow model");
                }
                hasRowStoreChanged = true;
            }
        }

        // begin checking each table
        // ATTN: DO NOT change any meta in this loop
        long tableId = olapTable.getId();
        Map<Long, Short> indexIdToShortKeyColumnCount = Maps.newHashMap();
        Map<Long, List<Column>> changedIndexIdToSchema = Maps.newHashMap();
        for (Long alterIndexId : indexSchemaMap.keySet()) {
            // Must get all columns including invisible columns.
            // Because in alter process, all columns must be considered.
            List<Column> originSchema = olapTable.getSchemaByIndexId(alterIndexId, true);
            List<Column> alterSchema = indexSchemaMap.get(alterIndexId);
            // NOTE(review): only filled by the positional comparison below, i.e. stays empty when the
            // column count changes; the partition/distribution "cannot be modified" checks rely on it.
            Set<Column> needAlterColumns = Sets.newHashSet();

            // 0. check if unchanged
            boolean hasColumnChange = false;
            if (alterSchema.size() != originSchema.size()) {
                hasColumnChange = true;
            } else {
                for (int i = 0; i < alterSchema.size(); i++) {
                    Column alterColumn = alterSchema.get(i);
                    if (!alterColumn.equals(originSchema.get(i))) {
                        needAlterColumns.add(alterColumn);
                        hasColumnChange = true;
                    } else {
                        // Equal columns may still differ in whether they are generated, or in the generated
                        // expression; neither change is supported. Compare null-safely so a column that is
                        // generated on only one side yields the DdlException rather than an NPE.
                        Column oriColumn = originSchema.get(i);
                        boolean oriGenerated = oriColumn.getGeneratedColumnInfo() != null;
                        boolean alterGenerated = alterColumn.getGeneratedColumnInfo() != null;
                        if (oriGenerated != alterGenerated
                                || (oriGenerated && !java.util.Objects.equals(
                                        oriColumn.getGeneratedColumnInfo().getExprSql(),
                                        alterColumn.getGeneratedColumnInfo().getExprSql()))) {
                            throw new DdlException("Not supporting alter table modify generated columns.");
                        }
                    }
                }
            }

            // if has column changed, alter it.
            // else:
            //     if no bf change, no alter
            //     if has bf change, should check
            // NOTE(review): because this is an else-if chain, when hasBfChange is true but this index has no
            // affected bf column, hasIndexChange / hasRowStoreChanged / storage format are not consulted for
            // this index — confirm that is intended.
            boolean needAlter = false;
            if (hasColumnChange) {
                needAlter = true;
            } else if (hasBfChange) {
                for (Column alterColumn : alterSchema) {
                    String columnName = alterColumn.getName();

                    boolean isOldBfColumn = false;
                    if (oriBfColumns != null && oriBfColumns.contains(columnName)) {
                        isOldBfColumn = true;
                    }

                    boolean isNewBfColumn = false;
                    if (bfColumns != null && bfColumns.contains(columnName)) {
                        isNewBfColumn = true;
                    }

                    if (isOldBfColumn != isNewBfColumn) {
                        // bf column change
                        needAlter = true;
                    } else if (isOldBfColumn && isNewBfColumn && oriBfFpp != bfFpp) {
                        // bf fpp change
                        needAlter = true;
                    }

                    if (needAlter) {
                        break;
                    }
                }
            } else if (hasIndexChange) {
                needAlter = true;
            } else if (hasRowStoreChanged) {
                needAlter = true;
            } else if (storageFormat == TStorageFormat.V2) {
                if (olapTable.getStorageFormat() != TStorageFormat.V2) {
                    needAlter = true;
                }
            }

            if (!needAlter) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("index[{}] is not changed. ignore", alterIndexId);
                }
                continue;
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("index[{}] is changed. start checking...", alterIndexId);
            }
            // 1. check order: a) has key; b) value after key
            boolean meetValue = false;
            boolean hasKey = false;
            for (Column column : alterSchema) {
                if (column.isKey() && meetValue) {
                    throw new DdlException(
                            "Invalid column order. value should be after key. index[" + olapTable.getIndexNameById(
                                    alterIndexId) + "]");
                }
                if (!column.isKey()) {
                    meetValue = true;
                } else {
                    hasKey = true;
                }
            }
            if (!hasKey && !olapTable.isDuplicateWithoutKey()) {
                throw new DdlException("No key column left. index[" + olapTable.getIndexNameById(alterIndexId) + "]");
            }

            // 2. check compatible
            for (Column alterColumn : alterSchema) {
                for (Column oriColumn : originSchema) {
                    if (alterColumn.nameEquals(oriColumn.getName(), true /* ignore prefix */)) {
                        if (!alterColumn.equals(oriColumn)) {
                            // 2.1 check type
                            oriColumn.checkSchemaChangeAllowed(alterColumn);
                        }
                    }
                } // end for ori
            } // end for alter

            // 3. check partition key
            PartitionInfo partitionInfo = olapTable.getPartitionInfo();
            if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) {
                List<Column> partitionColumns = partitionInfo.getPartitionColumns();
                for (Column partitionCol : partitionColumns) {
                    boolean found = false;
                    for (Column alterColumn : alterSchema) {
                        if (alterColumn.nameEquals(partitionCol.getName(), true)) {
                            // 3.1 partition column cannot be modified
                            if (needAlterColumns.contains(alterColumn) && !alterColumn.equals(partitionCol)) {
                                throw new DdlException(
                                        "Can not modify partition column[" + partitionCol.getName() + "]. index["
                                                + olapTable.getIndexNameById(alterIndexId) + "]");
                            }
                            found = true;
                            break;
                        }
                    } // end for alterColumns

                    if (!found && alterIndexId == olapTable.getBaseIndexId()) {
                        // 3.2 partition column cannot be deleted.
                        // ATTN. partition columns' order also need remaining unchanged.
                        // for now, we only allow one partition column, so no need to check order.
                        throw new DdlException(
                                "Partition column[" + partitionCol.getName() + "] cannot be dropped. index["
                                        + olapTable.getIndexNameById(alterIndexId) + "]");
                    }
                } // end for partitionColumns
            }

            // 4. check distribution key:
            DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
            if (distributionInfo.getType() == DistributionInfoType.HASH) {
                List<Column> distributionColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns();
                for (Column distributionCol : distributionColumns) {
                    boolean found = false;
                    for (Column alterColumn : alterSchema) {
                        if (alterColumn.nameEquals(distributionCol.getName(), true)) {
                            // 4.1 distribution column cannot be modified
                            if (needAlterColumns.contains(alterColumn) && !alterColumn.equals(distributionCol)) {
                                throw new DdlException(
                                        "Can not modify distribution column[" + distributionCol.getName() + "]. index["
                                                + olapTable.getIndexNameById(alterIndexId) + "]");
                            }
                            found = true;
                            break;
                        }
                    } // end for alterColumns

                    if (!found && alterIndexId == olapTable.getBaseIndexId()) {
                        // 4.2 distribution column cannot be deleted.
                        throw new DdlException(
                                "Distribution column[" + distributionCol.getName() + "] cannot be dropped. index["
                                        + olapTable.getIndexNameById(alterIndexId) + "]");
                    }
                } // end for distributionCols
            }

            // 5. calc short key
            short newShortKeyColumnCount = Env.calcShortKeyColumnCount(alterSchema,
                    indexIdToProperties.get(alterIndexId), !olapTable.isDuplicateWithoutKey());
            if (LOG.isDebugEnabled()) {
                LOG.debug("alter index[{}] short key column count: {}", alterIndexId, newShortKeyColumnCount);
            }
            indexIdToShortKeyColumnCount.put(alterIndexId, newShortKeyColumnCount);

            // 6. store the changed columns for edit log
            changedIndexIdToSchema.put(alterIndexId, alterSchema);

            if (LOG.isDebugEnabled()) {
                LOG.debug("schema change[{}-{}-{}] check pass.", dbId, tableId, alterIndexId);
            }
        } // end for indices

        if (changedIndexIdToSchema.isEmpty() && !hasIndexChange && !hasRowStoreChanged) {
            throw new DdlException("Nothing is changed. please check your alter stmt.");
        }

        // create job
        long bufferSize = IdGeneratorUtil.getBufferSizeForAlterTable(olapTable, changedIndexIdToSchema.keySet());
        IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize);
        long jobId = idGeneratorBuffer.getNextId();
        SchemaChangeJobV2 schemaChangeJob =
                AlterJobV2Factory.createSchemaChangeJobV2(rawSql, jobId, dbId, olapTable.getId(), olapTable.getName(),
                        timeoutSecond * 1000);
        schemaChangeJob.setBloomFilterInfo(hasBfChange, bfColumns, bfFpp);
        schemaChangeJob.setAlterIndexInfo(hasIndexChange, indexes);
        schemaChangeJob.setStoreRowColumnInfo(hasRowStoreChanged, storeRowColumn, rsColumns);

        // If StorageFormat is set to TStorageFormat.V2
        // which will create tablet with preferred_rowset_type set to BETA
        // for both base table and rollup index
        if (hasIndexChange) {
            // only V2 support index, so if there is index changed, storage format must be V2
            storageFormat = TStorageFormat.V2;
        }
        schemaChangeJob.setStorageFormat(storageFormat);

        // the following operations are done outside the 'for indices' loop
        // to avoid partial check success

        /*
         * Create schema change job
         * 1. For each index which has been changed, create a SHADOW index,
         *    and save the mapping of origin index to SHADOW index.
         * 2. Create all tablets and replicas of all SHADOW index, add them to tablet inverted index.
         * 3. Change table's state as SCHEMA_CHANGE
         */
        // Collects the shadow tablets of ALL changed indexes, so that a quorum failure on a later index also
        // removes the tablets created for earlier indexes from the tablet inverted index.
        List<Tablet> addedTablets = Lists.newArrayList();
        for (Map.Entry<Long, List<Column>> entry : changedIndexIdToSchema.entrySet()) {
            long originIndexId = entry.getKey();
            MaterializedIndexMeta currentIndexMeta = olapTable.getIndexMetaByIndexId(originIndexId);
            // 1. get new schema version/schema version hash, short key column count
            int currentSchemaVersion = currentIndexMeta.getSchemaVersion();
            int newSchemaVersion = currentSchemaVersion + 1;
            // generate schema hash for new index has to generate a new schema hash not equal to current schema hash
            int currentSchemaHash = currentIndexMeta.getSchemaHash();
            int newSchemaHash = Util.generateSchemaHash();
            while (currentSchemaHash == newSchemaHash) {
                newSchemaHash = Util.generateSchemaHash();
            }
            String newIndexName = SHADOW_NAME_PREFIX + olapTable.getIndexNameById(originIndexId);
            short newShortKeyColumnCount = indexIdToShortKeyColumnCount.get(originIndexId);
            long shadowIndexId = idGeneratorBuffer.getNextId();

            // create SHADOW index for each partition
            for (Partition partition : olapTable.getPartitions()) {
                long partitionId = partition.getId();
                TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
                // index state is SHADOW
                MaterializedIndex shadowIndex = new MaterializedIndex(shadowIndexId, IndexState.SHADOW);
                MaterializedIndex originIndex = partition.getIndex(originIndexId);
                ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo().getReplicaAllocation(partitionId);
                Short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
                for (Tablet originTablet : originIndex.getTablets()) {
                    TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, partitionId, shadowIndexId,
                            newSchemaHash, medium);
                    long originTabletId = originTablet.getId();
                    long shadowTabletId = idGeneratorBuffer.getNextId();

                    Tablet shadowTablet = EnvFactory.getInstance().createTablet(shadowTabletId);
                    shadowIndex.addTablet(shadowTablet, shadowTabletMeta);
                    addedTablets.add(shadowTablet);

                    schemaChangeJob.addTabletIdMap(partitionId, shadowIndexId, shadowTabletId, originTabletId);
                    List<Replica> originReplicas = originTablet.getReplicas();

                    int healthyReplicaNum = 0;
                    for (Replica originReplica : originReplicas) {
                        long shadowReplicaId = idGeneratorBuffer.getNextId();
                        long backendId = originReplica.getBackendId();

                        // unhealthy / in-flux origin replicas get no shadow replica
                        if (originReplica.getState() == Replica.ReplicaState.CLONE
                                || originReplica.getState() == Replica.ReplicaState.DECOMMISSION
                                || originReplica.getState() == ReplicaState.COMPACTION_TOO_SLOW
                                || originReplica.getLastFailedVersion() > 0) {
                            LOG.info("origin replica {} of tablet {} state is {},"
                                            + " and last failed version is {}, skip creating shadow replica",
                                    originReplica.getId(), originTabletId, originReplica.getState(),
                                    originReplica.getLastFailedVersion());
                            continue;
                        }
                        Preconditions.checkState(originReplica.getState() == ReplicaState.NORMAL,
                                originReplica.getState());
                        ReplicaContext context = new ReplicaContext();
                        context.replicaId = shadowReplicaId;
                        context.backendId = backendId;
                        context.state = ReplicaState.ALTER;
                        context.version = Partition.PARTITION_INIT_VERSION;
                        context.schemaHash = newSchemaHash;
                        context.dbId = dbId;
                        context.tableId = tableId;
                        context.partitionId = partitionId;
                        context.indexId = shadowIndexId;
                        context.originReplica = originReplica;
                        // replica's init state is ALTER, so that tablet report process will ignore its report
                        Replica shadowReplica = EnvFactory.getInstance().createReplica(context);
                        shadowTablet.addReplica(shadowReplica);
                        healthyReplicaNum++;
                    }

                    if (healthyReplicaNum < totalReplicaNum / 2 + 1) {
                        /*
                         * TODO(cmy): This is a bad design.
                         * Because in the schema change job, we will only send tasks to the shadow replicas
                         * that have been created, without checking whether the quorum of replica number are satisfied.
                         * This will cause the job to fail until we find that the quorum of replica number
                         * is not satisfied until the entire job is done.
                         * So here we check the replica number strictly and do not allow to submit the job
                         * if the quorum of replica number is not satisfied.
                         */
                        for (Tablet tablet : addedTablets) {
                            Env.getCurrentInvertedIndex().deleteTablet(tablet.getId());
                        }
                        throw new DdlException(
                                "tablet " + originTabletId + " has few healthy replica: " + healthyReplicaNum);
                    }
                }

                schemaChangeJob.addPartitionShadowIndex(partitionId, shadowIndexId, shadowIndex);
            } // end for partition
            schemaChangeJob.addIndexSchema(shadowIndexId, originIndexId, newIndexName, newSchemaVersion, newSchemaHash,
                    newShortKeyColumnCount, entry.getValue());
        } // end for index

        // set table state
        olapTable.setState(OlapTableState.SCHEMA_CHANGE);

        // 2. add schemaChangeJob
        addAlterJobV2(schemaChangeJob);

        // 3. write edit log
        Env.getCurrentEnv().getEditLog().logAlterJob(schemaChangeJob);
        LOG.info("finished to create schema change job: {}", schemaChangeJob.getJobId());
    }