private PTable createTableInternal()

in phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java [2460:3827]


    private PTable createTableInternal(
            CreateTableStatement statement,
            byte[][] splits,
            final PTable parent,
            String viewStatement,
            ViewType viewType,
            PDataType viewIndexIdType,
            final byte[] rowKeyMatcher,
            final byte[][] viewColumnConstants,
            final BitSet isViewColumnReferenced,
            boolean allocateIndexId,
            IndexType indexType,
            Date asyncCreatedDate,
            Set<PTable.CDCChangeScope> cdcIncludeScopes,
            Map<String, Object> tableProps,
            Map<String, Object> commonFamilyProps) throws SQLException {
        final PTableType tableType = statement.getTableType();
        boolean wasAutoCommit = connection.getAutoCommit();
        TableName tableNameNode = null;
        boolean allowSystemCatalogRollback =
                connection.getQueryServices().getProps().getBoolean(
                    QueryServices.ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK,
                    QueryServicesOptions.DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK);
        Set<String> acquiredColumnMutexSet = Sets.newHashSetWithExpectedSize(3);
        String parentPhysicalName =
                (parent!=null &&  parent.getPhysicalName()!=null) ? parent.getPhysicalName().getString() : null;
        String parentPhysicalSchemaName = parentPhysicalName!=null ?
                SchemaUtil.getSchemaNameFromFullName(parentPhysicalName) : null;
        String parentPhysicalTableName = parentPhysicalName!=null ?
                SchemaUtil.getTableNameFromFullName(parentPhysicalName) : null;
        connection.rollback();
        try {
            connection.setAutoCommit(false);
            List<Mutation> tableMetaData = Lists.newArrayListWithExpectedSize(statement.getColumnDefs().size() + 3);

            tableNameNode = statement.getTableName();
            final String schemaName = connection.getSchema() != null && tableNameNode.getSchemaName() == null ? connection.getSchema() : tableNameNode.getSchemaName();
            final String tableName = tableNameNode.getTableName();
            String parentTableName = null;
            PName tenantId = connection.getTenantId();
            String tenantIdStr = tenantId == null ? null : tenantId.getString();
            Long scn = connection.getSCN();
            long clientTimeStamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
            boolean multiTenant = false;
            boolean storeNulls = false;
            TransactionFactory.Provider transactionProvider = (parent!= null) ? parent.getTransactionProvider() : null;
            Integer saltBucketNum = null;
            String defaultFamilyName = null;
            boolean isImmutableRows = false;
            boolean isAppendOnlySchema = false;
            List<PName> physicalNames = Collections.emptyList();
            boolean addSaltColumn = false;
            boolean rowKeyOrderOptimizable = true;
            Long timestamp = null;
            boolean isNamespaceMapped = parent == null
                    ? SchemaUtil.isNamespaceMappingEnabled(tableType, connection.getQueryServices().getProps())
                    : parent.isNamespaceMapped();
            boolean isLocalIndex = indexType == IndexType.LOCAL;
            QualifierEncodingScheme encodingScheme = NON_ENCODED_QUALIFIERS;
            ImmutableStorageScheme immutableStorageScheme = ONE_CELL_PER_COLUMN;
            int baseTableColumnCount =
                    tableType == PTableType.VIEW ? parent.getColumns().size()
                            : QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;

            TTLExpression ttl = TTL_EXPRESSION_NOT_DEFINED;
            TTLExpression ttlFromHierarchy = TTL_EXPRESSION_NOT_DEFINED;
            TTLExpression ttlProp = (TTLExpression) TableProperty.TTL.getValue(tableProps);

            // Validate TTL prop value if set
            if (ttlProp != null) {
                if (!isViewTTLEnabled() && tableType == VIEW) {
                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.
                        VIEW_TTL_NOT_ENABLED)
                        .setSchemaName(schemaName)
                        .setTableName(tableName)
                        .build()
                        .buildException();
                }

                if (tableType != TABLE && (tableType != VIEW || viewType != UPDATABLE)) {
                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.
                        TTL_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY)
                        .setSchemaName(schemaName)
                        .setTableName(tableName)
                        .build()
                        .buildException();
                }
                ttlFromHierarchy = checkAndGetTTLFromHierarchy(parent, tableName);
                if (!ttlFromHierarchy.equals(TTL_EXPRESSION_NOT_DEFINED)) {
                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.
                            TTL_ALREADY_DEFINED_IN_HIERARCHY)
                            .setSchemaName(schemaName)
                            .setTableName(tableName)
                            .build()
                            .buildException();
                }
                try {
                    ttlProp.validateTTLOnCreate(connection, statement, parent, tableProps);
                } catch (IllegalArgumentException e) {
                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.ILLEGAL_DATA)
                            .setMessage(e.getMessage())
                            .setSchemaName(schemaName)
                            .setTableName(tableName)
                            .build()
                            .buildException();
                }
                ttl = ttlProp;
            } else {
                ttlFromHierarchy = checkAndGetTTLFromHierarchy(parent, tableName);
                if (!ttlFromHierarchy.equals(TTL_EXPRESSION_NOT_DEFINED)) {
                    ttlFromHierarchy.validateTTLOnCreate(connection,
                            statement,
                            parent,
                            tableProps);
                }
            }

            Boolean isChangeDetectionEnabledProp =
                (Boolean) TableProperty.CHANGE_DETECTION_ENABLED.getValue(tableProps);
            verifyChangeDetectionTableType(tableType, isChangeDetectionEnabledProp);

            String schemaVersion = (String) TableProperty.SCHEMA_VERSION.getValue(tableProps);
            String streamingTopicName = (String) TableProperty.STREAMING_TOPIC_NAME.getValue(tableProps);

            String cdcIncludeScopesStr = cdcIncludeScopes == null ? null :
                    CDCUtil.makeChangeScopeStringFromEnums(cdcIncludeScopes);

            if (parent != null && tableType == PTableType.INDEX) {
                timestamp = TransactionUtil.getTableTimestamp(connection, transactionProvider != null, transactionProvider);
                isImmutableRows = parent.isImmutableRows();
                isAppendOnlySchema = parent.isAppendOnlySchema();

                // Index on view
                // TODO: Can we support a multi-tenant index directly on a multi-tenant
                // table instead of only a view? We don't have anywhere to put the link
                // from the table to the index, though.
                if (isLocalIndex || (parent.getType() == PTableType.VIEW && parent.getViewType() != ViewType.MAPPED)) {
                    PName physicalName = parent.getPhysicalName();

                    saltBucketNum = parent.getBucketNum();
                    addSaltColumn = (saltBucketNum != null && !isLocalIndex);
                    defaultFamilyName = parent.getDefaultFamilyName() == null ? null : parent.getDefaultFamilyName().getString();
                    if (isLocalIndex) {
                        defaultFamilyName =
                                parent.getDefaultFamilyName() == null ? QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY
                                        : IndexUtil.getLocalIndexColumnFamily(parent.getDefaultFamilyName().getString());
                        saltBucketNum = null;
                        // Set physical name of local index table
                        physicalNames = Collections.singletonList(PNameFactory.newName(physicalName.getBytes()));
                    } else {
                        defaultFamilyName = parent.getDefaultFamilyName() == null ? QueryConstants.DEFAULT_COLUMN_FAMILY : parent.getDefaultFamilyName().getString();
                        // Set physical name of view index table
                        // Parent is a view and this is an index so we need to get _IDX_+logical name of base table.
                        // parent.getPhysicalName is Schema.Physical of base and we can't use it since the _IDX_ table is logical name of the base.
                        // parent.getName is the view name. parent.getBaseTableLogicalName is the logical name of the base table
                        PName parentName = parent.getBaseTableLogicalName();
                        physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getViewIndexPhysicalName(parentName, isNamespaceMapped)));
                    }
                }

                multiTenant = parent.isMultiTenant();
                storeNulls = parent.getStoreNulls();
                parentTableName = parent.getTableName().getString();
                // Pass through data table sequence number so we can check it hasn't changed
                try (PreparedStatement incrementStatement = connection.prepareStatement(INCREMENT_SEQ_NUM)) {
                    incrementStatement.setString(1, tenantIdStr);
                    incrementStatement.setString(2, schemaName);
                    incrementStatement.setString(3, parentTableName);
                    incrementStatement.setLong(4, parent.getSequenceNumber());
                    incrementStatement.execute();
                    // Get list of mutations and add to table meta data that will be passed to server
                    // to guarantee order. This row will always end up last
                    tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
                    connection.rollback();
                }

                // Add row linking from data table row to index table row
                try (PreparedStatement linkStatement = connection.prepareStatement(CREATE_LINK)) {
                    linkStatement.setString(1, tenantIdStr);
                    linkStatement.setString(2, schemaName);
                    linkStatement.setString(3, parentTableName);
                    linkStatement.setString(4, tableName);
                    linkStatement.setByte(5, LinkType.INDEX_TABLE.getSerializedValue());
                    linkStatement.setLong(6, parent.getSequenceNumber());
                    linkStatement.setString(7, PTableType.INDEX.getSerializedValue());
                    linkStatement.execute();
                }

                // Add row linking index table to parent table for indexes on views
                if (parent.getType() == PTableType.VIEW) {
                    try (PreparedStatement linkStatement = connection.prepareStatement(CREATE_VIEW_INDEX_PARENT_LINK)) {
                        linkStatement.setString(1, tenantIdStr);
                        linkStatement.setString(2, schemaName);
                        linkStatement.setString(3, tableName);
                        linkStatement.setString(4, parent.getName().getString());
                        linkStatement.setByte(5, LinkType.VIEW_INDEX_PARENT_TABLE.getSerializedValue());
                        linkStatement.execute();
                    }
                }
            }

            PrimaryKeyConstraint pkConstraint = statement.getPrimaryKeyConstraint();
            String pkName = null;
            List<Pair<ColumnName,SortOrder>> pkColumnsNames = Collections.<Pair<ColumnName,SortOrder>>emptyList();
            Iterator<Pair<ColumnName,SortOrder>> pkColumnsIterator = Collections.emptyIterator();
            if (pkConstraint != null) {
                pkColumnsNames = pkConstraint.getColumnNames();
                pkColumnsIterator = pkColumnsNames.iterator();
                pkName = pkConstraint.getName();
            }

            // Although unusual, it's possible to set a mapped VIEW as having immutable rows.
            // This tells Phoenix that you're managing the index maintenance yourself.
            if (tableType != PTableType.INDEX && (tableType != PTableType.VIEW || viewType == ViewType.MAPPED)) {
            	// TODO remove TableProperty.IMMUTABLE_ROWS at the next major release
            	Boolean isImmutableRowsProp = statement.immutableRows()!=null? statement.immutableRows() :
            		(Boolean) TableProperty.IMMUTABLE_ROWS.getValue(tableProps);
                if (isImmutableRowsProp == null) {
                    isImmutableRows = connection.getQueryServices().getProps().getBoolean(QueryServices.IMMUTABLE_ROWS_ATTRIB, QueryServicesOptions.DEFAULT_IMMUTABLE_ROWS);
                } else {
                    isImmutableRows = isImmutableRowsProp;
                }
            }
            if (tableType == PTableType.TABLE) {
                Boolean isAppendOnlySchemaProp = (Boolean) TableProperty.APPEND_ONLY_SCHEMA.getValue(tableProps);
                isAppendOnlySchema = isAppendOnlySchemaProp!=null ? isAppendOnlySchemaProp : false;
            }

            // Can't set any of these on views or shared indexes on views
            if (tableType != PTableType.VIEW && tableType != PTableType.CDC && !allocateIndexId) {
                saltBucketNum = (Integer) TableProperty.SALT_BUCKETS.getValue(tableProps);
                if (saltBucketNum != null) {
                    if (saltBucketNum < 0 || saltBucketNum > SaltingUtil.MAX_BUCKET_NUM) {
                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_BUCKET_NUM).build().buildException();
                    }
                }
                // Salt the index table if the data table is salted
                if (saltBucketNum == null) {
                    if (parent != null) {
                        saltBucketNum = parent.getBucketNum();
                    }
                } else if (saltBucketNum.intValue() == 0) {
                    saltBucketNum = null; // Provides a way for an index to not be salted if its data table is salted
                }
                addSaltColumn = (saltBucketNum != null);
            }

            // Can't set MULTI_TENANT or DEFAULT_COLUMN_FAMILY_NAME on an INDEX or a non mapped VIEW
            if (tableType != PTableType.INDEX && (tableType != PTableType.VIEW || viewType == ViewType.MAPPED)) {
                Boolean multiTenantProp = (Boolean) tableProps.get(PhoenixDatabaseMetaData.MULTI_TENANT);
                multiTenant = Boolean.TRUE.equals(multiTenantProp);
                defaultFamilyName = (String)TableProperty.DEFAULT_COLUMN_FAMILY.getValue(tableProps);
            }

            boolean disableWAL = false;
            Boolean disableWALProp = (Boolean) TableProperty.DISABLE_WAL.getValue(tableProps);
            if (disableWALProp != null) {
                disableWAL = disableWALProp;
            }
            long updateCacheFrequency = (Long) ConnectionProperty.UPDATE_CACHE_FREQUENCY.getValue(
                    connection.getQueryServices().getProps().get(
                            QueryServices.DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB));
            if (tableType == PTableType.INDEX && parent != null) {
                updateCacheFrequency = parent.getUpdateCacheFrequency();
            }
            Long updateCacheFrequencyProp = (Long) TableProperty.UPDATE_CACHE_FREQUENCY.getValue(tableProps);
            if (tableType != PTableType.INDEX && updateCacheFrequencyProp != null) {
                updateCacheFrequency = updateCacheFrequencyProp;
            }

            String physicalTableName = (String) TableProperty.PHYSICAL_TABLE_NAME.getValue(tableProps);
            String autoPartitionSeq = (String) TableProperty.AUTO_PARTITION_SEQ.getValue(tableProps);
            Long guidePostsWidth = (Long) TableProperty.GUIDE_POSTS_WIDTH.getValue(tableProps);

            // We only allow setting guide post width for a base table
            if (guidePostsWidth != null && tableType != PTableType.TABLE) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_GUIDE_POST_WIDTH)
                        .setSchemaName(schemaName).setTableName(tableName).build().buildException();
            }

            Boolean storeNullsProp = (Boolean) TableProperty.STORE_NULLS.getValue(tableProps);
            if (storeNullsProp == null) {
                if (parent == null) {
                    storeNulls = connection.getQueryServices().getProps().getBoolean(
                                    QueryServices.DEFAULT_STORE_NULLS_ATTRIB,
                                    QueryServicesOptions.DEFAULT_STORE_NULLS);
                    tableProps.put(PhoenixDatabaseMetaData.STORE_NULLS, Boolean.valueOf(storeNulls));
                }
            } else {
                storeNulls = storeNullsProp;
            }
            Boolean transactionalProp = (Boolean) TableProperty.TRANSACTIONAL.getValue(tableProps);
            TransactionFactory.Provider transactionProviderProp = (TransactionFactory.Provider) TableProperty.TRANSACTION_PROVIDER.getValue(tableProps);
            if ((transactionalProp != null || transactionProviderProp != null) && parent != null) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.ONLY_TABLE_MAY_BE_DECLARED_TRANSACTIONAL)
                .setSchemaName(schemaName).setTableName(tableName)
                .build().buildException();
            }
            if (parent == null) {
                boolean transactional;
                if (transactionProviderProp != null) {
                    transactional = true;
                } else if (transactionalProp == null) {
                    transactional = connection.getQueryServices().getProps().getBoolean(
                                    QueryServices.DEFAULT_TABLE_ISTRANSACTIONAL_ATTRIB,
                                    QueryServicesOptions.DEFAULT_TABLE_ISTRANSACTIONAL);
                } else {
                    transactional = transactionalProp;
                }
                if (transactional) {
                    if (transactionProviderProp == null) {
                        transactionProvider = (TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(
                                connection.getQueryServices().getProps().get(
                                        QueryServices.DEFAULT_TRANSACTION_PROVIDER_ATTRIB,
                                        QueryServicesOptions.DEFAULT_TRANSACTION_PROVIDER));
                    } else {
                        transactionProvider = transactionProviderProp;
                    }
                }
            }
            boolean transactionsEnabled = connection.getQueryServices().getProps().getBoolean(
                                            QueryServices.TRANSACTIONS_ENABLED,
                                            QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
            // can't create a transactional table if transactions are not enabled
            if (!transactionsEnabled && transactionProvider != null) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_IF_TXNS_DISABLED)
                .setSchemaName(schemaName).setTableName(tableName)
                .build().buildException();
            }
            // can't create a transactional table if it has a row timestamp column
            if (pkConstraint.getNumColumnsWithRowTimestamp() > 0 && transactionProvider != null) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP)
                .setSchemaName(schemaName).setTableName(tableName)
                .build().buildException();
            }
            if ((isPhoenixTTLEnabled() ? ttlProp != null
                    : TableProperty.TTL.getValue(commonFamilyProps) != null)
                    && transactionProvider != null 
                    && transactionProvider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.SET_TTL)) {
                throw new SQLExceptionInfo.Builder(PhoenixTransactionProvider.Feature.SET_TTL.getCode())
                .setMessage(transactionProvider.name())
                .setSchemaName(schemaName)
                .setTableName(tableName)
                .build()
                .buildException();
            }

            // Put potentially inferred value into tableProps as it's used by the createTable call below
            // to determine which coprocessors to install on the new table.
            tableProps.put(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER, transactionProvider);
            if (transactionProvider != null) {
                // If TTL set, use transaction context TTL property name instead
                // Note: After PHOENIX-6627, is PhoenixTransactionContext.PROPERTY_TTL still useful?
                Object transactionTTL = commonFamilyProps.remove(ColumnFamilyDescriptorBuilder.TTL);
                if (transactionTTL != null) {
                    commonFamilyProps.put(PhoenixTransactionContext.PROPERTY_TTL, transactionTTL);
                }
            }

            Boolean useStatsForParallelizationProp =
                    (Boolean) TableProperty.USE_STATS_FOR_PARALLELIZATION.getValue(tableProps);

            boolean sharedTable = statement.getTableType() == PTableType.VIEW || allocateIndexId;
            if (transactionProvider != null) {
                // We turn on storeNulls for transactional tables for compatibility. This was required
                // when Tephra was a supported txn engine option. After PHOENIX-6627, this may no longer
                // be necessary.
                // Tephra would have converted normal delete markers on the server which could mess up
                // our secondary index code as the changes get committed prior to the
                // maintenance code being able to see the prior state to update the rows correctly.
                // A future tnx engine might do the same?
                if (Boolean.FALSE.equals(storeNullsProp)) {
                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.STORE_NULLS_MUST_BE_TRUE_FOR_TRANSACTIONAL)
                    .setSchemaName(schemaName).setTableName(tableName)
                    .build().buildException();
                }
                storeNulls = true;
                tableProps.put(PhoenixDatabaseMetaData.STORE_NULLS, Boolean.TRUE);

                if (!sharedTable) {
                    Integer maxVersionsProp = (Integer) commonFamilyProps.get(HConstants.VERSIONS);
                    if (maxVersionsProp == null) {
                        if (parent != null) {
                            TableDescriptor desc = connection.getQueryServices().getTableDescriptor(parent.getPhysicalName().getBytes());
                            if (desc != null) {
                                maxVersionsProp = desc.getColumnFamily(SchemaUtil.getEmptyColumnFamily(parent)).getMaxVersions();
                            }
                        }
                        if (maxVersionsProp == null) {
                            maxVersionsProp = connection.getQueryServices().getProps().getInt(
                                    QueryServices.MAX_VERSIONS_TRANSACTIONAL_ATTRIB,
                                    QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL);
                        }
                        commonFamilyProps.put(HConstants.VERSIONS, maxVersionsProp);
                    }
                }
            }
            timestamp = timestamp==null ? TransactionUtil.getTableTimestamp(connection, transactionProvider != null, transactionProvider) : timestamp;

            // Delay this check as it is supported to have IMMUTABLE_ROWS and SALT_BUCKETS defined on views
            if (sharedTable) {
                if (tableProps.get(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME) != null) {
                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.DEFAULT_COLUMN_FAMILY_ON_SHARED_TABLE)
                    .setSchemaName(schemaName).setTableName(tableName)
                    .build().buildException();
                }
                if (SchemaUtil.hasHTableDescriptorProps(tableProps)) {
                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.VIEW_WITH_PROPERTIES).build()
                            .buildException();
                }
            }

            List<ColumnDef> colDefs = statement.getColumnDefs();
            LinkedHashMap<PColumn,PColumn> columns;
            LinkedHashSet<PColumn> pkColumns;

            if (tenantId != null && !sharedTable) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_TENANT_SPECIFIC_TABLE)
                    .setSchemaName(schemaName).setTableName(tableName).build().buildException();
            }
            if (autoPartitionSeq!=null) {
                int autoPartitionColIndex = multiTenant ? 1 : 0;
                PDataType dataType = colDefs.get(autoPartitionColIndex).getDataType();
                if (!PLong.INSTANCE.isCastableTo(dataType)) {
                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.SEQUENCE_NOT_CASTABLE_TO_AUTO_PARTITION_ID_COLUMN)
                    .setSchemaName(schemaName).setTableName(tableName).build().buildException();
                }
            }

            if (tableType == PTableType.VIEW) {
                physicalNames = Collections.singletonList(PNameFactory.newName(parent.getPhysicalName().getString()));
                if (viewType == ViewType.MAPPED) {
                    columns = Maps.newLinkedHashMap();
                    pkColumns = newLinkedHashSetWithExpectedSize(colDefs.size());
                } else {
                    // Propagate property values to VIEW.
                    // TODO: formalize the known set of these properties
                    // Manually transfer the ROW_KEY_ORDER_OPTIMIZABLE_BYTES from parent as we don't
                    // want to add this hacky flag to the schema (see PHOENIX-2067).
                    rowKeyOrderOptimizable = parent.rowKeyOrderOptimizable();
                    if (rowKeyOrderOptimizable) {
                        UpgradeUtil.addRowKeyOrderOptimizableCell(tableMetaData, SchemaUtil.getTableKey(tenantIdStr, schemaName, tableName), clientTimeStamp);
                    }
                    multiTenant = parent.isMultiTenant();
                    saltBucketNum = parent.getBucketNum();
                    isAppendOnlySchema = parent.isAppendOnlySchema();
                    isImmutableRows = parent.isImmutableRows();
                    if (updateCacheFrequencyProp == null) {
                        // set to the parent value if the property is not set on the view
                        updateCacheFrequency = parent.getUpdateCacheFrequency();
                    }
                    disableWAL = (disableWALProp == null ? parent.isWALDisabled() : disableWALProp);
                    defaultFamilyName = parent.getDefaultFamilyName() == null ? null : parent.getDefaultFamilyName().getString();
                    // TODO PHOENIX-4766 Add an options to stop sending parent metadata when creating views
                    List<PColumn> allColumns = parent.getColumns();
                    if (saltBucketNum != null) { // Don't include salt column in columns, as it should not have it when created
                        allColumns = allColumns.subList(1, allColumns.size());
                    }
                    columns = new LinkedHashMap<PColumn,PColumn>(allColumns.size() + colDefs.size());
                    for (PColumn column : allColumns) {
                        columns.put(column, column);
                    }
                    pkColumns = newLinkedHashSet(parent.getPKColumns());

                    // Add row linking view to its parent
                    try (PreparedStatement linkStatement = connection.prepareStatement(CREATE_VIEW_LINK)) {
                        linkStatement.setString(1, tenantIdStr);
                        linkStatement.setString(2, schemaName);
                        linkStatement.setString(3, tableName);
                        linkStatement.setString(4, parent.getName().getString());
                        linkStatement.setByte(5, LinkType.PARENT_TABLE.getSerializedValue());
                        linkStatement.setString(6, parent.getTenantId() == null ? null : parent.getTenantId().getString());
                        linkStatement.execute();
                    }
                    // Add row linking parent to view
                    // TODO From 4.16 write the child links to SYSTEM.CHILD_LINK directly
                    try (PreparedStatement linkStatement = connection.prepareStatement(CREATE_CHILD_LINK)) {
                        linkStatement.setString(1, parent.getTenantId() == null ? null : parent.getTenantId().getString());
                        linkStatement.setString(2, parent.getSchemaName() == null ? null : parent.getSchemaName().getString());
                        linkStatement.setString(3, parent.getTableName().getString());
                        linkStatement.setString(4, tenantIdStr);
                        linkStatement.setString(5, SchemaUtil.getTableName(schemaName, tableName));
                        linkStatement.setByte(6, LinkType.CHILD_TABLE.getSerializedValue());
                        linkStatement.execute();
                    }
                }
            } else {
                columns = new LinkedHashMap<PColumn,PColumn>(colDefs.size());
                pkColumns = newLinkedHashSetWithExpectedSize(colDefs.size() + 1); // in case salted
            }

            if (tableType == PTableType.CDC) {
                if (parent.getType() == VIEW) {
                    physicalNames = Collections.singletonList(
                            PNameFactory.newName(MetaDataUtil.getViewIndexPhysicalName(
                                    parent.getBaseTableLogicalName(), isNamespaceMapped)));
                }
                else {
                    physicalNames = Collections.singletonList(
                            PNameFactory.newName(SchemaUtil.getTableName(schemaName,
                                    CDCUtil.getCDCIndexName(tableName))));
                }
            }

            // Don't add link for mapped view, as it just points back to itself and causes the drop to
            // fail because it looks like there's always a view associated with it.
            if (!physicalNames.isEmpty()) {
                // Upsert physical name for mapped view only if the full physical table name is different than the full table name
                // Otherwise, we end up with a self-referencing link and then cannot ever drop the view.
                if (viewType != ViewType.MAPPED
                    || (!physicalNames.get(0).getString().equals(SchemaUtil.getTableName(schemaName, tableName))
                    && !physicalNames.get(0).getString().equals(SchemaUtil.getPhysicalHBaseTableName(
                        schemaName, tableName, isNamespaceMapped).getString()))) {
                    // Add row linking from data table row to physical table row
                    try (PreparedStatement linkStatement = connection.prepareStatement(CREATE_LINK)) {
                        for (PName physicalName : physicalNames) {
                            linkStatement.setString(1, tenantIdStr);
                            linkStatement.setString(2, schemaName);
                            linkStatement.setString(3, tableName);
                            linkStatement.setString(4, physicalName.getString());
                            linkStatement.setByte(5, LinkType.PHYSICAL_TABLE.getSerializedValue());
                            if (tableType == PTableType.VIEW) {
                                if (parent.getType() == PTableType.TABLE) {
                                    linkStatement.setString(4, SchemaUtil.getTableName(parent.getSchemaName().getString(), parent.getTableName().getString()));
                                    linkStatement.setLong(6, parent.getSequenceNumber());
                                } else { //This is a grandchild view, find the physical base table
                                    PTable logicalTable = connection.getTable(new PTableKey(null, SchemaUtil.replaceNamespaceSeparator(physicalName)));
                                    linkStatement.setString(4, SchemaUtil.getTableName(logicalTable.getSchemaName().getString(), logicalTable.getTableName().getString()));
                                    linkStatement.setLong(6, logicalTable.getSequenceNumber());
                                }
                                // Set link to logical name
                                linkStatement.setString(7, null);
                            } else {
                                linkStatement.setLong(6, parent.getSequenceNumber());
                                linkStatement.setString(7, PTableType.INDEX.getSerializedValue());
                            }
                            linkStatement.execute();
                        }
                    }
                    tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
                    connection.rollback();
                }
            }

            Map<String, PName> familyNames = Maps.newLinkedHashMap();
            boolean rowTimeStampColumnAlreadyFound = false;
            int positionOffset = columns.size();
            if (saltBucketNum != null) {
                positionOffset++;
                if (addSaltColumn) {
                    pkColumns.add(SaltingUtil.SALTING_COLUMN);
                }
            }
            int pkPositionOffset = pkColumns.size();
            int position = positionOffset;
            EncodedCQCounter cqCounter = NULL_COUNTER;
            Map<String, Integer> changedCqCounters = new HashMap<>(colDefs.size());
            // Check for duplicate column qualifiers
            Map<String, Set<Integer>> inputCqCounters = new HashMap<>();
            PTable viewPhysicalTable = null;
            if (tableType == PTableType.VIEW) {
                /*
                 * We can't control what column qualifiers are used in HTable mapped to Phoenix views. So we are not
                 * able to encode column names.
                 */
                if (viewType != MAPPED) {
                    /*
                     * For regular phoenix views, use the storage scheme of the physical table since they all share the
                     * the same HTable. Views always use the base table's column qualifier counter for doling out
                     * encoded column qualifier.
                     */
                    viewPhysicalTable = connection.getTable(physicalNames.get(0).getString());
                    immutableStorageScheme = viewPhysicalTable.getImmutableStorageScheme();
                    encodingScheme = viewPhysicalTable.getEncodingScheme();
					if (EncodedColumnsUtil.usesEncodedColumnNames(viewPhysicalTable)) {
                        cqCounter  = viewPhysicalTable.getEncodedCQCounter();
                    }
                }
            }
            // System tables have hard-coded column qualifiers. So we can't use column encoding for them.
            else if (!SchemaUtil.isSystemTable(Bytes.toBytes(SchemaUtil.getTableName(schemaName, tableName)))|| SchemaUtil.isLogTable(schemaName, tableName)) {
                /*
                 * Indexes inherit the storage scheme of the parent data tables. Otherwise, we always attempt to
                 * create tables with encoded column names.
                 *
                 * Also of note is the case with shared indexes i.e. local indexes and view indexes. In these cases,
                 * column qualifiers for covered columns don't have to be unique because rows of the logical indexes are
                 * partitioned by the virtue of indexId present in the row key. As such, different shared indexes can use
                 * potentially overlapping column qualifiers.
                 *
                 */
                if (parent != null) {
                    Byte encodingSchemeSerializedByte = (Byte) TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
                    // Table has encoding scheme defined
                    if (encodingSchemeSerializedByte != null) {
                        encodingScheme = getEncodingScheme(tableProps, schemaName, tableName, transactionProvider);
                    } else {
                        encodingScheme = parent.getEncodingScheme();
                    }

                    ImmutableStorageScheme immutableStorageSchemeProp = (ImmutableStorageScheme) TableProperty.IMMUTABLE_STORAGE_SCHEME.getValue(tableProps);
                    if (immutableStorageSchemeProp == null) {
                        immutableStorageScheme = parent.getImmutableStorageScheme();
                    } else {
                        checkImmutableStorageSchemeForIndex(immutableStorageSchemeProp, schemaName, tableName, transactionProvider);
                        immutableStorageScheme = immutableStorageSchemeProp;
                    }

                    if (immutableStorageScheme == SINGLE_CELL_ARRAY_WITH_OFFSETS) {
                        if (encodingScheme == NON_ENCODED_QUALIFIERS) {
                            if (encodingSchemeSerializedByte != null) {
                                // encoding scheme is set as non-encoded on purpose, so we should fail
                                throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_AND_COLUMN_QUALIFIER_BYTES)
                                        .setSchemaName(schemaName).setTableName(tableName).build().buildException();
                            } else {
                                // encoding scheme is inherited from parent but it is not compatible with Single Cell.
                                encodingScheme =
                                        QualifierEncodingScheme.fromSerializedValue(
                                                (byte) QueryServicesOptions.DEFAULT_COLUMN_ENCODED_BYTES);
                            }
                        }
                    }

                    if (tableType != CDC &&
                            parent.getImmutableStorageScheme() == SINGLE_CELL_ARRAY_WITH_OFFSETS &&
                            immutableStorageScheme == ONE_CELL_PER_COLUMN) {
                        throw new SQLExceptionInfo.Builder(
                                SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_CHANGE)
                                .setSchemaName(schemaName).setTableName(tableName).build()
                                .buildException();
                    }
                    LOGGER.info(String.format("STORAGE--ENCODING: %s--%s", immutableStorageScheme, encodingScheme));
                } else {
                    encodingScheme = getEncodingScheme(tableProps, schemaName, tableName, transactionProvider);

                    ImmutableStorageScheme immutableStorageSchemeProp =
                            (ImmutableStorageScheme) TableProperty.IMMUTABLE_STORAGE_SCHEME
                                    .getValue(tableProps);
                    if (immutableStorageSchemeProp == null) {
                        // Ignore default if transactional and column encoding is not supported
                        if (transactionProvider == null ||
                                !transactionProvider.getTransactionProvider().isUnsupported(
                                        PhoenixTransactionProvider.Feature.COLUMN_ENCODING)) {
                            if (multiTenant) {
                                immutableStorageScheme =
                                        ImmutableStorageScheme
                                                .valueOf(connection
                                                        .getQueryServices()
                                                        .getProps()
                                                        .get(
                                                                QueryServices.DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
                                                                QueryServicesOptions.DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME));
                            } else {
                                if (isImmutableRows) {
                                    immutableStorageScheme =
                                            ImmutableStorageScheme
                                                    .valueOf(connection
                                                            .getQueryServices()
                                                            .getProps()
                                                            .get(
                                                                    QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
                                                                    QueryServicesOptions.DEFAULT_IMMUTABLE_STORAGE_SCHEME));
                                } else {
                                    immutableStorageScheme = ONE_CELL_PER_COLUMN;
                                }
                            }
                        }
                    } else {
                        immutableStorageScheme = isImmutableRows ? immutableStorageSchemeProp : ONE_CELL_PER_COLUMN;
                        checkImmutableStorageSchemeForIndex(immutableStorageScheme, schemaName, tableName, transactionProvider);
                    }
                    if (immutableStorageScheme != ONE_CELL_PER_COLUMN
                            && encodingScheme == NON_ENCODED_QUALIFIERS) {
                        throw new SQLExceptionInfo.Builder(
                                SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_AND_COLUMN_QUALIFIER_BYTES)
                                .setSchemaName(schemaName).setTableName(tableName).build()
                                .buildException();
                    }
                }
                cqCounter = encodingScheme != NON_ENCODED_QUALIFIERS ? new EncodedCQCounter() : NULL_COUNTER;
                if (encodingScheme != NON_ENCODED_QUALIFIERS && statement.getFamilyCQCounters() != null)
                {
                    for (Map.Entry<String, Integer> cq : statement.getFamilyCQCounters().entrySet()) {
                        if (cq.getValue() < QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE) {
                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_CQ)
                                    .setSchemaName(schemaName)
                                    .setTableName(tableName).build().buildException();
                        }
                        cqCounter.setValue(cq.getKey(), cq.getValue());
                        changedCqCounters.put(cq.getKey(), cqCounter.getNextQualifier(cq.getKey()));
                        inputCqCounters.putIfAbsent(cq.getKey(), new HashSet<Integer>());
                    }
                }
            }

            boolean wasPKDefined = false;
            // Keep track of all columns that are newly added to a view
            Set<Integer> viewNewColumnPositions =
                    Sets.newHashSetWithExpectedSize(colDefs.size());
            Set<String> pkColumnNames = new HashSet<>();
            for (PColumn pColumn : pkColumns) {
                pkColumnNames.add(pColumn.getName().toString());
            }
            for (ColumnDef colDef : colDefs) {
                rowTimeStampColumnAlreadyFound = checkAndValidateRowTimestampCol(colDef, pkConstraint, rowTimeStampColumnAlreadyFound, tableType);
                if (colDef.isPK()) { // i.e. the column is declared as CREATE TABLE COLNAME DATATYPE PRIMARY KEY...
                    if (wasPKDefined) {
                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_ALREADY_EXISTS)
                            .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
                    }
                    wasPKDefined = true;
                } else {
                    // do not allow setting NOT-NULL constraint on non-primary columns.
                    if (  !colDef.isNull() && !isImmutableRows &&
                        ( wasPKDefined || !SchemaUtil.isPKColumn(pkConstraint, colDef))) {
                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.KEY_VALUE_NOT_NULL)
                                .setSchemaName(schemaName)
                                .setTableName(tableName)
                                .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
                    }
                }
                ColumnName columnDefName = colDef.getColumnDefName();
                String colDefFamily = columnDefName.getFamilyName();
                boolean isPkColumn = SchemaUtil.isPKColumn(pkConstraint, colDef);
                String cqCounterFamily = null;
                if (!isPkColumn) {
                    if (immutableStorageScheme == SINGLE_CELL_ARRAY_WITH_OFFSETS && encodingScheme != NON_ENCODED_QUALIFIERS) {
                        // For this scheme we track column qualifier counters at the column family level.
                        cqCounterFamily = colDefFamily != null ? colDefFamily : (defaultFamilyName != null ? defaultFamilyName : DEFAULT_COLUMN_FAMILY);
                    } else {
                        // For other schemes, column qualifier counters are tracked using the default column family.
                        cqCounterFamily = defaultFamilyName != null ? defaultFamilyName : DEFAULT_COLUMN_FAMILY;
                    }
                }
                // Use position as column qualifier if APPEND_ONLY_SCHEMA to prevent gaps in
                // the column encoding (PHOENIX-4737).
                Integer encodedCQ = null;
                if (!isPkColumn) {
                    if (colDef.getEncodedQualifier() != null && encodingScheme != NON_ENCODED_QUALIFIERS) {
                        if (cqCounter.getNextQualifier(cqCounterFamily) > ENCODED_CQ_COUNTER_INITIAL_VALUE &&
                                !inputCqCounters.containsKey(cqCounterFamily)) {
                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.MISSING_CQ)
                                    .setSchemaName(schemaName)
                                    .setTableName(tableName).build().buildException();
                        }

                        if (statement.getFamilyCQCounters() == null ||
                                statement.getFamilyCQCounters().get(cqCounterFamily) == null) {
                            if (colDef.getEncodedQualifier() >= cqCounter.getNextQualifier(cqCounterFamily)) {
                                cqCounter.setValue(cqCounterFamily, colDef.getEncodedQualifier());
                                cqCounter.increment(cqCounterFamily);
                            }
                            changedCqCounters.put(cqCounterFamily, cqCounter.getNextQualifier(cqCounterFamily));
                        }

                        encodedCQ = colDef.getEncodedQualifier();
                        if (encodedCQ < QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE ||
                                encodedCQ >= cqCounter.getNextQualifier(cqCounterFamily)) {
                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_CQ)
                                    .setSchemaName(schemaName)
                                    .setTableName(tableName).build().buildException();
                        }

                        inputCqCounters.putIfAbsent(cqCounterFamily, new HashSet<Integer>());
                        Set<Integer> familyCounters = inputCqCounters.get(cqCounterFamily);
                        if (!familyCounters.add(encodedCQ)) {
                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.DUPLICATE_CQ)
                                    .setSchemaName(schemaName)
                                    .setTableName(tableName).build().buildException();
                        }
                    } else {
                        if (inputCqCounters.containsKey(cqCounterFamily)) {
                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.MISSING_CQ)
                                    .setSchemaName(schemaName)
                                    .setTableName(tableName).build().buildException();
                        }

                        if (isAppendOnlySchema) {
                            encodedCQ = Integer.valueOf(ENCODED_CQ_COUNTER_INITIAL_VALUE + position);
                        } else {
                            encodedCQ = cqCounter.getNextQualifier(cqCounterFamily);
                        }
                    }
                }
                byte[] columnQualifierBytes = null;
                try {
                    columnQualifierBytes = EncodedColumnsUtil.getColumnQualifierBytes(columnDefName.getColumnName(), encodedCQ, encodingScheme, isPkColumn);
                }
                catch (QualifierOutOfRangeException e) {
                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_COLUMNS_EXCEEDED)
                    .setSchemaName(schemaName)
                    .setTableName(tableName).build().buildException();
                }
                PColumn column = newColumn(position++, colDef, pkConstraint, defaultFamilyName, false, columnQualifierBytes, isImmutableRows);
                if (!isAppendOnlySchema && colDef.getEncodedQualifier() == null
                        && cqCounter.increment(cqCounterFamily)) {
                    changedCqCounters.put(cqCounterFamily, cqCounter.getNextQualifier(cqCounterFamily));
                }
                if (SchemaUtil.isPKColumn(column)) {
                    // TODO: remove this constraint?
                    if (pkColumnsIterator.hasNext() && !column.getName().getString().equals(pkColumnsIterator.next().getFirst().getColumnName())) {
                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_OUT_OF_ORDER)
                            .setSchemaName(schemaName)
                            .setTableName(tableName)
                            .setColumnName(column.getName().getString())
                            .build().buildException();
                    }
                    if (tableType == PTableType.VIEW && viewType != ViewType.MAPPED) {
                        throwIfLastPKOfParentIsVariableLength(parent, schemaName, tableName, colDef);
                    }
                    if (!pkColumns.add(column)) {
                        throw new ColumnAlreadyExistsException(schemaName, tableName, column.getName().getString());
                    }
                }
                // check for duplicate column
                if (isDuplicateColumn(columns, pkColumnNames, column)) {
                    throw new ColumnAlreadyExistsException(schemaName, tableName,
                            column.getName().getString());
                } else if (tableType == VIEW) {
                    viewNewColumnPositions.add(column.getPosition());
                }
                if (isPkColumn) {
                    pkColumnNames.add(column.getName().toString());
                }
                if ((colDef.getDataType() == PVarbinary.INSTANCE || colDef.getDataType().isArrayType())
                        && SchemaUtil.isPKColumn(column)
                        && pkColumnsIterator.hasNext()) {
                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.VARBINARY_IN_ROW_KEY)
                        .setSchemaName(schemaName)
                        .setTableName(tableName)
                        .setColumnName(column.getName().getString())
                        .build().buildException();
                }
                if (column.getFamilyName() != null) {
                    familyNames.put(
                        IndexUtil.getActualColumnFamilyName(column.getFamilyName().getString()),
                        column.getFamilyName());
                }
            }

            // We need a PK definition for a TABLE or mapped VIEW
            if (!wasPKDefined && pkColumnsNames.isEmpty() && tableType != PTableType.VIEW && viewType != ViewType.MAPPED) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_MISSING)
                    .setSchemaName(schemaName)
                    .setTableName(tableName)
                    .build().buildException();
            }
            if (!pkColumnsNames.isEmpty() && pkColumnsNames.size() != pkColumns.size() - pkPositionOffset) { // Then a column name in the primary key constraint wasn't resolved
                Iterator<Pair<ColumnName,SortOrder>> pkColumnNamesIterator = pkColumnsNames.iterator();
                while (pkColumnNamesIterator.hasNext()) {
                    ColumnName colName = pkColumnNamesIterator.next().getFirst();
                    ColumnDef colDef = findColumnDefOrNull(colDefs, colName);
                    if (colDef == null) {
                        throw new ColumnNotFoundException(schemaName, tableName, null, colName.getColumnName());
                    }
                    if (colDef.getColumnDefName().getFamilyName() != null) {
                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_WITH_FAMILY_NAME)
                        .setSchemaName(schemaName)
                        .setTableName(tableName)
                        .setColumnName(colDef.getColumnDefName().getColumnName() )
                        .setFamilyName(colDef.getColumnDefName().getFamilyName())
                        .build().buildException();
                    }
                }
                // The above should actually find the specific one, but just in case...
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_PRIMARY_KEY_CONSTRAINT)
                    .setSchemaName(schemaName)
                    .setTableName(tableName)
                    .build().buildException();
            }

            if (!statement.getProps().isEmpty()) {
                for (String familyName : statement.getProps().keySet()) {
                    if (!familyName.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY)) {
                        if (familyNames.get(familyName) == null) {
                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.PROPERTIES_FOR_FAMILY)
                                .setFamilyName(familyName).build().buildException();
                        } else if (statement.getTableType() == PTableType.VIEW) {
                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.VIEW_WITH_PROPERTIES).build().buildException();
                        }
                    }
                }
            }
            throwIfInsufficientColumns(schemaName, tableName, pkColumns, saltBucketNum!=null, multiTenant);

            List<Pair<byte[],Map<String,Object>>> familyPropList = Lists.newArrayListWithExpectedSize(familyNames.size());
            populateFamilyPropsList(familyNames, commonFamilyProps, statement, defaultFamilyName, isLocalIndex, familyPropList);

            // Bootstrapping for our SYSTEM.TABLE that creates itself before it exists
            if (SchemaUtil.isMetaTable(schemaName,tableName)) {
                // TODO: what about stats for system catalog?
                PName newSchemaName = PNameFactory.newName(schemaName);
                // Column names and qualifiers and hardcoded for system tables.
                PTable table = new PTableImpl.Builder()
                        .setType(tableType)
                        .setTimeStamp(MetaDataProtocol.MIN_TABLE_TIMESTAMP)
                        .setIndexDisableTimestamp(0L)
                        .setSequenceNumber(PTable.INITIAL_SEQ_NUM)
                        .setImmutableRows(isImmutableRows)
                        .setDisableWAL(Boolean.TRUE.equals(disableWAL))
                        .setMultiTenant(false)
                        .setStoreNulls(false)
                        .setViewIndexIdType(viewIndexIdType)
                        .setIndexType(indexType)
                        .setUpdateCacheFrequency(0)
                        .setNamespaceMapped(isNamespaceMapped)
                        .setAutoPartitionSeqName(autoPartitionSeq)
                        .setAppendOnlySchema(isAppendOnlySchema)
                        .setImmutableStorageScheme(ONE_CELL_PER_COLUMN)
                        .setQualifierEncodingScheme(NON_ENCODED_QUALIFIERS)
                        .setBaseColumnCount(QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT)
                        .setEncodedCQCounter(PTable.EncodedCQCounter.NULL_COUNTER)
                        .setUseStatsForParallelization(true)
                        .setExcludedColumns(ImmutableList.<PColumn>of())
                        .setTenantId(tenantId)
                        .setSchemaName(newSchemaName)
                        .setTableName(PNameFactory.newName(tableName))
                        .setPkName(PNameFactory.newName(QueryConstants.SYSTEM_TABLE_PK_NAME))
                        .setDefaultFamilyName(defaultFamilyName == null ? null :
                                PNameFactory.newName(defaultFamilyName))
                        .setRowKeyOrderOptimizable(true)
                        .setIndexes(Collections.<PTable>emptyList())
                        .setPhysicalNames(ImmutableList.<PName>of())
                        .setColumns(columns.values())
                        .setLastDDLTimestamp(0L)
                        .setIndexWhere(statement.getWhereClause() == null ? null
                                : statement.getWhereClause().toString())
                        .setRowKeyMatcher(rowKeyMatcher)
                        .setTTL(TTL_EXPRESSION_NOT_DEFINED)
                        .build();
                connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP);
            }

            // Update column qualifier counters
            if (EncodedColumnsUtil.usesEncodedColumnNames(encodingScheme)) {
                // Store the encoded column counter for phoenix entities that have their own hbase
                // tables i.e. base tables and indexes.
                String schemaNameToUse = tableType == VIEW ? viewPhysicalTable.getSchemaName().getString() : schemaName;
                String tableNameToUse = tableType == VIEW ? viewPhysicalTable.getTableName().getString() : tableName;
                boolean sharedIndex = tableType == PTableType.INDEX && (indexType == IndexType.LOCAL || parent.getType() == PTableType.VIEW);
                // For local indexes and indexes on views, pass on the the tenant id since all their meta-data rows have
                // tenant ids in there.
                String tenantIdToUse = connection.getTenantId() != null && sharedIndex ? connection.getTenantId().getString() : null;
                // When a view adds its own columns, then we need to increase the sequence number of the base table
                // too since we want clients to get the latest PTable of the base table.
                for (Entry<String, Integer> entry : changedCqCounters.entrySet()) {
                    try (PreparedStatement linkStatement = connection.prepareStatement(UPDATE_ENCODED_COLUMN_COUNTER)) {
                        linkStatement.setString(1, tenantIdToUse);
                        linkStatement.setString(2, schemaNameToUse);
                        linkStatement.setString(3, tableNameToUse);
                        linkStatement.setString(4, entry.getKey());
                        linkStatement.setInt(5, entry.getValue());
                        linkStatement.execute();
                    }
                }
                if (tableType == VIEW && !changedCqCounters.isEmpty()) {
                    try (PreparedStatement incrementStatement = connection.prepareStatement(INCREMENT_SEQ_NUM)) {
                        incrementStatement.setString(1, null);
                        incrementStatement.setString(2, viewPhysicalTable.getSchemaName().getString());
                        incrementStatement.setString(3, viewPhysicalTable.getTableName().getString());
                        incrementStatement.setLong(4, viewPhysicalTable.getSequenceNumber() + 1);
                        incrementStatement.execute();
                    }
                }
                if (connection.getMutationState().toMutations(timestamp).hasNext()) {
                    tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
                    connection.rollback();
                }
            }

            short nextKeySeq = 0;

            List<Mutation> columnMetadata = Lists.newArrayListWithExpectedSize(columns.size());
            boolean isRegularView = (tableType == PTableType.VIEW && viewType!=ViewType.MAPPED);
            for (Map.Entry<PColumn, PColumn> entry : columns.entrySet()) {
                PColumn column = entry.getValue();
                final int columnPosition = column.getPosition();
                // For client-side cache, we need to update the column
                // set the autoPartition column attributes
                if (parent != null && parent.getAutoPartitionSeqName() != null
                        && parent.getPKColumns().get(MetaDataUtil.getAutoPartitionColIndex(parent)).equals(column)) {
                    entry.setValue(column = new DelegateColumn(column) {
                        @Override
                        public byte[] getViewConstant() {
                            // set to non-null value so that we will generate a Put that
                            // will be set correctly on the server
                            return QueryConstants.EMPTY_COLUMN_VALUE_BYTES;
                        }

                        @Override
                        public boolean isViewReferenced() {
                            return true;
                        }
                    });
                } else if (isViewColumnReferenced != null) {
                    if (viewColumnConstants != null && columnPosition < viewColumnConstants.length) {
                        entry.setValue(column = new DelegateColumn(column) {
                            @Override
                            public byte[] getViewConstant() {
                                return viewColumnConstants[columnPosition];
                            }

                            @Override
                            public boolean isViewReferenced() {
                                return isViewColumnReferenced.get(columnPosition);
                            }
                        });
                    } else {
                        entry.setValue(column = new DelegateColumn(column) {
                            @Override
                            public boolean isViewReferenced() {
                                return isViewColumnReferenced.get(columnPosition);
                            }
                        });
                    }

                    // if the base table column is referenced in the view
                    // or if we are adding a new column during view creation
                    if (isViewColumnReferenced.get(columnPosition) ||
                            viewNewColumnPositions.contains(
                                    columnPosition)) {
                        // acquire the mutex using the global physical table
                        // name to prevent this column from being dropped
                        // while the view is being created or to prevent
                        // a conflicting column from being added to a parent
                        // in case the view creation adds new columns
                        boolean acquiredMutex = writeCell(
                                null,
                                parentPhysicalSchemaName,
                                parentPhysicalTableName,
                                column.toString());
                        if (!acquiredMutex) {
                            throw new ConcurrentTableMutationException(
                                    parentPhysicalSchemaName,
                                    parentPhysicalTableName);
                        }
                        acquiredColumnMutexSet.add(column.toString());
                    }
                }
                Short keySeq = SchemaUtil.isPKColumn(column) ? ++nextKeySeq : null;
                // Prior to PHOENIX-3534 we were sending the parent table column metadata while creating a
                // child view, now that we combine columns by resolving the parent table hierarchy we
                // don't need to include the parent table columns while creating a view
                // If QueryServices.ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK is true we continue
                // to store the parent table column metadata along with the child view metadata
                // so that we can rollback the upgrade if required.
                if (allowSystemCatalogRollback || !isRegularView
                        || columnPosition >= baseTableColumnCount) {
                    addColumnMutation(connection, schemaName, tableName, column, parentTableName,
                            pkName, keySeq, saltBucketNum != null);
                    columnMetadata.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
                    connection.rollback();
                }
            }

            // add the columns in reverse order since we reverse the list later
            Collections.reverse(columnMetadata);
            tableMetaData.addAll(columnMetadata);
            String dataTableName = parent == null || tableType == PTableType.VIEW ? null : parent.getTableName().getString();
            PIndexState defaultCreateState;
            String defaultCreateStateString = connection.getClientInfo(INDEX_CREATE_DEFAULT_STATE);
            if (defaultCreateStateString == null)  {
                defaultCreateStateString = connection.getQueryServices().getConfiguration().get(
                     INDEX_CREATE_DEFAULT_STATE, QueryServicesOptions.DEFAULT_CREATE_INDEX_STATE);
            }
            defaultCreateState = PIndexState.valueOf(defaultCreateStateString);
            if (defaultCreateState == PIndexState.CREATE_DISABLE) {
                if  (indexType == IndexType.LOCAL || sharedTable) {
                    defaultCreateState = PIndexState.BUILDING;
                }
            }
            PIndexState indexState = parent == null ||
                    (tableType == PTableType.VIEW || tableType == PTableType.CDC) ?
                    null : defaultCreateState;
            if (indexState == null && tableProps.containsKey(INDEX_STATE)) {
                indexState = PIndexState.fromSerializedValue(tableProps.get(INDEX_STATE).toString());
            }
            PreparedStatement tableUpsert = connection.prepareStatement(CREATE_TABLE);
            tableUpsert.setString(1, tenantIdStr);
            tableUpsert.setString(2, schemaName);
            tableUpsert.setString(3, tableName);
            tableUpsert.setString(4, tableType.getSerializedValue());
            tableUpsert.setLong(5, PTable.INITIAL_SEQ_NUM);
            tableUpsert.setInt(6, position);
            if (saltBucketNum != null) {
                tableUpsert.setInt(7, saltBucketNum);
            } else {
                tableUpsert.setNull(7, Types.INTEGER);
            }
            tableUpsert.setString(8, pkName);
            tableUpsert.setString(9, dataTableName);
            tableUpsert.setString(10, indexState == null ? null : indexState.getSerializedValue());
            tableUpsert.setBoolean(11, isImmutableRows);
            tableUpsert.setString(12, defaultFamilyName);
            if (parent != null && parent.getAutoPartitionSeqName() != null && viewStatement==null) {
                // set to non-null value so that we will generate a Put that
                // will be set correctly on the server
                tableUpsert.setString(13, QueryConstants.EMPTY_COLUMN_VALUE);
            }
            else {
                tableUpsert.setString(13, viewStatement);
            }
            tableUpsert.setBoolean(14, disableWAL);
            tableUpsert.setBoolean(15, multiTenant);
            if (viewType == null) {
                tableUpsert.setNull(16, Types.TINYINT);
            } else {
                tableUpsert.setByte(16, viewType.getSerializedValue());
            }
            if (indexType == null) {
                tableUpsert.setNull(17, Types.TINYINT);
            } else {
                tableUpsert.setByte(17, indexType.getSerializedValue());
            }
            tableUpsert.setBoolean(18, storeNulls);
            if (parent != null && tableType == PTableType.VIEW) {
                tableUpsert.setInt(19, parent.getColumns().size());
            } else {
                tableUpsert.setInt(19, BASE_TABLE_BASE_COLUMN_COUNT);
            }
            if (transactionProvider == null) {
                tableUpsert.setNull(20, Types.TINYINT);
            } else {
                tableUpsert.setByte(20, transactionProvider.getCode());
            }
            tableUpsert.setLong(21, updateCacheFrequency);
            tableUpsert.setBoolean(22, isNamespaceMapped);
            if (autoPartitionSeq == null) {
                tableUpsert.setNull(23, Types.VARCHAR);
            } else {
                tableUpsert.setString(23, autoPartitionSeq);
            }
            tableUpsert.setBoolean(24, isAppendOnlySchema);
            if (guidePostsWidth == null) {
                tableUpsert.setNull(25, Types.BIGINT);
            } else {
                tableUpsert.setLong(25, guidePostsWidth);
            }
            tableUpsert.setByte(26, immutableStorageScheme.getSerializedMetadataValue());
            tableUpsert.setByte(27, encodingScheme.getSerializedMetadataValue());
            if (useStatsForParallelizationProp == null) {
                tableUpsert.setNull(28, Types.BOOLEAN);
            } else {
                tableUpsert.setBoolean(28, useStatsForParallelizationProp);
            }
            if (indexType == IndexType.LOCAL ||
                    (parent != null && parent.getType() == PTableType.VIEW
                            && tableType == PTableType.INDEX)) {
                tableUpsert.setInt(29, viewIndexIdType.getSqlType());
            } else {
                tableUpsert.setNull(29, Types.NULL);
            }

            if (isChangeDetectionEnabledProp == null) {
                tableUpsert.setNull(30, Types.BOOLEAN);
            } else {
                tableUpsert.setBoolean(30, isChangeDetectionEnabledProp);
            }

            if (physicalTableName == null){
                tableUpsert.setNull(31, Types.VARCHAR);
            } else {
                tableUpsert.setString(31, physicalTableName);
            }

            if (schemaVersion == null) {
                tableUpsert.setNull(32, Types.VARCHAR);
            } else {
                tableUpsert.setString(32, schemaVersion);
            }

            if (streamingTopicName == null) {
                tableUpsert.setNull(33, Types.VARCHAR);
            } else {
                tableUpsert.setString(33, streamingTopicName);
            }

            if (tableType == INDEX && statement.getWhereClause() != null) {
                tableUpsert.setString(34, statement.getWhereClause().toString());
            } else {
                tableUpsert.setNull(34, Types.VARCHAR);
            }

            if (cdcIncludeScopesStr == null) {
                tableUpsert.setNull(35, Types.VARCHAR);
            } else {
                tableUpsert.setString(35, cdcIncludeScopesStr);
            }

            if (ttl == null || ttl.equals(TTL_EXPRESSION_NOT_DEFINED)) {
                tableUpsert.setNull(36, Types.VARCHAR);
            } else {
                tableUpsert.setString(36, ttl.getTTLExpression());
            }

            if ((rowKeyMatcher == null) ||
                    Bytes.compareTo(rowKeyMatcher, HConstants.EMPTY_BYTE_ARRAY) == 0) {
                tableUpsert.setNull(37, PDataType.VARBINARY_ENCODED_TYPE);
            } else {
                tableUpsert.setBytes(37, rowKeyMatcher);
            }

            tableUpsert.execute();

            if (asyncCreatedDate != null) {
                try (PreparedStatement setAsync = connection.prepareStatement(SET_ASYNC_CREATED_DATE)) {
                    setAsync.setString(1, tenantIdStr);
                    setAsync.setString(2, schemaName);
                    setAsync.setString(3, tableName);
                    setAsync.setDate(4, asyncCreatedDate);
                    setAsync.execute();
                }
            } else {
                Date syncCreatedDate = new Date(EnvironmentEdgeManager.currentTimeMillis());
                try (PreparedStatement setSync = connection.prepareStatement(SET_INDEX_SYNC_CREATED_DATE)) {
                    setSync.setString(1, tenantIdStr);
                    setSync.setString(2, schemaName);
                    setSync.setString(3, tableName);
                    setSync.setDate(4, syncCreatedDate);
                    setSync.execute();
                }
            }
            tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
            connection.rollback();

            /*
             * The table metadata must be in the following order:
             * 1) table header row
             * 2) ordered column rows
             * 3) parent table header row
             */
            Collections.reverse(tableMetaData);
            
			if (indexType != IndexType.LOCAL) {
                splits = SchemaUtil.processSplits(splits, pkColumns, saltBucketNum, connection.getQueryServices().getProps().getBoolean(
                        QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, QueryServicesOptions.DEFAULT_FORCE_ROW_KEY_ORDER));
            }

			// Modularized this code for unit testing
            PName parentName = physicalNames !=null && physicalNames.size() > 0 ? physicalNames.get(0) : null;
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("createTable tableName=" + tableName + " parent=" + (parent == null ? "" : parent.getTableName() + "-" + parent.getPhysicalName()) + " parent physical=" + parentName + "-" + (physicalNames.size() > 0 ? physicalNames.get(0) : "null") + " viewType " + viewType + allocateIndexId);
            }
            MetaDataMutationResult result = connection.getQueryServices().createTable(tableMetaData
                ,viewType == ViewType.MAPPED || allocateIndexId ? physicalNames.get(0).getBytes()
                : null, tableType, tableProps, familyPropList, splits, isNamespaceMapped,
                allocateIndexId, UpgradeUtil.isNoUpgradeSet(connection.getClientInfo()), parent);
            MutationCode code = result.getMutationCode();
            try {
                if (code != MutationCode.TABLE_NOT_FOUND) {
                    boolean tableAlreadyExists = handleCreateTableMutationCode(result, code, statement,
                            schemaName, tableName, parent);
                    if (tableAlreadyExists) {
                        return null;
                    }
                }
                // If the parent table of the view has the auto partition sequence name attribute,
                // set the view statement and relevant partition column attributes correctly
                if (parent != null && parent.getAutoPartitionSeqName() != null) {
                    final PColumn autoPartitionCol = parent.getPKColumns().get(MetaDataUtil
                            .getAutoPartitionColIndex(parent));
                    final Long autoPartitionNum = Long.valueOf(result.getAutoPartitionNum());
                    columns.put(autoPartitionCol, new DelegateColumn(autoPartitionCol) {
                        @Override
                        public byte[] getViewConstant() {
                            PDataType dataType = autoPartitionCol.getDataType();
                            Object val = dataType.toObject(autoPartitionNum, PLong.INSTANCE);
                            byte[] bytes = new byte[dataType.getByteSize() + 1];
                            dataType.toBytes(val, bytes, 0);
                            return bytes;
                        }

                        @Override
                        public boolean isViewReferenced() {
                            return true;
                        }
                    });
                    String viewPartitionClause = QueryUtil.getViewPartitionClause(MetaDataUtil
                            .getAutoPartitionColumnName(parent), autoPartitionNum);
                    if (viewStatement != null) {
                        viewStatement = viewStatement + " AND " + viewPartitionClause;
                    } else {
                        viewStatement = QueryUtil.getViewStatement(parent.getSchemaName().getString(),
                                parent.getTableName().getString(), viewPartitionClause);
                    }
                }
                PName newSchemaName = PNameFactory.newName(schemaName);
                /*
                 * It doesn't hurt for the PTable of views to have the cqCounter. However, views always
                 * rely on the parent table's counter to dole out encoded column qualifiers. So setting
                 * the counter as NULL_COUNTER for extra safety.
                 */
                EncodedCQCounter cqCounterToBe = tableType == PTableType.VIEW ? NULL_COUNTER : cqCounter;
                PTable table = new PTableImpl.Builder()
                        .setType(tableType)
                        .setState(indexState)
                        .setTimeStamp(timestamp != null ? timestamp : result.getMutationTime())
                        .setIndexDisableTimestamp(0L)
                        .setSequenceNumber(PTable.INITIAL_SEQ_NUM)
                        .setImmutableRows(isImmutableRows)
                        .setViewStatement(viewStatement)
                        .setDisableWAL(Boolean.TRUE.equals(disableWAL))
                        .setMultiTenant(multiTenant)
                        .setStoreNulls(storeNulls)
                        .setViewType(viewType)
                        .setViewIndexIdType(viewIndexIdType)
                        .setViewIndexId(result.getViewIndexId())
                        .setIndexType(indexType)
                        .setTransactionProvider(transactionProvider)
                        .setUpdateCacheFrequency(updateCacheFrequency)
                        .setNamespaceMapped(isNamespaceMapped)
                        .setAutoPartitionSeqName(autoPartitionSeq)
                        .setAppendOnlySchema(isAppendOnlySchema)
                        .setImmutableStorageScheme(immutableStorageScheme)
                        .setQualifierEncodingScheme(encodingScheme)
                        .setBaseColumnCount(baseTableColumnCount)
                        .setEncodedCQCounter(cqCounterToBe)
                        .setUseStatsForParallelization(useStatsForParallelizationProp)
                        .setExcludedColumns(ImmutableList.<PColumn>of())
                        .setTenantId(tenantId)
                        .setSchemaName(newSchemaName)
                        .setTableName(PNameFactory.newName(tableName))
                        .setPkName(pkName == null ? null : PNameFactory.newName(pkName))
                        .setDefaultFamilyName(defaultFamilyName == null ?
                                null : PNameFactory.newName(defaultFamilyName))
                        .setRowKeyOrderOptimizable(rowKeyOrderOptimizable)
                        .setBucketNum(saltBucketNum)
                        .setIndexes(Collections.<PTable>emptyList())
                        .setParentSchemaName((parent == null) ? null : parent.getSchemaName())
                        .setParentTableName((parent == null) ? null : parent.getTableName())
                        .setPhysicalNames(ImmutableList.copyOf(physicalNames))
                        .setColumns(columns.values())
                        .setViewModifiedUpdateCacheFrequency(tableType == PTableType.VIEW &&
                                parent != null &&
                                parent.getUpdateCacheFrequency() != updateCacheFrequency)
                        .setViewModifiedUseStatsForParallelization(tableType == PTableType.VIEW &&
                                parent != null &&
                                parent.useStatsForParallelization()
                                        != useStatsForParallelizationProp)
                        .setLastDDLTimestamp(result.getTable() != null ?
                                result.getTable().getLastDDLTimestamp() : null)
                        .setIsChangeDetectionEnabled(isChangeDetectionEnabledProp)
                        .setSchemaVersion(schemaVersion)
                        .setExternalSchemaId(result.getTable() != null ?
                        result.getTable().getExternalSchemaId() : null)
                        .setStreamingTopicName(streamingTopicName)
                        .setIndexWhere(statement.getWhereClause() == null ? null
                                : statement.getWhereClause().toString())
                        .setCDCIncludeScopes(cdcIncludeScopes)
                        .setTTL(ttl == null || ttl.equals(TTL_EXPRESSION_NOT_DEFINED) ?
                                TTLExpressionFactory.create(ttlFromHierarchy) :
                                TTLExpressionFactory.create(ttl))
                        .setRowKeyMatcher(rowKeyMatcher)
                        .build();
                result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
                addTableToCache(result, false);
                return table;
            } catch (Throwable e) {
                TableMetricsManager.updateMetricsForSystemCatalogTableMethod(tableNameNode.toString(),
                        NUM_METADATA_LOOKUP_FAILURES, 1);
                throw e;
            }
        } finally {
            connection.setAutoCommit(wasAutoCommit);
            deleteMutexCells(parentPhysicalSchemaName, parentPhysicalTableName,
                    acquiredColumnMutexSet);
        }
    }