public MutationState addColumn()

in phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java [4509:5058]


    public MutationState addColumn(PTable table, List<ColumnDef> origColumnDefs,
            ListMultimap<String, Pair<String, Object>> stmtProperties, boolean ifNotExists,
            boolean removeTableProps, NamedTableNode namedTableNode, PTableType tableType, boolean cascade, List<NamedNode> indexes)
                    throws SQLException {
        connection.rollback();
        List<PTable> indexesPTable = Lists.newArrayListWithExpectedSize(indexes != null ?
                indexes.size() : table.getIndexes().size());
        Map<PTable, Integer> indexToColumnSizeMap = new HashMap<>();

        // if cascade keyword is passed and indexes are provided either implicitly or explicitly
        if (cascade && (indexes == null || !indexes.isEmpty())) {
            indexesPTable = getIndexesPTableForCascade(indexes, table);
            if (indexesPTable.size() == 0) {
                // go back to regular behavior of altering the table/view
                cascade = false;
            } else {
                for (PTable index : indexesPTable) {
                    indexToColumnSizeMap.put(index, index.getColumns().size());
                }
            }
        }
        boolean wasAutoCommit = connection.getAutoCommit();
        List<PColumn> columns = Lists.newArrayListWithExpectedSize(origColumnDefs != null ?
            origColumnDefs.size() : 0);
        PName tenantId = connection.getTenantId();
        boolean sharedIndex = tableType == PTableType.INDEX && (table.getIndexType() == IndexType.LOCAL || table.getViewIndexId() != null);
        String tenantIdToUse = connection.getTenantId() != null && sharedIndex ? connection.getTenantId().getString() : null;
        String schemaName = table.getSchemaName().getString();
        String tableName = table.getTableName().getString();
        PName physicalName = table.getPhysicalName();
        String physicalSchemaName =
                SchemaUtil.getSchemaNameFromFullName(physicalName.getString());
        String physicalTableName =
                SchemaUtil.getTableNameFromFullName(physicalName.getString());
        Set<String> acquiredColumnMutexSet = Sets.newHashSetWithExpectedSize(3);
        boolean acquiredBaseTableMutex = false;
        try {
            connection.setAutoCommit(false);
            List<ColumnDef> columnDefs;
            if ((table.isAppendOnlySchema() || ifNotExists) && origColumnDefs != null) {
                // only make the rpc if we are adding new columns
                columnDefs = Lists.newArrayList();
                for (ColumnDef columnDef : origColumnDefs) {
                    String familyName = columnDef.getColumnDefName().getFamilyName();
                    String columnName = columnDef.getColumnDefName().getColumnName();
                    if (familyName != null) {
                        try {
                            PColumnFamily columnFamily = table.getColumnFamily(familyName);
                            columnFamily.getPColumnForColumnName(columnName);
                            if (!ifNotExists) {
                                throw new ColumnAlreadyExistsException(schemaName, tableName,
                                  columnName);
                            }
                        } catch (ColumnFamilyNotFoundException | ColumnNotFoundException e) {
                            columnDefs.add(columnDef);
                        }
                    } else {
                        try {
                            table.getColumnForColumnName(columnName);
                            if (!ifNotExists) {
                                throw new ColumnAlreadyExistsException(schemaName, tableName,
                                  columnName);
                            }
                        } catch (ColumnNotFoundException e) {
                            columnDefs.add(columnDef);
                        }
                    }
                }
            } else {
                columnDefs = origColumnDefs == null ? Collections.<ColumnDef>emptyList() : origColumnDefs;
            }

            boolean retried = false;
            boolean changingPhoenixTableProperty = false;
            MutableBoolean areWeIntroducingTTLAtThisLevel = new MutableBoolean(false);
            MetaProperties metaProperties = new MetaProperties();
            while (true) {
                Map<String, List<Pair<String, Object>>> properties=new HashMap<>(stmtProperties.size());;
                metaProperties = loadStmtProperties(stmtProperties,properties,table,removeTableProps);

                ColumnResolver resolver = FromCompiler.getResolver(namedTableNode, connection);
                table = resolver.getTables().get(0).getTable();
                int nIndexes = table.getIndexes().size();
                int numCols = columnDefs.size();
                int nNewColumns = numCols;
                List<Mutation> tableMetaData = Lists.newArrayListWithExpectedSize((1 + nNewColumns) * (nIndexes + 1));
                List<Mutation> columnMetaData = Lists.newArrayListWithExpectedSize(nNewColumns * (nIndexes + 1));
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(LogUtil.addCustomAnnotations("Resolved table to " + table.getName().getString() + " with seqNum " + table.getSequenceNumber() + " at timestamp " + table.getTimeStamp() + " with " + table.getColumns().size() + " columns: " + table.getColumns(), connection));
                }

                int position = table.getColumns().size();

                boolean addPKColumns = columnDefs.stream().anyMatch(ColumnDef::isPK);
                if (addPKColumns) {
                    List<PColumn> currentPKs = table.getPKColumns();
                    PColumn lastPK = currentPKs.get(currentPKs.size()-1);
                    // Disallow adding columns if the last column in the primary key is VARBINARY
                    // or ARRAY.
                    if (lastPK.getDataType() == PVarbinary.INSTANCE || lastPK.getDataType().isArrayType()) {
                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.VARBINARY_LAST_PK)
                        .setColumnName(lastPK.getName().getString()).build().buildException();
                    }
                    // Disallow adding columns if last column in the primary key is fixed width
                    // and nullable.
                    if (lastPK.isNullable() && lastPK.getDataType().isFixedWidth()) {
                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULLABLE_FIXED_WIDTH_LAST_PK)
                        .setColumnName(lastPK.getName().getString()).build().buildException();
                    }
                }

                MetaPropertiesEvaluated metaPropertiesEvaluated = new MetaPropertiesEvaluated();
                changingPhoenixTableProperty = evaluateStmtProperties(metaProperties,metaPropertiesEvaluated,table,schemaName,tableName,areWeIntroducingTTLAtThisLevel);
                if (areWeIntroducingTTLAtThisLevel.booleanValue()) {
                    //As we are introducing TTL for the first time at this level, we need to check
                    //if TTL is already defined up or down in the hierarchy.
                    TTLExpression ttlAlreadyDefined = TTL_EXPRESSION_NOT_DEFINED;
                    //Check up the hierarchy
                    if (table.getType() != PTableType.TABLE) {
                        ttlAlreadyDefined = checkAndGetTTLFromHierarchy(PhoenixRuntime.getTableNoCache(
                                connection, table.getParentName().toString()), tableName);
                    }
                    if (!ttlAlreadyDefined.equals(TTL_EXPRESSION_NOT_DEFINED)) {
                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.
                                TTL_ALREADY_DEFINED_IN_HIERARCHY)
                                .setSchemaName(schemaName)
                                .setTableName(tableName)
                                .build()
                                .buildException();
                    }

                    /**
                     * Whether TTL is already defined on any child below this level is checked in
                     * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl#mutateColumn(List,
                     * ColumnMutator, int, PTable, PTable, boolean)}, where
                     * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl#
                     * validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[],
                     * byte[], byte[], List, int)} already traverses allDescendantViews, so that
                     * check is not repeated here.
                     */
                }

                boolean isTransformNeeded = TransformClient.checkIsTransformNeeded(metaProperties, schemaName, table, tableName, null, tenantIdToUse, connection);
                if (isTransformNeeded) {
                    // Support for these cases could be added later; for now they are not supported.
                    if (MetaDataUtil.hasLocalIndexTable(connection, physicalTableName.getBytes())) {
                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_TRANSFORM_TABLE_WITH_LOCAL_INDEX)
                                .setSchemaName(schemaName).setTableName(tableName).build().buildException();
                    }
                    if (table.isAppendOnlySchema()) {
                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_TRANSFORM_TABLE_WITH_APPEND_ONLY_SCHEMA)
                                .setSchemaName(schemaName).setTableName(tableName).build().buildException();
                    }
                    if (table.isTransactional()) {
                        throw new SQLExceptionInfo.Builder(CANNOT_TRANSFORM_TRANSACTIONAL_TABLE)
                                .setSchemaName(schemaName).setTableName(tableName).build().buildException();
                    }
                }

                // If changing isImmutableRows to true or it's not being changed and is already true
                boolean willBeImmutableRows = Boolean.TRUE.equals(metaPropertiesEvaluated.getIsImmutableRows()) || (metaPropertiesEvaluated.getIsImmutableRows() == null && table.isImmutableRows());
                boolean willBeTxnl = metaProperties.getNonTxToTx();
                Long timeStamp = TransactionUtil.getTableTimestamp(connection, table.isTransactional() || willBeTxnl, table.isTransactional() ? table.getTransactionProvider() : metaPropertiesEvaluated.getTransactionProvider());
                int numPkColumnsAdded = 0;
                Set<String> colFamiliesForPColumnsToBeAdded = new LinkedHashSet<>();
                Set<String> families = new LinkedHashSet<>();
                PTable tableForCQCounters = tableType == PTableType.VIEW
                        ? connection.getTable(table.getPhysicalName().getString())
                        : table;
                EncodedCQCounter cqCounterToUse = tableForCQCounters.getEncodedCQCounter();
                Map<String, Integer> changedCqCounters = new HashMap<>(numCols);
                if (numCols > 0 ) {
                    StatementContext context = new StatementContext(new PhoenixStatement(connection), resolver);
                    short nextKeySeq = SchemaUtil.getMaxKeySeq(table);
                    for ( ColumnDef colDef : columnDefs) {
                        if (colDef != null && !colDef.isNull()) {
                            if (colDef.isPK()) {
                                throw new SQLExceptionInfo.Builder(SQLExceptionCode.NOT_NULLABLE_COLUMN_IN_ROW_KEY)
                                .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
                            } else if (!willBeImmutableRows) {
                                throw new SQLExceptionInfo.Builder(SQLExceptionCode.KEY_VALUE_NOT_NULL)
                                .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
                            }
                        }
                        if (colDef != null && colDef.isPK() && table.getType() == VIEW && table.getViewType() != MAPPED) {
                            throwIfLastPKOfParentIsVariableLength(getParentOfView(table), schemaName, tableName, colDef);
                        }
                        if (colDef != null && colDef.isRowTimestamp()) {
                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.ROWTIMESTAMP_CREATE_ONLY)
                            .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
                        }
                        if (!colDef.validateDefault(context, null)) {
                            colDef = new ColumnDef(colDef, null); // Remove DEFAULT as it's not necessary
                        }
                        if (!colDef.isPK() && table.hasConditionalTTL()) {
                            // Only 1 column family is allowed if the table has conditional TTL
                            PColumnFamily family = table.getColumnFamilies().get(0);
                            String tableFamilyName = family.getName().getString();
                            String colFamilyName = colDef.getColumnDefName().getFamilyName();
                            if (colFamilyName == null) {
                                colFamilyName = table.getDefaultFamilyName() == null ?
                                        DEFAULT_COLUMN_FAMILY :
                                        table.getDefaultFamilyName().getString();
                            }
                            if (!colFamilyName.equals(tableFamilyName)) {
                                throw new SQLExceptionInfo.Builder(
                                        CANNOT_SET_CONDITIONAL_TTL_ON_TABLE_WITH_MULTIPLE_COLUMN_FAMILIES)
                                        .setMessage(String.format("Cannot add column %s", colDef))
                                        .build().buildException();
                            }
                        }
                        String familyName = null;
                        Integer encodedCQ = null;
                        if (!colDef.isPK()) {
                            String colDefFamily = colDef.getColumnDefName().getFamilyName();
                            ImmutableStorageScheme storageScheme = table.getImmutableStorageScheme();
                            String defaultColumnFamily = tableForCQCounters.getDefaultFamilyName() != null && !Strings.isNullOrEmpty(tableForCQCounters.getDefaultFamilyName().getString()) ?
                                    tableForCQCounters.getDefaultFamilyName().getString() : DEFAULT_COLUMN_FAMILY;
                                if (table.getType() == PTableType.INDEX && table.getIndexType() == IndexType.LOCAL) {
                                    defaultColumnFamily = QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX + defaultColumnFamily;
                                }
                            if (storageScheme == SINGLE_CELL_ARRAY_WITH_OFFSETS) {
                                familyName = colDefFamily != null ? colDefFamily : defaultColumnFamily;
                            } else {
                                familyName = defaultColumnFamily;
                            }
                            encodedCQ = table.isAppendOnlySchema() ? Integer.valueOf(ENCODED_CQ_COUNTER_INITIAL_VALUE + position) : cqCounterToUse.getNextQualifier(familyName);
                            if (!table.isAppendOnlySchema() && cqCounterToUse.increment(familyName)) {
                                changedCqCounters.put(familyName,
                                    cqCounterToUse.getNextQualifier(familyName));
                            }
                        }
                        byte[] columnQualifierBytes = null;
                        try {
                            columnQualifierBytes = EncodedColumnsUtil.getColumnQualifierBytes(colDef.getColumnDefName().getColumnName(), encodedCQ, table, colDef.isPK());
                        }
                        catch (QualifierOutOfRangeException e) {
                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_COLUMNS_EXCEEDED)
                            .setSchemaName(schemaName)
                            .setTableName(tableName).build().buildException();
                        }
                        PColumn column = newColumn(position++, colDef, PrimaryKeyConstraint.EMPTY, table.getDefaultFamilyName() == null ? null : table.getDefaultFamilyName().getString(), true, columnQualifierBytes, willBeImmutableRows);
                        HashMap<PTable, PColumn> indexToIndexColumnMap = null;
                        if (cascade) {
                            indexToIndexColumnMap = getPTablePColumnHashMapForCascade(indexesPTable, willBeImmutableRows,
                                            colDef, familyName, indexToColumnSizeMap);
                        }

                        columns.add(column);
                        String pkName = null;
                        Short keySeq = null;

                        // TODO: support setting properties on other families?
                        if (column.getFamilyName() == null) {
                            ++numPkColumnsAdded;
                            pkName = table.getPKName() == null ? null : table.getPKName().getString();
                            keySeq = ++nextKeySeq;
                        } else {
                            families.add(column.getFamilyName().getString());
                        }
                        colFamiliesForPColumnsToBeAdded.add(column.getFamilyName() == null ? null : column.getFamilyName().getString());
                        addColumnMutation(connection, schemaName, tableName, column, null, pkName, keySeq, table.getBucketNum() != null);
                        // add new columns for given indexes one by one
                        if (cascade) {
                            for (PTable index: indexesPTable) {
                                LOGGER.info("Adding column "+column.getName().getString()+" to "+index.getTableName().toString());
                                addColumnMutation(connection, schemaName, index.getTableName().getString(), indexToIndexColumnMap.get(index), null, "", keySeq, index.getBucketNum() != null);
                            }
                        }
                    }

                    // Add any new PK columns to end of index PK
                    if (numPkColumnsAdded > 0) {
                        // create PK column list that includes the newly created columns
                        List<PColumn> pkColumns = Lists.newArrayListWithExpectedSize(table.getPKColumns().size()+numPkColumnsAdded);
                        pkColumns.addAll(table.getPKColumns());
                        for (int i=0; i<numCols; ++i) {
                            if (columnDefs.get(i).isPK()) {
                                pkColumns.add(columns.get(i));
                            }
                        }
                        int pkSlotPosition = table.getPKColumns().size()-1;
                        for (PTable index : table.getIndexes()) {
                            short nextIndexKeySeq = SchemaUtil.getMaxKeySeq(index);
                            int indexPosition = index.getColumns().size();
                            for (int i=0; i<numCols; ++i) {
                                ColumnDef colDef = columnDefs.get(i);
                                if (colDef.isPK()) {
                                    PDataType indexColDataType = IndexUtil.getIndexColumnDataType(colDef.isNull(), colDef.getDataType());
                                    ColumnName indexColName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(null, colDef.getColumnDefName().getColumnName()));
                                    Expression expression = new RowKeyColumnExpression(columns.get(i), new RowKeyValueAccessor(pkColumns, pkSlotPosition));
                                    ColumnDef indexColDef = FACTORY.columnDef(indexColName, indexColDataType.getSqlTypeName(), colDef.isNull(), colDef.getMaxLength(), colDef.getScale(), true, colDef.getSortOrder(), expression.toString(), colDef.isRowTimestamp());
                                    PColumn indexColumn = newColumn(indexPosition++, indexColDef, PrimaryKeyConstraint.EMPTY, null, true, null, willBeImmutableRows);
                                    addColumnMutation(connection, schemaName, index.getTableName().getString(), indexColumn, index.getParentTableName().getString(), index.getPKName() == null ? null : index.getPKName().getString(), ++nextIndexKeySeq, index.getBucketNum() != null);
                                }
                            }
                        }
                        ++pkSlotPosition;
                    }
                    columnMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                    connection.rollback();
                } else {
                    // Check that HBase configured properly for mutable secondary indexing
                    // if we're changing from an immutable table to a mutable table and we
                    // have existing indexes.
                    if (Boolean.FALSE.equals(metaPropertiesEvaluated.getIsImmutableRows()) && !table.getIndexes().isEmpty()) {
                        int hbaseVersion = connection.getQueryServices().getLowestClusterHBaseVersion();
                        if (hbaseVersion < MetaDataProtocol.MUTABLE_SI_VERSION_THRESHOLD) {
                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_MUTABLE_INDEXES)
                            .setSchemaName(schemaName).setTableName(tableName).build().buildException();
                        }
                        if (!connection.getQueryServices().hasIndexWALCodec() && !table.isTransactional()) {
                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_MUTABLE_INDEX_CONFIG)
                            .setSchemaName(schemaName).setTableName(tableName).build().buildException();
                        }
                    }
                    if (Boolean.TRUE.equals(metaPropertiesEvaluated.getMultiTenant())) {
                        throwIfInsufficientColumns(schemaName, tableName, table.getPKColumns(), table.getBucketNum()!=null, metaPropertiesEvaluated.getMultiTenant());
                    }
                }

                if (!table.getIndexes().isEmpty() &&
                        (numPkColumnsAdded>0 || metaProperties.getNonTxToTx() ||
                                metaPropertiesEvaluated.getUpdateCacheFrequency() != null)) {
                    for (PTable index : table.getIndexes()) {
                        incrementTableSeqNum(index, index.getType(), numPkColumnsAdded,
                                metaProperties.getNonTxToTx() ? Boolean.TRUE : null,
                                metaPropertiesEvaluated.getUpdateCacheFrequency(),
                                metaPropertiesEvaluated.getPhysicalTableName(),
                                metaPropertiesEvaluated.getSchemaVersion(),
                                metaProperties.getColumnEncodedBytesProp());
                    }
                    tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                    connection.rollback();
                }

                if (cascade) {
                    for (PTable index : indexesPTable) {
                        incrementTableSeqNum(index, index.getType(), columnDefs.size(),
                                Boolean.FALSE,
                                metaPropertiesEvaluated.getUpdateCacheFrequency(),
                                metaPropertiesEvaluated.getPhysicalTableName(),
                                metaPropertiesEvaluated.getSchemaVersion(),
                                metaPropertiesEvaluated.getColumnEncodedBytes());
                    }
                    tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                    connection.rollback();
                }

                long seqNum = 0;
                if (changingPhoenixTableProperty || columnDefs.size() > 0) {
                    seqNum = incrementTableSeqNum(table, tableType, columnDefs.size(), metaPropertiesEvaluated);

                    tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                    connection.rollback();
                }

                PTable transformingNewTable = null;
                if (isTransformNeeded) {
                   try {
                       transformingNewTable = TransformClient.addTransform(connection, tenantIdToUse, table, metaProperties, seqNum, PTable.TransformType.METADATA_TRANSFORM);
                    } catch (SQLException ex) {
                       connection.rollback();
                       throw ex;
                   }
                }

                // Force the table header row to be first
                Collections.reverse(tableMetaData);
                // Add column metadata afterwards, maintaining the order so columns have more predictable ordinal position
                tableMetaData.addAll(columnMetaData);
                if (!changedCqCounters.isEmpty()) {
                    try (PreparedStatement linkStatement = connection.prepareStatement(UPDATE_ENCODED_COLUMN_COUNTER)) {
                        for (Entry<String, Integer> entry : changedCqCounters.entrySet()) {
                            linkStatement.setString(1, tenantIdToUse);
                            linkStatement.setString(2, tableForCQCounters.getSchemaName().getString());
                            linkStatement.setString(3, tableForCQCounters.getTableName().getString());
                            linkStatement.setString(4, entry.getKey());
                            linkStatement.setInt(5, entry.getValue());
                            linkStatement.execute();
                        }
                    }

                    // When a view adds its own columns, then we need to increase the sequence number of the base table
                    // too since we want clients to get the latest PTable of the base table.
                    if (tableType == VIEW) {
                        try (PreparedStatement incrementStatement = connection.prepareStatement(INCREMENT_SEQ_NUM)) {
                            incrementStatement.setString(1, null);
                            incrementStatement.setString(2, tableForCQCounters.getSchemaName().getString());
                            incrementStatement.setString(3, tableForCQCounters.getTableName().getString());
                            incrementStatement.setLong(4, tableForCQCounters.getSequenceNumber() + 1);
                            incrementStatement.execute();
                        }
                    }
                    tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                    connection.rollback();
                }

                byte[] family = families.size() > 0 ?
                        families.iterator().next().getBytes(StandardCharsets.UTF_8) : null;

                // Figure out if the empty column family is changing as a result of adding the new column
                byte[] emptyCF = null;
                byte[] projectCF = null;
                if (table.getType() != PTableType.VIEW && family != null) {
                    if (table.getColumnFamilies().isEmpty()) {
                        emptyCF = family;
                    } else {
                        try {
                            table.getColumnFamily(family);
                        } catch (ColumnFamilyNotFoundException e) {
                            projectCF = family;
                            emptyCF = SchemaUtil.getEmptyColumnFamily(table);
                        }
                    }
                }

                if (EncodedColumnsUtil.usesEncodedColumnNames(table)
                        && stmtProperties.isEmpty() && !acquiredBaseTableMutex) {
                    // For tables that use column encoding acquire a mutex on
                    // the base table as we need to update the encoded column
                    // qualifier counter on the base table. Not applicable to
                    // ALTER TABLE/VIEW SET <property> statements because
                    // we don't update the column qualifier counter while
                    // setting property, hence the check: stmtProperties.isEmpty()
                    acquiredBaseTableMutex = writeCell(null, physicalSchemaName,
                        physicalTableName, null);
                    if (!acquiredBaseTableMutex) {
                        throw new ConcurrentTableMutationException(
                            physicalSchemaName, physicalTableName);
                    }
                }
                for (PColumn pColumn : columns) {
                    // acquire the mutex using the global physical table name to
                    // prevent creating the same column on a table or view with
                    // a conflicting type etc
                    boolean acquiredMutex = writeCell(null, physicalSchemaName, physicalTableName,
                        pColumn.toString());
                    if (!acquiredMutex && !acquiredColumnMutexSet.contains(pColumn.toString())) {
                        throw new ConcurrentTableMutationException(physicalSchemaName, physicalTableName);
                    }
                    acquiredColumnMutexSet.add(pColumn.toString());
                }
                MetaDataMutationResult result = connection.getQueryServices().addColumn(tableMetaData, table,
                        getParentTable(table), transformingNewTable, properties, colFamiliesForPColumnsToBeAdded, columns);

                try {
                    MutationCode code = processMutationResult(schemaName, tableName, result);
                    if (code == MutationCode.COLUMN_ALREADY_EXISTS) {
                        addTableToCache(result, false);
                        if (!ifNotExists) {
                            throw new ColumnAlreadyExistsException(schemaName, tableName, SchemaUtil.findExistingColumn(result.getTable(), columns));
                        }
                        return new MutationState(0, 0, connection);
                    }
                    // Only update client side cache if we aren't adding a PK column to a table with indexes or
                    // transitioning a table from non transactional to transactional.
                    // We could update the cache manually then too, it'd just be a pain.
                    String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
                    long resolvedTimeStamp = TransactionUtil.getResolvedTime(connection, result);
                    if (table.getIndexes().isEmpty() || (numPkColumnsAdded==0 && ! metaProperties.getNonTxToTx())) {
                        addTableToCache(result, false, resolvedTimeStamp);
                        table = result.getTable();
                    } else  {
                        // remove the table from the cache, it will be fetched from the server the
                        // next time it is resolved
                        connection.removeTable(tenantId, fullTableName, null, resolvedTimeStamp);
                    }
                    // Delete rows in view index if we haven't dropped it already
                    // We only need to do this if the multiTenant transitioned to false
                    if (table.getType() == PTableType.TABLE
                            && Boolean.FALSE.equals(metaPropertiesEvaluated.getMultiTenant())
                            && MetaDataUtil.hasViewIndexTable(connection, table.getPhysicalName())) {
                        connection.setAutoCommit(true);
                        MetaDataUtil.deleteViewIndexSequences(connection, table.getPhysicalName(), table.isNamespaceMapped());
                        // If we're not dropping metadata, then make sure no rows are left in
                        // our view index physical table.
                        // TODO: remove this, as the DROP INDEX commands run when the DROP VIEW
                        // commands are run would remove all rows already.
                        if (!connection.getQueryServices().getProps().getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA)) {
                            Long scn = connection.getSCN();
                            long ts = (scn == null ? result.getMutationTime() : scn);
                            byte[] viewIndexPhysicalName = MetaDataUtil
                                    .getViewIndexPhysicalName(table.getPhysicalName().getBytes());
                            String viewIndexSchemaName = SchemaUtil.getSchemaNameFromFullName(viewIndexPhysicalName);
                            String viewIndexTableName = SchemaUtil.getTableNameFromFullName(viewIndexPhysicalName);
                            PName viewIndexName = PNameFactory.newName(SchemaUtil.getTableName(viewIndexSchemaName, viewIndexTableName));

                            PTable viewIndexTable = new PTableImpl.Builder()
                                    .setName(viewIndexName)
                                    .setKey(new PTableKey(tenantId, viewIndexName.getString()))
                                    .setSchemaName(PNameFactory.newName(viewIndexSchemaName))
                                    .setTableName(PNameFactory.newName(viewIndexTableName))
                                    .setType(PTableType.VIEW)
                                    .setViewType(ViewType.MAPPED)
                                    .setTimeStamp(ts)
                                    .setPkColumns(Collections.<PColumn>emptyList())
                                    .setAllColumns(Collections.<PColumn>emptyList())
                                    .setRowKeySchema(RowKeySchema.EMPTY_SCHEMA)
                                    .setIndexes(Collections.<PTable>emptyList())
                                    .setFamilyAttributes(table.getColumnFamilies())
                                    .setPhysicalNames(Collections.<PName>emptyList())
                                    .setNamespaceMapped(table.isNamespaceMapped())
                                    .setImmutableStorageScheme(table.getImmutableStorageScheme())
                                    .setQualifierEncodingScheme(table.getEncodingScheme())
                                    .setUseStatsForParallelization(table.useStatsForParallelization())
                                    .build();
                            List<TableRef> tableRefs = Collections.singletonList(new TableRef(null, viewIndexTable, ts, false));
                            MutationPlan plan = new PostDDLCompiler(connection).compile(tableRefs, null, null,
                                    Collections.<PColumn>emptyList(), ts);
                            connection.getQueryServices().updateData(plan);
                        }
                    }
                    if (transformingNewTable != null) {
                        connection.removeTable(tenantId, fullTableName, null, resolvedTimeStamp);
                        connection.getQueryServices().clearCache();
                    }
                    if (emptyCF != null) {
                        Long scn = connection.getSCN();
                        connection.setAutoCommit(true);
                        // Delete everything in the column. You'll still be able to do queries at earlier timestamps
                        long ts = (scn == null ? result.getMutationTime() : scn);
                        MutationPlan plan = new PostDDLCompiler(connection).compile(Collections.singletonList(new TableRef(null, table, ts, false)), emptyCF, projectCF == null ? null : Collections.singletonList(projectCF), null, ts);
                        return connection.getQueryServices().updateData(plan);
                    }
                    return new MutationState(0, 0, connection);
                } catch (ConcurrentTableMutationException e) {
                    if (retried) {
                        throw e;
                    }
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug(LogUtil.addCustomAnnotations("Caught ConcurrentTableMutationException for table " + SchemaUtil.getTableName(schemaName, tableName) + ". Will try again...", connection));
                    }
                    retried = true;
                } catch(Throwable e) {
                    TableMetricsManager.updateMetricsForSystemCatalogTableMethod(tableName,
                            NUM_METADATA_LOOKUP_FAILURES, 1);
                    throw e;
                }
            }
        } finally {
            connection.setAutoCommit(wasAutoCommit);
            if (acquiredBaseTableMutex) {
                // release the mutex on the physical table (used to prevent concurrent conflicting
                // add column changes)
                deleteCell(null, physicalSchemaName, physicalTableName, null);
            }
            deleteMutexCells(physicalSchemaName, physicalTableName, acquiredColumnMutexSet);
        }
    }