private MetaDataMutationResult mutateColumn()

in phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java [3453:3751]


    private MetaDataMutationResult mutateColumn(
            final List<Mutation> tableMetadata,
            final ColumnMutator mutator, final int clientVersion,
            final PTable parentTable, final PTable transformingNewTable, boolean isAddingOrDroppingColumns) throws IOException {
        byte[][] rowKeyMetaData = new byte[5][];
        MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
        byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
        byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
        byte[] tableOrViewName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
        byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableOrViewName);
        String fullTableName = SchemaUtil.getTableName(schemaName, tableOrViewName);
        // server-side, except for indexing, we always expect the keyvalues to be standard KeyValues
        PTableType expectedType = MetaDataUtil.getTableType(tableMetadata,
                GenericKeyValueBuilder.INSTANCE, new ImmutableBytesWritable());
        List<byte[]> tableNamesToDelete = Lists.newArrayList();
        List<SharedTableState> sharedTablesToDelete = Lists.newArrayList();
        long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
        try {
            Region region = env.getRegion();
            MetaDataMutationResult result = checkTableKeyInRegion(key, region);
            if (result != null) {
                return result;
            }

            List<RowLock> locks = Lists.newArrayList();
            try {
                List<PTable> childViews = Lists.newArrayList();
                Optional<MetaDataMutationResult> mutationResult = validateIfMutationAllowedOnParent(
                        parentTable, tableMetadata,
                        expectedType, clientTimeStamp, tenantId, schemaName, tableOrViewName,
                        childViews, clientVersion);
                // only if mutation is allowed, we should get Optional.empty() here
                if (mutationResult.isPresent()) {
                    return mutationResult.get();
                }
                // We take a write row lock for tenantId, schemaName, tableOrViewName
                acquireLock(region, key, locks, false);
                // Invalidate the cache from all the regionservers.
                List<InvalidateServerMetadataCacheRequest> requests = new ArrayList<>();
                requests.add(new InvalidateServerMetadataCacheRequest(tenantId, schemaName,
                        tableOrViewName));
                invalidateServerMetadataCache(requests);
                ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
                List<ImmutableBytesPtr> invalidateList = new ArrayList<>();
                invalidateList.add(cacheKey);
                PTable table = getTableFromCache(cacheKey, clientTimeStamp, clientVersion);
                if (failConcurrentMutateAddColumnOneTimeForTesting) {
                    failConcurrentMutateAddColumnOneTimeForTesting = false;
                    return new MetaDataMutationResult(MutationCode.CONCURRENT_TABLE_MUTATION,
                            EnvironmentEdgeManager.currentTimeMillis(), table);
                }
                if (LOGGER.isDebugEnabled()) {
                    if (table == null) {
                        LOGGER.debug("Table " + Bytes.toStringBinary(key)
                                + " not found in cache. Will build through scan");
                    } else {
                        LOGGER.debug("Table " + Bytes.toStringBinary(key)
                                + " found in cache with timestamp " + table.getTimeStamp()
                                + " seqNum " + table.getSequenceNumber());
                    }
                }
                // Get client timeStamp from mutations
                if (table == null && (table = buildTable(key, cacheKey, region,
                        HConstants.LATEST_TIMESTAMP, clientVersion)) == null) {
                    // if not found then call newerTableExists and add delete marker for timestamp
                    // found
                    table = buildDeletedTable(key, cacheKey, region, clientTimeStamp);
                    if (table != null) {
                        LOGGER.info("Found newer table deleted as of " + table.getTimeStamp()
                                + " versus client timestamp of " + clientTimeStamp);
                        return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
                                EnvironmentEdgeManager.currentTimeMillis(), null);
                    }
                    return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
                            EnvironmentEdgeManager.currentTimeMillis(), null);
                }

                // if this is a view or view index then we need to include columns and
                // indexes derived from its ancestors
                if (parentTable != null) {
                    Properties props = new Properties();
                    if (tenantId != null) {
                        props.setProperty(TENANT_ID_ATTRIB, Bytes.toString(tenantId));
                    }
                    if (clientTimeStamp != HConstants.LATEST_TIMESTAMP) {
                        props.setProperty("CurrentSCN", Long.toString(clientTimeStamp));
                    }
                    try (PhoenixConnection connection = getServerConnectionForMetaData(props,
                            env.getConfiguration()).unwrap(PhoenixConnection.class)) {
                        table = ViewUtil.addDerivedColumnsAndIndexesFromParent(connection, table,
                                parentTable);
                    }
                }
                if (transformingNewTable !=null) {
                    table = PTableImpl.builderWithColumns(table, getColumnsToClone(table))
                            .setTransformingNewTable(transformingNewTable).build();
                }

                if (table.getTimeStamp() >= clientTimeStamp) {
                    LOGGER.info("Found newer table as of " + table.getTimeStamp()
                            + " versus client timestamp of " + clientTimeStamp);
                    return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
                            EnvironmentEdgeManager.currentTimeMillis(), table);
                } else if (isTableDeleted(table)) {
                    return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
                            EnvironmentEdgeManager.currentTimeMillis(), null);
                }
                // lookup TABLE_SEQ_NUM in tableMetaData
                long expectedSeqNum = MetaDataUtil.getSequenceNumber(tableMetadata) - 1;

                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("For table " + Bytes.toStringBinary(key) + " expecting seqNum "
                            + expectedSeqNum + " and found seqNum " + table.getSequenceNumber()
                            + " with " + table.getColumns().size() + " columns: "
                            + table.getColumns());
                }
                if (expectedSeqNum != table.getSequenceNumber()) {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("For table " + Bytes.toStringBinary(key)
                                + " returning CONCURRENT_TABLE_MUTATION due to unexpected seqNum");
                    }
                    return new MetaDataMutationResult(MutationCode.CONCURRENT_TABLE_MUTATION,
                            EnvironmentEdgeManager.currentTimeMillis(), table);
                }

                PTableType type = table.getType();
                if (type == PTableType.INDEX) {
                    // Disallow mutation of an index table
                    return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
                            EnvironmentEdgeManager.currentTimeMillis(), null);
                } else {
                    // We said to drop a table, but found a view or visa versa
                    if (type != expectedType) {
                        return new MetaDataProtocol.MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
                                EnvironmentEdgeManager.currentTimeMillis(), null);
                    }
                }

                if (!childViews.isEmpty()) {
                    // validate the add or drop column mutations
                    result = mutator.validateWithChildViews(table, childViews, tableMetadata,
                            schemaName, tableOrViewName);
                    if (result != null) {
                        return result;
                    }
                }

                getCoprocessorHost().preAlterTable(Bytes.toString(tenantId),
                        SchemaUtil.getTableName(schemaName, tableOrViewName),
                        TableName.valueOf(table.getPhysicalName().getBytes()),
                        getParentPhysicalTableName(table), table.getType());

                result = mutator.validateAndAddMetadata(table, rowKeyMetaData, tableMetadata,
                        region, invalidateList, locks, clientTimeStamp, clientVersion,
                    ((ExtendedCellBuilder) env.getCellBuilder()), isAddingOrDroppingColumns);
                // if the update mutation caused tables to be deleted, the mutation code returned
                // will be MutationCode.TABLE_ALREADY_EXISTS
                if (result != null
                        && result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
                    return result;
                }

                // drop any indexes on the base table that need the column that is going to be
                // dropped
                List<Pair<PTable, PColumn>> tableAndDroppedColumnPairs =
                        mutator.getTableAndDroppedColumnPairs();
                Iterator<Pair<PTable, PColumn>> iterator = tableAndDroppedColumnPairs.iterator();
                while (iterator.hasNext()) {
                    Pair<PTable, PColumn> pair = iterator.next();
                    // remove the current table and column being dropped from the list and drop any
                    // indexes that require the column being dropped while holding the row lock
                    if (table.equals(pair.getFirst())) {
                        iterator.remove();
                        result = dropIndexes(env, pair.getFirst(), invalidateList, locks,
                                clientTimeStamp, tableMetadata, pair.getSecond(),
                                tableNamesToDelete, sharedTablesToDelete, clientVersion);
                        if (result != null
                                && result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
                            return result;
                        }
                    }
                }

                if (table.isChangeDetectionEnabled() || MetaDataUtil.getChangeDetectionEnabled(tableMetadata)) {
                    long startTime = EnvironmentEdgeManager.currentTimeMillis();
                    try {
                        exportSchema(tableMetadata, key, clientTimeStamp, clientVersion, table);
                        metricsSource.incrementAlterExportCount();
                        metricsSource.updateAlterExportTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
                    } catch (Exception e) {
                        LOGGER.error("Error writing to schema registry", e);
                        metricsSource.incrementAlterExportFailureCount();
                        metricsSource.updateAlterExportFailureTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
                        result = new MetaDataMutationResult(MutationCode.ERROR_WRITING_TO_SCHEMA_REGISTRY,
                            EnvironmentEdgeManager.currentTimeMillis(), table);
                        return result;
                    }
                }
                Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                        GlobalCache.getInstance(this.env).getMetaDataCache();

                // The mutations to add a column are written in the following order:
                // 1. Update the encoded column qualifier for the parent table if its on a
                // different region server (for tables that use column qualifier encoding)
                // if the next step fails we end up wasting a few col qualifiers
                // 2. Write the mutations to add the column

                List<Mutation> localMutations =
                        Lists.newArrayListWithExpectedSize(tableMetadata.size());
                List<Mutation> remoteMutations = Lists.newArrayList();
                separateLocalAndRemoteMutations(region, tableMetadata, localMutations,
                        remoteMutations);
                if (!remoteMutations.isEmpty()) {
                    // there should only be remote mutations if we are adding a column to a view
                    // that uses encoded column qualifiers (the remote mutations are to update the
                    // encoded column qualifier counter on the parent table)
                    if (( mutator.getMutateColumnType() == ColumnMutator.MutateColumnType.ADD_COLUMN
                            && type == PTableType.VIEW
                            && table.getEncodingScheme() !=
                            QualifierEncodingScheme.NON_ENCODED_QUALIFIERS)) {
                        processRemoteRegionMutations(
                                SYSTEM_CATALOG_NAME_BYTES, remoteMutations,
                                MetaDataProtos.MutationCode.UNABLE_TO_UPDATE_PARENT_TABLE);
                        //if we're a view or index, clear the cache for our parent
                        if ((type == PTableType.VIEW || type == INDEX) && table.getParentTableName() != null) {
                            clearRemoteTableFromCache(clientTimeStamp,
                                table.getParentSchemaName() != null
                                    ? table.getParentSchemaName().getBytes()
                                    : ByteUtil.EMPTY_BYTE_ARRAY,
                                table.getParentTableName().getBytes());
                        }
                    } else {
                        String msg = "Found unexpected mutations while adding or dropping column "
                                + "to " + fullTableName;
                        LOGGER.error(msg);
                        for (Mutation m : remoteMutations) {
                            LOGGER.debug("Mutation rowkey : " + Bytes.toStringBinary(m.getRow()));
                            LOGGER.debug("Mutation family cell map : " + m.getFamilyCellMap());
                        }
                        throw new IllegalStateException(msg);
                    }
                }
                // Update SYSTEM.CATALOG indexes only for ordinary table column mutations.
                // Column mutations of indexes are not allowed. See above
                // Add column on SYSTEM.CATALOG should not be processed for index updates,
                // since an index on a future column cannot exist.
                boolean
                        updateCatalogIndexes =
                        !Bytes.toString(schemaName)
                                .equalsIgnoreCase(PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA);

                mutateRowsWithLocks(this.accessCheckEnabled, env, region, localMutations,
                        Collections.<byte[]>emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE,
                        updateCatalogIndexes);
                // Invalidate from cache
                for (ImmutableBytesPtr invalidateKey : invalidateList) {
                    metaDataCache.invalidate(invalidateKey);
                }
                // Get client timeStamp from mutations, since it may get updated by the
                // mutateRowsWithLocks call
                long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
                // if the update mutation caused tables to be deleted just return the result which
                // will contain the table to be deleted
                if (result != null
                        && result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
                    return result;
                } else {
                    table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP,
                            clientVersion);
                    if (clientVersion < MIN_SPLITTABLE_SYSTEM_CATALOG && type == PTableType.VIEW) {
                        try (PhoenixConnection connection = getServerConnectionForMetaData(
                                env.getConfiguration()).unwrap(PhoenixConnection.class)) {
                            PTable pTable = connection.getTableNoCache(
                                    table.getParentName().getString());
                            table = ViewUtil.addDerivedColumnsAndIndexesFromParent(connection,
                                    table, pTable);
                        }
                    }
                    return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS,
                            currentTime, table, tableNamesToDelete, sharedTablesToDelete);
                }
            } finally {
                ServerUtil.releaseRowLocks(locks);
                // drop indexes on views that require the column being dropped. These could be on a
                // different region server so don't hold row locks while dropping them
                for (Pair<PTable, PColumn> pair : mutator.getTableAndDroppedColumnPairs()) {
                    result = dropRemoteIndexes(env, pair.getFirst(), clientTimeStamp,
                            pair.getSecond(), tableNamesToDelete, sharedTablesToDelete);
                    if (result != null
                            && result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
                        return result;
                    }
                }
            }
        } catch (Throwable t) {
            ClientUtil.throwIOException(fullTableName, t);
            return null; // impossible
        }
    }