public void createTable()

in phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java [2292:2786]


    public void createTable(RpcController controller, CreateTableRequest request,
                            RpcCallback<MetaDataResponse> done) {
        MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
        byte[][] rowKeyMetaData = new byte[3][];
        byte[] schemaName = null;
        byte[] tableName = null;
        String fullTableName = null;
        try {
            int clientVersion = request.getClientVersion();
            List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
            MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
            byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
            schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
            tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
            fullTableName = SchemaUtil.getTableName(schemaName, tableName);
            boolean isNamespaceMapped = MetaDataUtil.isNameSpaceMapped(tableMetadata, GenericKeyValueBuilder.INSTANCE,
                    new ImmutableBytesWritable());
            final IndexType indexType = MetaDataUtil.getIndexType(tableMetadata, GenericKeyValueBuilder.INSTANCE,
                    new ImmutableBytesWritable());
            byte[] parentSchemaName = null;
            byte[] parentTableName = null;
            PTable parentTable = request.hasParentTable() ? PTableImpl.createFromProto(request.getParentTable()) : null;
            PTableType tableType = MetaDataUtil.getTableType(tableMetadata, GenericKeyValueBuilder.INSTANCE, new ImmutableBytesWritable());

            // Load table to see if it already exists
            byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
            ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey);
            long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
            boolean isChangeDetectionEnabled = MetaDataUtil.getChangeDetectionEnabled(tableMetadata);

            PTable table = null;
            // Get as of latest timestamp so we can detect if we have a newer table that already
            // exists without making an additional query
            table = loadTable(env, tableKey, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP,
                    clientVersion);
            if (table != null) {
                if (table.getTimeStamp() < clientTimeStamp) {
                    // If the table is older than the client time stamp and it's deleted,
                    // continue
                    if (!isTableDeleted(table)) {
                        builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                        builder.setTable(PTableImpl.toProto(table, clientVersion));
                        done.run(builder.build());
                        return;
                    }
                } else {
                    builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_TABLE_FOUND);
                    builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                    builder.setTable(PTableImpl.toProto(table, clientVersion));
                    done.run(builder.build());
                    return;
                }
            }

            // check if the table was previously dropped, but had child views that have not
            // yet been cleaned up.
            // Note that for old clients connecting to a 4.15 server whose metadata hasn't been
            // upgraded, we disallow dropping a base table that has child views, so in that case
            // this is a no-op (See PHOENIX-5544)
            if (!Bytes.toString(schemaName).equals(QueryConstants.SYSTEM_SCHEMA_NAME)) {
                ServerViewUtil.dropChildViews(env, tenantIdBytes, schemaName, tableName,
                        getSystemTableForChildLinks(clientVersion, env.getConfiguration())
                                .getName());
            }

            byte[] parentTableKey = null;
            Set<TableName> indexes = new HashSet<TableName>();
            ;
            byte[] cPhysicalName = SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName, isNamespaceMapped)
                    .getBytes();
            byte[] cParentPhysicalName = null;
            if (tableType == PTableType.VIEW) {
                byte[][] parentSchemaTableNames = new byte[3][];
                byte[][] parentPhysicalSchemaTableNames = new byte[3][];
                getParentAndPhysicalNames(tableMetadata, parentSchemaTableNames, parentPhysicalSchemaTableNames);
                if (parentPhysicalSchemaTableNames[2] != null) {
                    if (parentTable == null) {
                        // This is needed when we connect with a 4.14 client to
                        // a 4.15.0+ server.
                        // In that case we need to resolve the parent table on
                        // the server.
                        parentTable = doGetTable(ByteUtil.EMPTY_BYTE_ARRAY,
                                parentPhysicalSchemaTableNames[1],
                                parentPhysicalSchemaTableNames[2], clientTimeStamp, clientVersion);
                        if (parentTable == null) {
                            builder.setReturnCode(
                                    MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                            done.run(builder.build());
                            return;
                        }
                        if (parentSchemaTableNames[2] != null
                                && Bytes.compareTo(parentSchemaTableNames[2],
                                        parentPhysicalSchemaTableNames[2]) != 0) {
                            // if view is created on view
                            byte[] tenantId = parentSchemaTableNames[0] == null
                                    ? ByteUtil.EMPTY_BYTE_ARRAY
                                    : parentSchemaTableNames[0];
                            parentTable = doGetTable(tenantId, parentSchemaTableNames[1],
                                    parentSchemaTableNames[2], clientTimeStamp, clientVersion);
                            if (parentTable == null) {
                                // it could be a global view
                                parentTable = doGetTable(ByteUtil.EMPTY_BYTE_ARRAY,
                                        parentSchemaTableNames[1], parentSchemaTableNames[2],
                                        clientTimeStamp, clientVersion);
                            }
                        }
                        if (parentTable == null) {
                            builder.setReturnCode(
                                    MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                            done.run(builder.build());
                            return;
                        }
                    }
                    parentTableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY,
                            parentPhysicalSchemaTableNames[1], parentPhysicalSchemaTableNames[2]);
                    cParentPhysicalName = parentTable.getPhysicalName().getBytes();
                    for (PTable index : parentTable.getIndexes()) {
                        indexes.add(TableName.valueOf(index.getPhysicalName().getBytes()));
                    }
                } else {
                    // Mapped View
                    cParentPhysicalName = SchemaUtil.getPhysicalHBaseTableName(
                            schemaName, tableName, isNamespaceMapped).getBytes();

                }
                parentSchemaName = parentPhysicalSchemaTableNames[1];
                parentTableName = parentPhysicalSchemaTableNames[2];

            } else if (tableType == PTableType.INDEX) {
                parentSchemaName = schemaName;
                /*
                 * For an index we lock the parent table's row which could be a physical table or a view.
                 * If the parent table is a physical table, then the tenantIdBytes is empty because
                 * we allow creating an index with a tenant connection only if the parent table is a view.
                 */
                parentTableName = MetaDataUtil.getParentTableName(tableMetadata);
                parentTableKey = SchemaUtil.getTableKey(tenantIdBytes, parentSchemaName, parentTableName);
                if (parentTable == null) {
                    // This is needed when we connect with a 4.14 client to a 4.15.0+ server.
                    // In that case we need to resolve the parent table on the server.
                    parentTable =
                            doGetTable(tenantIdBytes, parentSchemaName, parentTableName, clientTimeStamp, null,
                                    request.getClientVersion());
                }
                if (IndexType.LOCAL == indexType) {
                    cPhysicalName = parentTable.getPhysicalName().getBytes();
                    cParentPhysicalName = parentTable.getPhysicalName().getBytes();
                } else if (parentTable.getType() == PTableType.VIEW) {
                    // The view index physical table name is constructed from logical name of base table.
                    // For example, _IDX_SC.TBL1 is the view index name and SC.TBL1 is the logical name of the base table.
                    String namepaceMappedParentLogicalName = MetaDataUtil.getNamespaceMappedName(parentTable.getBaseTableLogicalName(), isNamespaceMapped);
                    cPhysicalName = MetaDataUtil.getViewIndexPhysicalName(namepaceMappedParentLogicalName.getBytes(StandardCharsets.UTF_8));
                    cParentPhysicalName = parentTable.getPhysicalName().getBytes();
                } else {
                    cParentPhysicalName = SchemaUtil
                            .getPhysicalHBaseTableName(parentSchemaName, parentTableName, isNamespaceMapped).getBytes();
                }
            }

            getCoprocessorHost().preCreateTable(Bytes.toString(tenantIdBytes),
                    fullTableName,
                    (tableType == PTableType.VIEW) ? null : TableName.valueOf(cPhysicalName),
                    cParentPhysicalName == null ? null : TableName.valueOf(cParentPhysicalName), tableType,
                    /* TODO: During inital create we may not need the family map */
                    Collections.<byte[]>emptySet(), indexes);

            Region region = env.getRegion();
            List<RowLock> locks = Lists.newArrayList();
            // Place a lock using key for the table to be created
            try {
                acquireLock(region, tableKey, locks, false);

                // If the table key resides outside the region, return without doing anything
                MetaDataMutationResult result = checkTableKeyInRegion(tableKey, region);
                if (result != null) {
                    done.run(MetaDataMutationResult.toProto(result));
                    return;
                }

                if (parentTableName != null) {
                    // From 4.15 onwards we only need to lock the parent table :
                    // 1) when creating an index on a table or a view
                    // 2) if allowSplittableSystemCatalogRollback is true we try to lock the parent table to prevent it
                    // from changing concurrently while a view is being created
                    if (tableType == PTableType.INDEX || allowSplittableSystemCatalogRollback) {
                        result = checkTableKeyInRegion(parentTableKey, region);
                        if (result != null) {
                            LOGGER.error("Unable to lock parentTableKey " + Bytes.toStringBinary(parentTableKey));
                            // if allowSplittableSystemCatalogRollback is true and we can't lock the parentTableKey (because
                            // SYSTEM.CATALOG already split) return UNALLOWED_TABLE_MUTATION so that the client
                            // knows the create statement failed
                            MetaDataProtos.MutationCode code = tableType == PTableType.INDEX ?
                                    MetaDataProtos.MutationCode.TABLE_NOT_IN_REGION :
                                    MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION;
                            builder.setReturnCode(code);
                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                            done.run(builder.build());
                            return;
                        }
                        acquireLock(region, parentTableKey, locks, false);
                    }
                    // make sure we haven't gone over our threshold for indexes on this table.
                    if (execeededIndexQuota(tableType, parentTable)) {
                        builder.setReturnCode(MetaDataProtos.MutationCode.TOO_MANY_INDEXES);
                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                        done.run(builder.build());
                        return;
                    }
                }

                // Add cell for ROW_KEY_ORDER_OPTIMIZABLE = true, as we know that new tables
                // conform the correct row key. The exception is for a VIEW, which the client
                // sends over depending on its base physical table.
                if (tableType != PTableType.VIEW) {
                    UpgradeUtil.addRowKeyOrderOptimizableCell(tableMetadata, tableKey, clientTimeStamp);
                }
                // If the parent table of the view has the auto partition sequence name attribute, modify the
                // tableMetadata and set the view statement and partition column correctly
                if (parentTable != null && parentTable.getAutoPartitionSeqName() != null) {
                    long autoPartitionNum = 1;
                    try (PhoenixConnection connection = getServerConnectionForMetaData(
                            env.getConfiguration()).unwrap(PhoenixConnection.class);
                         Statement stmt = connection.createStatement()) {
                        String seqName = parentTable.getAutoPartitionSeqName();
                        // Not going through the standard route of using statement.execute() as that code path
                        // is blocked if the metadata hasn't been been upgraded to the new minor release.
                        String seqNextValueSql = String.format("SELECT NEXT VALUE FOR %s", seqName);
                        PhoenixStatement ps = stmt.unwrap(PhoenixStatement.class);
                        QueryPlan plan = ps.compileQuery(seqNextValueSql);
                        ResultIterator resultIterator = plan.iterator();
                        PhoenixResultSet rs = ps.newResultSet(resultIterator, plan.getProjector(), plan.getContext());
                        rs.next();
                        autoPartitionNum = rs.getLong(1);
                    } catch (SequenceNotFoundException e) {
                        builder.setReturnCode(MetaDataProtos.MutationCode.AUTO_PARTITION_SEQUENCE_NOT_FOUND);
                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                        done.run(builder.build());
                        return;
                    }
                    PColumn autoPartitionCol = parentTable.getPKColumns().get(MetaDataUtil.getAutoPartitionColIndex(parentTable));
                    if (!PLong.INSTANCE.isCoercibleTo(autoPartitionCol.getDataType(), autoPartitionNum)) {
                        builder.setReturnCode(MetaDataProtos.MutationCode.CANNOT_COERCE_AUTO_PARTITION_ID);
                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                        done.run(builder.build());
                        return;
                    }
                    builder.setAutoPartitionNum(autoPartitionNum);

                    // set the VIEW STATEMENT column of the header row
                    Put tableHeaderPut = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata);
                    NavigableMap<byte[], List<Cell>> familyCellMap = tableHeaderPut.getFamilyCellMap();
                    List<Cell> cells = familyCellMap.get(TABLE_FAMILY_BYTES);
                    Cell cell = cells.get(0);
                    String autoPartitionWhere = QueryUtil.getViewPartitionClause(MetaDataUtil.getAutoPartitionColumnName(parentTable), autoPartitionNum);
                    String hbaseVersion = VersionInfo.getVersion();
                    ImmutableBytesPtr ptr = new ImmutableBytesPtr();
                    KeyValueBuilder kvBuilder = KeyValueBuilder.get(hbaseVersion);
                    MetaDataUtil.getMutationValue(tableHeaderPut, VIEW_STATEMENT_BYTES, kvBuilder, ptr);
                    byte[] value = ptr.copyBytesIfNecessary();
                    byte[] viewStatement = null;
                    // if we have an existing where clause add the auto partition where clause to it
                    if (!Bytes.equals(value, QueryConstants.EMPTY_COLUMN_VALUE_BYTES)) {
                        viewStatement = Bytes.add(value, Bytes.toBytes(" AND "), Bytes.toBytes(autoPartitionWhere));
                    } else {
                        viewStatement = Bytes.toBytes(QueryUtil.getViewStatement(parentTable.getSchemaName().getString(), parentTable.getTableName().getString(), autoPartitionWhere));
                    }
                    Cell viewStatementCell =
                            PhoenixKeyValueUtil.newKeyValue(cell.getRowArray(),
                                cell.getRowOffset(), cell.getRowLength(), cell.getFamilyArray(),
                                cell.getFamilyOffset(), cell.getFamilyLength(),
                                VIEW_STATEMENT_BYTES, 0, VIEW_STATEMENT_BYTES.length,
                                cell.getTimestamp(), viewStatement, 0, viewStatement.length,
                                cell.getType());
                    cells.add(viewStatementCell);

                    // set the IS_VIEW_REFERENCED column of the auto partition column row
                    Put autoPartitionPut = MetaDataUtil.getPutOnlyAutoPartitionColumn(parentTable, tableMetadata);
                    familyCellMap = autoPartitionPut.getFamilyCellMap();
                    cells = familyCellMap.get(TABLE_FAMILY_BYTES);
                    cell = cells.get(0);
                    PDataType dataType = autoPartitionCol.getDataType();
                    Object val = dataType.toObject(autoPartitionNum, PLong.INSTANCE);
                    byte[] bytes = new byte[dataType.getByteSize() + 1];
                    dataType.toBytes(val, bytes, 0);
                    Cell viewConstantCell =
                            PhoenixKeyValueUtil.newKeyValue(cell.getRowArray(),
                                cell.getRowOffset(), cell.getRowLength(), cell.getFamilyArray(),
                                cell.getFamilyOffset(), cell.getFamilyLength(),
                                VIEW_CONSTANT_BYTES, 0, VIEW_CONSTANT_BYTES.length,
                                cell.getTimestamp(), bytes, 0, bytes.length, cell.getType());
                    cells.add(viewConstantCell);
                }
                Long indexId = null;
                if (request.hasAllocateIndexId() && request.getAllocateIndexId()) {
                    String tenantIdStr = tenantIdBytes.length == 0 ? null : Bytes.toString(tenantIdBytes);
                    try (PhoenixConnection connection = getServerConnectionForMetaData(
                            env.getConfiguration()).unwrap(PhoenixConnection.class)) {
                        PName physicalName = parentTable.getPhysicalName();
                        long seqValue = getViewIndexSequenceValue(connection, tenantIdStr, parentTable);
                        Put tableHeaderPut = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata);
                        NavigableMap<byte[], List<Cell>> familyCellMap = tableHeaderPut.getFamilyCellMap();
                        List<Cell> cells = familyCellMap.get(TABLE_FAMILY_BYTES);
                        Cell cell = cells.get(0);
                        PDataType<?> dataType = MetaDataUtil.getIndexDataType(tableMetadata,
                                GenericKeyValueBuilder.INSTANCE, new ImmutableBytesWritable());
                        Object val = dataType.toObject(seqValue, PLong.INSTANCE);
                        byte[] bytes = new byte[dataType.getByteSize() + 1];
                        dataType.toBytes(val, bytes, 0);
                        Cell indexIdCell =
                                PhoenixKeyValueUtil.newKeyValue(cell.getRowArray(),
                                    cell.getRowOffset(), cell.getRowLength(),
                                    cell.getFamilyArray(), cell.getFamilyOffset(),
                                    cell.getFamilyLength(), VIEW_INDEX_ID_BYTES, 0,
                                    VIEW_INDEX_ID_BYTES.length, cell.getTimestamp(), bytes, 0,
                                    bytes.length, cell.getType());
                        cells.add(indexIdCell);
                        indexId = seqValue;
                    }
                }

                // The mutations to create a table are written in the following order:
                // 1. Write the child link as if the next two steps fail we
                // ignore missing children while processing a parent
                // (this is already done at this point, as a separate client-server RPC
                // to the ChildLinkMetaDataEndpoint coprocessor)
                // 2. Update the encoded column qualifier for the parent table if its on a
                // different region server (for tables that use column qualifier encoding)
                // if the next step fails we end up wasting a few col qualifiers
                // 3. Finally write the mutations to create the table

                if (tableType == PTableType.VIEW) {
                    // If we are connecting with an old client to a server that has new metadata
                    // i.e. it was previously connected to by a 4.15 client, then the client will
                    // also send the parent->child link metadata to SYSTEM.CATALOG rather than using
                    // the new ChildLinkMetaDataEndpoint coprocessor. In this case, we must continue
                    // doing the server-server RPC to send these mutations to SYSTEM.CHILD_LINK.
                    if (clientVersion < MIN_SPLITTABLE_SYSTEM_CATALOG &&
                            getSystemTableForChildLinks(clientVersion, env.getConfiguration()).equals(
                                    SchemaUtil.getPhysicalTableName(SYSTEM_CHILD_LINK_NAME_BYTES,
                                            env.getConfiguration()))) {
                        List<Mutation> childLinkMutations =
                                MetaDataUtil.removeChildLinkMutations(tableMetadata);
                        MetaDataResponse response =
                                processRemoteRegionMutations(
                                        PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES,
                                        childLinkMutations, UNABLE_TO_CREATE_CHILD_LINK);
                        if (response != null) {
                            done.run(response);
                            return;
                        }
                    }
                    // Pass in the parent's PTable so that we only tag cells corresponding to the
                    // view's property in case they are different from the parent
                    ViewUtil.addTagsToPutsForViewAlteredProperties(tableMetadata, parentTable,
                            (ExtendedCellBuilder)env.getCellBuilder());
                }
                //set the last DDL timestamp to the current server time since we're creating the
                // table/index/views.
                tableMetadata.add(MetaDataUtil.getLastDDLTimestampUpdate(tableKey,
                    clientTimeStamp, EnvironmentEdgeManager.currentTimeMillis()));
                if (tableType == INDEX) {
                    // Invalidate the cache on each regionserver for parent table/view.
                    List<InvalidateServerMetadataCacheRequest> requests = new ArrayList<>();
                    requests.add(new InvalidateServerMetadataCacheRequest(tenantIdBytes,
                            parentSchemaName, parentTableName));
                    invalidateServerMetadataCache(requests);
                    long currentTimestamp = EnvironmentEdgeManager.currentTimeMillis();
                    // If table type is index, then update the last ddl timestamp of the parent
                    // table or immediate parent view.
                    tableMetadata.add(MetaDataUtil.getLastDDLTimestampUpdate(parentTableKey,
                            currentTimestamp, currentTimestamp));
                }

                //and if we're doing change detection on this table or view, notify the
                //external schema registry and get its schema id
                if (isChangeDetectionEnabled) {
                    long startTime = EnvironmentEdgeManager.currentTimeMillis();
                    try {
                        exportSchema(tableMetadata, tableKey, clientTimeStamp, clientVersion, null);
                        metricsSource.incrementCreateExportCount();
                        metricsSource.updateCreateExportTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
                    } catch (IOException ie){
                        metricsSource.incrementCreateExportFailureCount();
                        metricsSource.updateCreateExportFailureTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
                        //If we fail to write to the schema registry, fail the entire
                        //CREATE TABLE or VIEW operation so we stay consistent
                        LOGGER.error("Error writing schema to external schema registry", ie);
                        builder.setReturnCode(
                            MetaDataProtos.MutationCode.ERROR_WRITING_TO_SCHEMA_REGISTRY);
                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                        done.run(builder.build());
                        return;
                    }
                }

                // When we drop a view we first drop the view metadata and then drop the parent->child linking row
                List<Mutation> localMutations =
                        Lists.newArrayListWithExpectedSize(tableMetadata.size());
                List<Mutation> remoteMutations = Lists.newArrayListWithExpectedSize(2);
                // check to see if there are any mutations that should not be applied to this region
                separateLocalAndRemoteMutations(region, tableMetadata, localMutations, remoteMutations);
                if (!remoteMutations.isEmpty()) {
                    // there should only be remote mutations if we are creating a view that uses
                    // encoded column qualifiers (the remote mutations are to update the encoded
                    // column qualifier counter on the parent table)
                    if (parentTable != null && tableType == PTableType.VIEW && parentTable
                            .getEncodingScheme() != QualifierEncodingScheme.NON_ENCODED_QUALIFIERS) {
                        // TODO: Avoid doing server-server RPC when we have held row locks
                        MetaDataResponse response =
                                processRemoteRegionMutations(
                                        SYSTEM_CATALOG_NAME_BYTES,
                                        remoteMutations, MetaDataProtos.MutationCode.UNABLE_TO_UPDATE_PARENT_TABLE);
                        clearRemoteTableFromCache(clientTimeStamp,
                                parentTable.getSchemaName() != null
                                        ? parentTable.getSchemaName().getBytes()
                                        : ByteUtil.EMPTY_BYTE_ARRAY,
                                parentTable.getTableName().getBytes());
                        if (response != null) {
                            done.run(response);
                            return;
                        }
                    } else {
                        String msg = "Found unexpected mutations while creating " + fullTableName;
                        LOGGER.error(msg);
                        for (Mutation m : remoteMutations) {
                            LOGGER.debug("Mutation rowkey : " + Bytes.toStringBinary(m.getRow()));
                            LOGGER.debug("Mutation family cell map : " + m.getFamilyCellMap());
                        }
                        throw new IllegalStateException(msg);
                    }
                }


                // Not sure whether this TODO is relevant anymore. PHOENIX-7107 introduces indexes
                // on system table.
                // TODO: Switch this to HRegion#batchMutate when we want to support indexes on the
                // system table. Basically, we get all the locks that we don't already hold for all the
                // tableMetadata rows. This ensures we don't have deadlock situations (ensuring
                // primary and then index table locks are held, in that order). For now, we just don't support
                // indexing on the system table. This is an issue because of the way we manage batch mutation
                // in the Indexer.

                // Update SYSTEM.CATALOG indexes only for
                // 1. ordinary table/index mutations (create table/index).
                // 2. When creating system indexes itself, no further index processing is required.
                boolean updateCatalogIndexes = !SchemaUtil.isSystemTable(Bytes.toBytes(fullTableName));
                mutateRowsWithLocks(this.accessCheckEnabled, env, region, localMutations,
                        Collections.<byte[]>emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE,
                        updateCatalogIndexes);

                // Invalidate the cache - the next getTable call will add it
                // TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache
                Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
                if (parentTableKey != null) {
                    metaDataCache.invalidate(new ImmutableBytesPtr(parentTableKey));
                }
                metaDataCache.invalidate(cacheKey);
                // Get timeStamp from mutations - the above method sets it if it's unset
                long currentTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
                builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
                if (indexId != null) {
                    builder.setViewIndexId(indexId);
                    builder.setViewIndexIdType(PLong.INSTANCE.getSqlType());
                }
                builder.setMutationTime(currentTimeStamp);
                //send the newly built table back because we generated the DDL timestamp server
                // side and the client doesn't have it.
                if (clientTimeStamp != HConstants.LATEST_TIMESTAMP) {
                    // if a client uses a connection with currentSCN=t to create the table,
                    // the table is created with timestamp 't' but the timestamp range in the scan
                    // used by buildTable does not include 't' due to how SCN is implemented.
                    clientTimeStamp += 1;
                }
                PTable newTable = buildTable(tableKey, cacheKey, region,
                    clientTimeStamp, clientVersion);
                if (newTable != null) {
                    builder.setTable(PTableImpl.toProto(newTable, clientVersion));
                }

                done.run(builder.build());

                updateCreateTableDdlSuccessMetrics(tableType);
                LOGGER.info("{} created successfully, tableName: {}", tableType, fullTableName);
            } finally {
                ServerUtil.releaseRowLocks(locks);
            }
        } catch (Throwable t) {
            LOGGER.error("createTable failed", t);
            ProtobufUtil.setControllerException(controller,
                    ClientUtil.createIOException(fullTableName, t));
        }
    }