in phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java [2460:3827]
private PTable createTableInternal(
CreateTableStatement statement,
byte[][] splits,
final PTable parent,
String viewStatement,
ViewType viewType,
PDataType viewIndexIdType,
final byte[] rowKeyMatcher,
final byte[][] viewColumnConstants,
final BitSet isViewColumnReferenced,
boolean allocateIndexId,
IndexType indexType,
Date asyncCreatedDate,
Set<PTable.CDCChangeScope> cdcIncludeScopes,
Map<String, Object> tableProps,
Map<String, Object> commonFamilyProps) throws SQLException {
final PTableType tableType = statement.getTableType();
boolean wasAutoCommit = connection.getAutoCommit();
TableName tableNameNode = null;
boolean allowSystemCatalogRollback =
connection.getQueryServices().getProps().getBoolean(
QueryServices.ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK,
QueryServicesOptions.DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK);
Set<String> acquiredColumnMutexSet = Sets.newHashSetWithExpectedSize(3);
String parentPhysicalName =
(parent!=null && parent.getPhysicalName()!=null) ? parent.getPhysicalName().getString() : null;
String parentPhysicalSchemaName = parentPhysicalName!=null ?
SchemaUtil.getSchemaNameFromFullName(parentPhysicalName) : null;
String parentPhysicalTableName = parentPhysicalName!=null ?
SchemaUtil.getTableNameFromFullName(parentPhysicalName) : null;
connection.rollback();
try {
connection.setAutoCommit(false);
List<Mutation> tableMetaData = Lists.newArrayListWithExpectedSize(statement.getColumnDefs().size() + 3);
tableNameNode = statement.getTableName();
final String schemaName = connection.getSchema() != null && tableNameNode.getSchemaName() == null ? connection.getSchema() : tableNameNode.getSchemaName();
final String tableName = tableNameNode.getTableName();
String parentTableName = null;
PName tenantId = connection.getTenantId();
String tenantIdStr = tenantId == null ? null : tenantId.getString();
Long scn = connection.getSCN();
long clientTimeStamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
boolean multiTenant = false;
boolean storeNulls = false;
TransactionFactory.Provider transactionProvider = (parent!= null) ? parent.getTransactionProvider() : null;
Integer saltBucketNum = null;
String defaultFamilyName = null;
boolean isImmutableRows = false;
boolean isAppendOnlySchema = false;
List<PName> physicalNames = Collections.emptyList();
boolean addSaltColumn = false;
boolean rowKeyOrderOptimizable = true;
Long timestamp = null;
boolean isNamespaceMapped = parent == null
? SchemaUtil.isNamespaceMappingEnabled(tableType, connection.getQueryServices().getProps())
: parent.isNamespaceMapped();
boolean isLocalIndex = indexType == IndexType.LOCAL;
QualifierEncodingScheme encodingScheme = NON_ENCODED_QUALIFIERS;
ImmutableStorageScheme immutableStorageScheme = ONE_CELL_PER_COLUMN;
int baseTableColumnCount =
tableType == PTableType.VIEW ? parent.getColumns().size()
: QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
TTLExpression ttl = TTL_EXPRESSION_NOT_DEFINED;
TTLExpression ttlFromHierarchy = TTL_EXPRESSION_NOT_DEFINED;
TTLExpression ttlProp = (TTLExpression) TableProperty.TTL.getValue(tableProps);
// Validate TTL prop value if set
if (ttlProp != null) {
if (!isViewTTLEnabled() && tableType == VIEW) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.
VIEW_TTL_NOT_ENABLED)
.setSchemaName(schemaName)
.setTableName(tableName)
.build()
.buildException();
}
if (tableType != TABLE && (tableType != VIEW || viewType != UPDATABLE)) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.
TTL_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY)
.setSchemaName(schemaName)
.setTableName(tableName)
.build()
.buildException();
}
ttlFromHierarchy = checkAndGetTTLFromHierarchy(parent, tableName);
if (!ttlFromHierarchy.equals(TTL_EXPRESSION_NOT_DEFINED)) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.
TTL_ALREADY_DEFINED_IN_HIERARCHY)
.setSchemaName(schemaName)
.setTableName(tableName)
.build()
.buildException();
}
try {
ttlProp.validateTTLOnCreate(connection, statement, parent, tableProps);
} catch (IllegalArgumentException e) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.ILLEGAL_DATA)
.setMessage(e.getMessage())
.setSchemaName(schemaName)
.setTableName(tableName)
.build()
.buildException();
}
ttl = ttlProp;
} else {
ttlFromHierarchy = checkAndGetTTLFromHierarchy(parent, tableName);
if (!ttlFromHierarchy.equals(TTL_EXPRESSION_NOT_DEFINED)) {
ttlFromHierarchy.validateTTLOnCreate(connection,
statement,
parent,
tableProps);
}
}
Boolean isChangeDetectionEnabledProp =
(Boolean) TableProperty.CHANGE_DETECTION_ENABLED.getValue(tableProps);
verifyChangeDetectionTableType(tableType, isChangeDetectionEnabledProp);
String schemaVersion = (String) TableProperty.SCHEMA_VERSION.getValue(tableProps);
String streamingTopicName = (String) TableProperty.STREAMING_TOPIC_NAME.getValue(tableProps);
String cdcIncludeScopesStr = cdcIncludeScopes == null ? null :
CDCUtil.makeChangeScopeStringFromEnums(cdcIncludeScopes);
if (parent != null && tableType == PTableType.INDEX) {
timestamp = TransactionUtil.getTableTimestamp(connection, transactionProvider != null, transactionProvider);
isImmutableRows = parent.isImmutableRows();
isAppendOnlySchema = parent.isAppendOnlySchema();
// Index on view
// TODO: Can we support a multi-tenant index directly on a multi-tenant
// table instead of only a view? We don't have anywhere to put the link
// from the table to the index, though.
if (isLocalIndex || (parent.getType() == PTableType.VIEW && parent.getViewType() != ViewType.MAPPED)) {
PName physicalName = parent.getPhysicalName();
saltBucketNum = parent.getBucketNum();
addSaltColumn = (saltBucketNum != null && !isLocalIndex);
defaultFamilyName = parent.getDefaultFamilyName() == null ? null : parent.getDefaultFamilyName().getString();
if (isLocalIndex) {
defaultFamilyName =
parent.getDefaultFamilyName() == null ? QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY
: IndexUtil.getLocalIndexColumnFamily(parent.getDefaultFamilyName().getString());
saltBucketNum = null;
// Set physical name of local index table
physicalNames = Collections.singletonList(PNameFactory.newName(physicalName.getBytes()));
} else {
defaultFamilyName = parent.getDefaultFamilyName() == null ? QueryConstants.DEFAULT_COLUMN_FAMILY : parent.getDefaultFamilyName().getString();
// Set physical name of view index table
// Parent is a view and this is an index so we need to get _IDX_+logical name of base table.
// parent.getPhysicalName is Schema.Physical of base and we can't use it since the _IDX_ table is logical name of the base.
// parent.getName is the view name. parent.getBaseTableLogicalName is the logical name of the base table
PName parentName = parent.getBaseTableLogicalName();
physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getViewIndexPhysicalName(parentName, isNamespaceMapped)));
}
}
multiTenant = parent.isMultiTenant();
storeNulls = parent.getStoreNulls();
parentTableName = parent.getTableName().getString();
// Pass through data table sequence number so we can check it hasn't changed
try (PreparedStatement incrementStatement = connection.prepareStatement(INCREMENT_SEQ_NUM)) {
incrementStatement.setString(1, tenantIdStr);
incrementStatement.setString(2, schemaName);
incrementStatement.setString(3, parentTableName);
incrementStatement.setLong(4, parent.getSequenceNumber());
incrementStatement.execute();
// Get list of mutations and add to table meta data that will be passed to server
// to guarantee order. This row will always end up last
tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
connection.rollback();
}
// Add row linking from data table row to index table row
try (PreparedStatement linkStatement = connection.prepareStatement(CREATE_LINK)) {
linkStatement.setString(1, tenantIdStr);
linkStatement.setString(2, schemaName);
linkStatement.setString(3, parentTableName);
linkStatement.setString(4, tableName);
linkStatement.setByte(5, LinkType.INDEX_TABLE.getSerializedValue());
linkStatement.setLong(6, parent.getSequenceNumber());
linkStatement.setString(7, PTableType.INDEX.getSerializedValue());
linkStatement.execute();
}
// Add row linking index table to parent table for indexes on views
if (parent.getType() == PTableType.VIEW) {
try (PreparedStatement linkStatement = connection.prepareStatement(CREATE_VIEW_INDEX_PARENT_LINK)) {
linkStatement.setString(1, tenantIdStr);
linkStatement.setString(2, schemaName);
linkStatement.setString(3, tableName);
linkStatement.setString(4, parent.getName().getString());
linkStatement.setByte(5, LinkType.VIEW_INDEX_PARENT_TABLE.getSerializedValue());
linkStatement.execute();
}
}
}
PrimaryKeyConstraint pkConstraint = statement.getPrimaryKeyConstraint();
String pkName = null;
List<Pair<ColumnName,SortOrder>> pkColumnsNames = Collections.<Pair<ColumnName,SortOrder>>emptyList();
Iterator<Pair<ColumnName,SortOrder>> pkColumnsIterator = Collections.emptyIterator();
if (pkConstraint != null) {
pkColumnsNames = pkConstraint.getColumnNames();
pkColumnsIterator = pkColumnsNames.iterator();
pkName = pkConstraint.getName();
}
// Although unusual, it's possible to set a mapped VIEW as having immutable rows.
// This tells Phoenix that you're managing the index maintenance yourself.
if (tableType != PTableType.INDEX && (tableType != PTableType.VIEW || viewType == ViewType.MAPPED)) {
// TODO remove TableProperty.IMMUTABLE_ROWS at the next major release
Boolean isImmutableRowsProp = statement.immutableRows()!=null? statement.immutableRows() :
(Boolean) TableProperty.IMMUTABLE_ROWS.getValue(tableProps);
if (isImmutableRowsProp == null) {
isImmutableRows = connection.getQueryServices().getProps().getBoolean(QueryServices.IMMUTABLE_ROWS_ATTRIB, QueryServicesOptions.DEFAULT_IMMUTABLE_ROWS);
} else {
isImmutableRows = isImmutableRowsProp;
}
}
if (tableType == PTableType.TABLE) {
Boolean isAppendOnlySchemaProp = (Boolean) TableProperty.APPEND_ONLY_SCHEMA.getValue(tableProps);
isAppendOnlySchema = isAppendOnlySchemaProp!=null ? isAppendOnlySchemaProp : false;
}
// Can't set any of these on views or shared indexes on views
if (tableType != PTableType.VIEW && tableType != PTableType.CDC && !allocateIndexId) {
saltBucketNum = (Integer) TableProperty.SALT_BUCKETS.getValue(tableProps);
if (saltBucketNum != null) {
if (saltBucketNum < 0 || saltBucketNum > SaltingUtil.MAX_BUCKET_NUM) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_BUCKET_NUM).build().buildException();
}
}
// Salt the index table if the data table is salted
if (saltBucketNum == null) {
if (parent != null) {
saltBucketNum = parent.getBucketNum();
}
} else if (saltBucketNum.intValue() == 0) {
saltBucketNum = null; // Provides a way for an index to not be salted if its data table is salted
}
addSaltColumn = (saltBucketNum != null);
}
// Can't set MULTI_TENANT or DEFAULT_COLUMN_FAMILY_NAME on an INDEX or a non mapped VIEW
if (tableType != PTableType.INDEX && (tableType != PTableType.VIEW || viewType == ViewType.MAPPED)) {
Boolean multiTenantProp = (Boolean) tableProps.get(PhoenixDatabaseMetaData.MULTI_TENANT);
multiTenant = Boolean.TRUE.equals(multiTenantProp);
defaultFamilyName = (String)TableProperty.DEFAULT_COLUMN_FAMILY.getValue(tableProps);
}
boolean disableWAL = false;
Boolean disableWALProp = (Boolean) TableProperty.DISABLE_WAL.getValue(tableProps);
if (disableWALProp != null) {
disableWAL = disableWALProp;
}
long updateCacheFrequency = (Long) ConnectionProperty.UPDATE_CACHE_FREQUENCY.getValue(
connection.getQueryServices().getProps().get(
QueryServices.DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB));
if (tableType == PTableType.INDEX && parent != null) {
updateCacheFrequency = parent.getUpdateCacheFrequency();
}
Long updateCacheFrequencyProp = (Long) TableProperty.UPDATE_CACHE_FREQUENCY.getValue(tableProps);
if (tableType != PTableType.INDEX && updateCacheFrequencyProp != null) {
updateCacheFrequency = updateCacheFrequencyProp;
}
String physicalTableName = (String) TableProperty.PHYSICAL_TABLE_NAME.getValue(tableProps);
String autoPartitionSeq = (String) TableProperty.AUTO_PARTITION_SEQ.getValue(tableProps);
Long guidePostsWidth = (Long) TableProperty.GUIDE_POSTS_WIDTH.getValue(tableProps);
// We only allow setting guide post width for a base table
if (guidePostsWidth != null && tableType != PTableType.TABLE) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_GUIDE_POST_WIDTH)
.setSchemaName(schemaName).setTableName(tableName).build().buildException();
}
Boolean storeNullsProp = (Boolean) TableProperty.STORE_NULLS.getValue(tableProps);
if (storeNullsProp == null) {
if (parent == null) {
storeNulls = connection.getQueryServices().getProps().getBoolean(
QueryServices.DEFAULT_STORE_NULLS_ATTRIB,
QueryServicesOptions.DEFAULT_STORE_NULLS);
tableProps.put(PhoenixDatabaseMetaData.STORE_NULLS, Boolean.valueOf(storeNulls));
}
} else {
storeNulls = storeNullsProp;
}
Boolean transactionalProp = (Boolean) TableProperty.TRANSACTIONAL.getValue(tableProps);
TransactionFactory.Provider transactionProviderProp = (TransactionFactory.Provider) TableProperty.TRANSACTION_PROVIDER.getValue(tableProps);
if ((transactionalProp != null || transactionProviderProp != null) && parent != null) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.ONLY_TABLE_MAY_BE_DECLARED_TRANSACTIONAL)
.setSchemaName(schemaName).setTableName(tableName)
.build().buildException();
}
if (parent == null) {
boolean transactional;
if (transactionProviderProp != null) {
transactional = true;
} else if (transactionalProp == null) {
transactional = connection.getQueryServices().getProps().getBoolean(
QueryServices.DEFAULT_TABLE_ISTRANSACTIONAL_ATTRIB,
QueryServicesOptions.DEFAULT_TABLE_ISTRANSACTIONAL);
} else {
transactional = transactionalProp;
}
if (transactional) {
if (transactionProviderProp == null) {
transactionProvider = (TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(
connection.getQueryServices().getProps().get(
QueryServices.DEFAULT_TRANSACTION_PROVIDER_ATTRIB,
QueryServicesOptions.DEFAULT_TRANSACTION_PROVIDER));
} else {
transactionProvider = transactionProviderProp;
}
}
}
boolean transactionsEnabled = connection.getQueryServices().getProps().getBoolean(
QueryServices.TRANSACTIONS_ENABLED,
QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
// can't create a transactional table if transactions are not enabled
if (!transactionsEnabled && transactionProvider != null) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_IF_TXNS_DISABLED)
.setSchemaName(schemaName).setTableName(tableName)
.build().buildException();
}
// can't create a transactional table if it has a row timestamp column
if (pkConstraint.getNumColumnsWithRowTimestamp() > 0 && transactionProvider != null) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP)
.setSchemaName(schemaName).setTableName(tableName)
.build().buildException();
}
if ((isPhoenixTTLEnabled() ? ttlProp != null
: TableProperty.TTL.getValue(commonFamilyProps) != null)
&& transactionProvider != null
&& transactionProvider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.SET_TTL)) {
throw new SQLExceptionInfo.Builder(PhoenixTransactionProvider.Feature.SET_TTL.getCode())
.setMessage(transactionProvider.name())
.setSchemaName(schemaName)
.setTableName(tableName)
.build()
.buildException();
}
// Put potentially inferred value into tableProps as it's used by the createTable call below
// to determine which coprocessors to install on the new table.
tableProps.put(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER, transactionProvider);
if (transactionProvider != null) {
// If TTL set, use transaction context TTL property name instead
// Note: After PHOENIX-6627, is PhoenixTransactionContext.PROPERTY_TTL still useful?
Object transactionTTL = commonFamilyProps.remove(ColumnFamilyDescriptorBuilder.TTL);
if (transactionTTL != null) {
commonFamilyProps.put(PhoenixTransactionContext.PROPERTY_TTL, transactionTTL);
}
}
Boolean useStatsForParallelizationProp =
(Boolean) TableProperty.USE_STATS_FOR_PARALLELIZATION.getValue(tableProps);
boolean sharedTable = statement.getTableType() == PTableType.VIEW || allocateIndexId;
if (transactionProvider != null) {
// We turn on storeNulls for transactional tables for compatibility. This was required
// when Tephra was a supported txn engine option. After PHOENIX-6627, this may no longer
// be necessary.
// Tephra would have converted normal delete markers on the server which could mess up
// our secondary index code as the changes get committed prior to the
// maintenance code being able to see the prior state to update the rows correctly.
// A future tnx engine might do the same?
if (Boolean.FALSE.equals(storeNullsProp)) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.STORE_NULLS_MUST_BE_TRUE_FOR_TRANSACTIONAL)
.setSchemaName(schemaName).setTableName(tableName)
.build().buildException();
}
storeNulls = true;
tableProps.put(PhoenixDatabaseMetaData.STORE_NULLS, Boolean.TRUE);
if (!sharedTable) {
Integer maxVersionsProp = (Integer) commonFamilyProps.get(HConstants.VERSIONS);
if (maxVersionsProp == null) {
if (parent != null) {
TableDescriptor desc = connection.getQueryServices().getTableDescriptor(parent.getPhysicalName().getBytes());
if (desc != null) {
maxVersionsProp = desc.getColumnFamily(SchemaUtil.getEmptyColumnFamily(parent)).getMaxVersions();
}
}
if (maxVersionsProp == null) {
maxVersionsProp = connection.getQueryServices().getProps().getInt(
QueryServices.MAX_VERSIONS_TRANSACTIONAL_ATTRIB,
QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL);
}
commonFamilyProps.put(HConstants.VERSIONS, maxVersionsProp);
}
}
}
timestamp = timestamp==null ? TransactionUtil.getTableTimestamp(connection, transactionProvider != null, transactionProvider) : timestamp;
// Delay this check as it is supported to have IMMUTABLE_ROWS and SALT_BUCKETS defined on views
if (sharedTable) {
if (tableProps.get(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME) != null) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.DEFAULT_COLUMN_FAMILY_ON_SHARED_TABLE)
.setSchemaName(schemaName).setTableName(tableName)
.build().buildException();
}
if (SchemaUtil.hasHTableDescriptorProps(tableProps)) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.VIEW_WITH_PROPERTIES).build()
.buildException();
}
}
List<ColumnDef> colDefs = statement.getColumnDefs();
LinkedHashMap<PColumn,PColumn> columns;
LinkedHashSet<PColumn> pkColumns;
if (tenantId != null && !sharedTable) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_TENANT_SPECIFIC_TABLE)
.setSchemaName(schemaName).setTableName(tableName).build().buildException();
}
if (autoPartitionSeq!=null) {
int autoPartitionColIndex = multiTenant ? 1 : 0;
PDataType dataType = colDefs.get(autoPartitionColIndex).getDataType();
if (!PLong.INSTANCE.isCastableTo(dataType)) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.SEQUENCE_NOT_CASTABLE_TO_AUTO_PARTITION_ID_COLUMN)
.setSchemaName(schemaName).setTableName(tableName).build().buildException();
}
}
if (tableType == PTableType.VIEW) {
physicalNames = Collections.singletonList(PNameFactory.newName(parent.getPhysicalName().getString()));
if (viewType == ViewType.MAPPED) {
columns = Maps.newLinkedHashMap();
pkColumns = newLinkedHashSetWithExpectedSize(colDefs.size());
} else {
// Propagate property values to VIEW.
// TODO: formalize the known set of these properties
// Manually transfer the ROW_KEY_ORDER_OPTIMIZABLE_BYTES from parent as we don't
// want to add this hacky flag to the schema (see PHOENIX-2067).
rowKeyOrderOptimizable = parent.rowKeyOrderOptimizable();
if (rowKeyOrderOptimizable) {
UpgradeUtil.addRowKeyOrderOptimizableCell(tableMetaData, SchemaUtil.getTableKey(tenantIdStr, schemaName, tableName), clientTimeStamp);
}
multiTenant = parent.isMultiTenant();
saltBucketNum = parent.getBucketNum();
isAppendOnlySchema = parent.isAppendOnlySchema();
isImmutableRows = parent.isImmutableRows();
if (updateCacheFrequencyProp == null) {
// set to the parent value if the property is not set on the view
updateCacheFrequency = parent.getUpdateCacheFrequency();
}
disableWAL = (disableWALProp == null ? parent.isWALDisabled() : disableWALProp);
defaultFamilyName = parent.getDefaultFamilyName() == null ? null : parent.getDefaultFamilyName().getString();
// TODO PHOENIX-4766 Add an options to stop sending parent metadata when creating views
List<PColumn> allColumns = parent.getColumns();
if (saltBucketNum != null) { // Don't include salt column in columns, as it should not have it when created
allColumns = allColumns.subList(1, allColumns.size());
}
columns = new LinkedHashMap<PColumn,PColumn>(allColumns.size() + colDefs.size());
for (PColumn column : allColumns) {
columns.put(column, column);
}
pkColumns = newLinkedHashSet(parent.getPKColumns());
// Add row linking view to its parent
try (PreparedStatement linkStatement = connection.prepareStatement(CREATE_VIEW_LINK)) {
linkStatement.setString(1, tenantIdStr);
linkStatement.setString(2, schemaName);
linkStatement.setString(3, tableName);
linkStatement.setString(4, parent.getName().getString());
linkStatement.setByte(5, LinkType.PARENT_TABLE.getSerializedValue());
linkStatement.setString(6, parent.getTenantId() == null ? null : parent.getTenantId().getString());
linkStatement.execute();
}
// Add row linking parent to view
// TODO From 4.16 write the child links to SYSTEM.CHILD_LINK directly
try (PreparedStatement linkStatement = connection.prepareStatement(CREATE_CHILD_LINK)) {
linkStatement.setString(1, parent.getTenantId() == null ? null : parent.getTenantId().getString());
linkStatement.setString(2, parent.getSchemaName() == null ? null : parent.getSchemaName().getString());
linkStatement.setString(3, parent.getTableName().getString());
linkStatement.setString(4, tenantIdStr);
linkStatement.setString(5, SchemaUtil.getTableName(schemaName, tableName));
linkStatement.setByte(6, LinkType.CHILD_TABLE.getSerializedValue());
linkStatement.execute();
}
}
} else {
columns = new LinkedHashMap<PColumn,PColumn>(colDefs.size());
pkColumns = newLinkedHashSetWithExpectedSize(colDefs.size() + 1); // in case salted
}
if (tableType == PTableType.CDC) {
if (parent.getType() == VIEW) {
physicalNames = Collections.singletonList(
PNameFactory.newName(MetaDataUtil.getViewIndexPhysicalName(
parent.getBaseTableLogicalName(), isNamespaceMapped)));
}
else {
physicalNames = Collections.singletonList(
PNameFactory.newName(SchemaUtil.getTableName(schemaName,
CDCUtil.getCDCIndexName(tableName))));
}
}
// Don't add link for mapped view, as it just points back to itself and causes the drop to
// fail because it looks like there's always a view associated with it.
if (!physicalNames.isEmpty()) {
// Upsert physical name for mapped view only if the full physical table name is different than the full table name
// Otherwise, we end up with a self-referencing link and then cannot ever drop the view.
if (viewType != ViewType.MAPPED
|| (!physicalNames.get(0).getString().equals(SchemaUtil.getTableName(schemaName, tableName))
&& !physicalNames.get(0).getString().equals(SchemaUtil.getPhysicalHBaseTableName(
schemaName, tableName, isNamespaceMapped).getString()))) {
// Add row linking from data table row to physical table row
try (PreparedStatement linkStatement = connection.prepareStatement(CREATE_LINK)) {
for (PName physicalName : physicalNames) {
linkStatement.setString(1, tenantIdStr);
linkStatement.setString(2, schemaName);
linkStatement.setString(3, tableName);
linkStatement.setString(4, physicalName.getString());
linkStatement.setByte(5, LinkType.PHYSICAL_TABLE.getSerializedValue());
if (tableType == PTableType.VIEW) {
if (parent.getType() == PTableType.TABLE) {
linkStatement.setString(4, SchemaUtil.getTableName(parent.getSchemaName().getString(), parent.getTableName().getString()));
linkStatement.setLong(6, parent.getSequenceNumber());
} else { //This is a grandchild view, find the physical base table
PTable logicalTable = connection.getTable(new PTableKey(null, SchemaUtil.replaceNamespaceSeparator(physicalName)));
linkStatement.setString(4, SchemaUtil.getTableName(logicalTable.getSchemaName().getString(), logicalTable.getTableName().getString()));
linkStatement.setLong(6, logicalTable.getSequenceNumber());
}
// Set link to logical name
linkStatement.setString(7, null);
} else {
linkStatement.setLong(6, parent.getSequenceNumber());
linkStatement.setString(7, PTableType.INDEX.getSerializedValue());
}
linkStatement.execute();
}
}
tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
connection.rollback();
}
}
Map<String, PName> familyNames = Maps.newLinkedHashMap();
boolean rowTimeStampColumnAlreadyFound = false;
int positionOffset = columns.size();
if (saltBucketNum != null) {
positionOffset++;
if (addSaltColumn) {
pkColumns.add(SaltingUtil.SALTING_COLUMN);
}
}
int pkPositionOffset = pkColumns.size();
int position = positionOffset;
EncodedCQCounter cqCounter = NULL_COUNTER;
Map<String, Integer> changedCqCounters = new HashMap<>(colDefs.size());
// Check for duplicate column qualifiers
Map<String, Set<Integer>> inputCqCounters = new HashMap<>();
PTable viewPhysicalTable = null;
if (tableType == PTableType.VIEW) {
/*
* We can't control what column qualifiers are used in HTable mapped to Phoenix views. So we are not
* able to encode column names.
*/
if (viewType != MAPPED) {
/*
* For regular phoenix views, use the storage scheme of the physical table since they all share the
* the same HTable. Views always use the base table's column qualifier counter for doling out
* encoded column qualifier.
*/
viewPhysicalTable = connection.getTable(physicalNames.get(0).getString());
immutableStorageScheme = viewPhysicalTable.getImmutableStorageScheme();
encodingScheme = viewPhysicalTable.getEncodingScheme();
if (EncodedColumnsUtil.usesEncodedColumnNames(viewPhysicalTable)) {
cqCounter = viewPhysicalTable.getEncodedCQCounter();
}
}
}
// System tables have hard-coded column qualifiers. So we can't use column encoding for them.
else if (!SchemaUtil.isSystemTable(Bytes.toBytes(SchemaUtil.getTableName(schemaName, tableName)))|| SchemaUtil.isLogTable(schemaName, tableName)) {
/*
* Indexes inherit the storage scheme of the parent data tables. Otherwise, we always attempt to
* create tables with encoded column names.
*
* Also of note is the case with shared indexes i.e. local indexes and view indexes. In these cases,
* column qualifiers for covered columns don't have to be unique because rows of the logical indexes are
* partitioned by the virtue of indexId present in the row key. As such, different shared indexes can use
* potentially overlapping column qualifiers.
*
*/
if (parent != null) {
Byte encodingSchemeSerializedByte = (Byte) TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
// Table has encoding scheme defined
if (encodingSchemeSerializedByte != null) {
encodingScheme = getEncodingScheme(tableProps, schemaName, tableName, transactionProvider);
} else {
encodingScheme = parent.getEncodingScheme();
}
ImmutableStorageScheme immutableStorageSchemeProp = (ImmutableStorageScheme) TableProperty.IMMUTABLE_STORAGE_SCHEME.getValue(tableProps);
if (immutableStorageSchemeProp == null) {
immutableStorageScheme = parent.getImmutableStorageScheme();
} else {
checkImmutableStorageSchemeForIndex(immutableStorageSchemeProp, schemaName, tableName, transactionProvider);
immutableStorageScheme = immutableStorageSchemeProp;
}
if (immutableStorageScheme == SINGLE_CELL_ARRAY_WITH_OFFSETS) {
if (encodingScheme == NON_ENCODED_QUALIFIERS) {
if (encodingSchemeSerializedByte != null) {
// encoding scheme is set as non-encoded on purpose, so we should fail
throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_AND_COLUMN_QUALIFIER_BYTES)
.setSchemaName(schemaName).setTableName(tableName).build().buildException();
} else {
// encoding scheme is inherited from parent but it is not compatible with Single Cell.
encodingScheme =
QualifierEncodingScheme.fromSerializedValue(
(byte) QueryServicesOptions.DEFAULT_COLUMN_ENCODED_BYTES);
}
}
}
if (tableType != CDC &&
parent.getImmutableStorageScheme() == SINGLE_CELL_ARRAY_WITH_OFFSETS &&
immutableStorageScheme == ONE_CELL_PER_COLUMN) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_CHANGE)
.setSchemaName(schemaName).setTableName(tableName).build()
.buildException();
}
LOGGER.info(String.format("STORAGE--ENCODING: %s--%s", immutableStorageScheme, encodingScheme));
} else {
encodingScheme = getEncodingScheme(tableProps, schemaName, tableName, transactionProvider);
ImmutableStorageScheme immutableStorageSchemeProp =
(ImmutableStorageScheme) TableProperty.IMMUTABLE_STORAGE_SCHEME
.getValue(tableProps);
if (immutableStorageSchemeProp == null) {
// Ignore default if transactional and column encoding is not supported
if (transactionProvider == null ||
!transactionProvider.getTransactionProvider().isUnsupported(
PhoenixTransactionProvider.Feature.COLUMN_ENCODING)) {
if (multiTenant) {
immutableStorageScheme =
ImmutableStorageScheme
.valueOf(connection
.getQueryServices()
.getProps()
.get(
QueryServices.DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
QueryServicesOptions.DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME));
} else {
if (isImmutableRows) {
immutableStorageScheme =
ImmutableStorageScheme
.valueOf(connection
.getQueryServices()
.getProps()
.get(
QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
QueryServicesOptions.DEFAULT_IMMUTABLE_STORAGE_SCHEME));
} else {
immutableStorageScheme = ONE_CELL_PER_COLUMN;
}
}
}
} else {
immutableStorageScheme = isImmutableRows ? immutableStorageSchemeProp : ONE_CELL_PER_COLUMN;
checkImmutableStorageSchemeForIndex(immutableStorageScheme, schemaName, tableName, transactionProvider);
}
if (immutableStorageScheme != ONE_CELL_PER_COLUMN
&& encodingScheme == NON_ENCODED_QUALIFIERS) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_AND_COLUMN_QUALIFIER_BYTES)
.setSchemaName(schemaName).setTableName(tableName).build()
.buildException();
}
}
cqCounter = encodingScheme != NON_ENCODED_QUALIFIERS ? new EncodedCQCounter() : NULL_COUNTER;
if (encodingScheme != NON_ENCODED_QUALIFIERS && statement.getFamilyCQCounters() != null)
{
for (Map.Entry<String, Integer> cq : statement.getFamilyCQCounters().entrySet()) {
if (cq.getValue() < QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_CQ)
.setSchemaName(schemaName)
.setTableName(tableName).build().buildException();
}
cqCounter.setValue(cq.getKey(), cq.getValue());
changedCqCounters.put(cq.getKey(), cqCounter.getNextQualifier(cq.getKey()));
inputCqCounters.putIfAbsent(cq.getKey(), new HashSet<Integer>());
}
}
}
boolean wasPKDefined = false;
// Keep track of all columns that are newly added to a view
Set<Integer> viewNewColumnPositions =
Sets.newHashSetWithExpectedSize(colDefs.size());
Set<String> pkColumnNames = new HashSet<>();
for (PColumn pColumn : pkColumns) {
pkColumnNames.add(pColumn.getName().toString());
}
for (ColumnDef colDef : colDefs) {
rowTimeStampColumnAlreadyFound = checkAndValidateRowTimestampCol(colDef, pkConstraint, rowTimeStampColumnAlreadyFound, tableType);
if (colDef.isPK()) { // i.e. the column is declared as CREATE TABLE COLNAME DATATYPE PRIMARY KEY...
if (wasPKDefined) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_ALREADY_EXISTS)
.setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
}
wasPKDefined = true;
} else {
// do not allow setting NOT-NULL constraint on non-primary columns.
if ( !colDef.isNull() && !isImmutableRows &&
( wasPKDefined || !SchemaUtil.isPKColumn(pkConstraint, colDef))) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.KEY_VALUE_NOT_NULL)
.setSchemaName(schemaName)
.setTableName(tableName)
.setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
}
}
ColumnName columnDefName = colDef.getColumnDefName();
String colDefFamily = columnDefName.getFamilyName();
boolean isPkColumn = SchemaUtil.isPKColumn(pkConstraint, colDef);
String cqCounterFamily = null;
if (!isPkColumn) {
if (immutableStorageScheme == SINGLE_CELL_ARRAY_WITH_OFFSETS && encodingScheme != NON_ENCODED_QUALIFIERS) {
// For this scheme we track column qualifier counters at the column family level.
cqCounterFamily = colDefFamily != null ? colDefFamily : (defaultFamilyName != null ? defaultFamilyName : DEFAULT_COLUMN_FAMILY);
} else {
// For other schemes, column qualifier counters are tracked using the default column family.
cqCounterFamily = defaultFamilyName != null ? defaultFamilyName : DEFAULT_COLUMN_FAMILY;
}
}
// Use position as column qualifier if APPEND_ONLY_SCHEMA to prevent gaps in
// the column encoding (PHOENIX-4737).
Integer encodedCQ = null;
if (!isPkColumn) {
if (colDef.getEncodedQualifier() != null && encodingScheme != NON_ENCODED_QUALIFIERS) {
if (cqCounter.getNextQualifier(cqCounterFamily) > ENCODED_CQ_COUNTER_INITIAL_VALUE &&
!inputCqCounters.containsKey(cqCounterFamily)) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.MISSING_CQ)
.setSchemaName(schemaName)
.setTableName(tableName).build().buildException();
}
if (statement.getFamilyCQCounters() == null ||
statement.getFamilyCQCounters().get(cqCounterFamily) == null) {
if (colDef.getEncodedQualifier() >= cqCounter.getNextQualifier(cqCounterFamily)) {
cqCounter.setValue(cqCounterFamily, colDef.getEncodedQualifier());
cqCounter.increment(cqCounterFamily);
}
changedCqCounters.put(cqCounterFamily, cqCounter.getNextQualifier(cqCounterFamily));
}
encodedCQ = colDef.getEncodedQualifier();
if (encodedCQ < QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE ||
encodedCQ >= cqCounter.getNextQualifier(cqCounterFamily)) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_CQ)
.setSchemaName(schemaName)
.setTableName(tableName).build().buildException();
}
inputCqCounters.putIfAbsent(cqCounterFamily, new HashSet<Integer>());
Set<Integer> familyCounters = inputCqCounters.get(cqCounterFamily);
if (!familyCounters.add(encodedCQ)) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.DUPLICATE_CQ)
.setSchemaName(schemaName)
.setTableName(tableName).build().buildException();
}
} else {
if (inputCqCounters.containsKey(cqCounterFamily)) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.MISSING_CQ)
.setSchemaName(schemaName)
.setTableName(tableName).build().buildException();
}
if (isAppendOnlySchema) {
encodedCQ = Integer.valueOf(ENCODED_CQ_COUNTER_INITIAL_VALUE + position);
} else {
encodedCQ = cqCounter.getNextQualifier(cqCounterFamily);
}
}
}
byte[] columnQualifierBytes = null;
try {
columnQualifierBytes = EncodedColumnsUtil.getColumnQualifierBytes(columnDefName.getColumnName(), encodedCQ, encodingScheme, isPkColumn);
}
catch (QualifierOutOfRangeException e) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_COLUMNS_EXCEEDED)
.setSchemaName(schemaName)
.setTableName(tableName).build().buildException();
}
PColumn column = newColumn(position++, colDef, pkConstraint, defaultFamilyName, false, columnQualifierBytes, isImmutableRows);
if (!isAppendOnlySchema && colDef.getEncodedQualifier() == null
&& cqCounter.increment(cqCounterFamily)) {
changedCqCounters.put(cqCounterFamily, cqCounter.getNextQualifier(cqCounterFamily));
}
if (SchemaUtil.isPKColumn(column)) {
// TODO: remove this constraint?
if (pkColumnsIterator.hasNext() && !column.getName().getString().equals(pkColumnsIterator.next().getFirst().getColumnName())) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_OUT_OF_ORDER)
.setSchemaName(schemaName)
.setTableName(tableName)
.setColumnName(column.getName().getString())
.build().buildException();
}
if (tableType == PTableType.VIEW && viewType != ViewType.MAPPED) {
throwIfLastPKOfParentIsVariableLength(parent, schemaName, tableName, colDef);
}
if (!pkColumns.add(column)) {
throw new ColumnAlreadyExistsException(schemaName, tableName, column.getName().getString());
}
}
// check for duplicate column
if (isDuplicateColumn(columns, pkColumnNames, column)) {
throw new ColumnAlreadyExistsException(schemaName, tableName,
column.getName().getString());
} else if (tableType == VIEW) {
viewNewColumnPositions.add(column.getPosition());
}
if (isPkColumn) {
pkColumnNames.add(column.getName().toString());
}
if ((colDef.getDataType() == PVarbinary.INSTANCE || colDef.getDataType().isArrayType())
&& SchemaUtil.isPKColumn(column)
&& pkColumnsIterator.hasNext()) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.VARBINARY_IN_ROW_KEY)
.setSchemaName(schemaName)
.setTableName(tableName)
.setColumnName(column.getName().getString())
.build().buildException();
}
if (column.getFamilyName() != null) {
familyNames.put(
IndexUtil.getActualColumnFamilyName(column.getFamilyName().getString()),
column.getFamilyName());
}
}
// We need a PK definition for a TABLE or mapped VIEW
if (!wasPKDefined && pkColumnsNames.isEmpty() && tableType != PTableType.VIEW && viewType != ViewType.MAPPED) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_MISSING)
.setSchemaName(schemaName)
.setTableName(tableName)
.build().buildException();
}
if (!pkColumnsNames.isEmpty() && pkColumnsNames.size() != pkColumns.size() - pkPositionOffset) { // Then a column name in the primary key constraint wasn't resolved
Iterator<Pair<ColumnName,SortOrder>> pkColumnNamesIterator = pkColumnsNames.iterator();
while (pkColumnNamesIterator.hasNext()) {
ColumnName colName = pkColumnNamesIterator.next().getFirst();
ColumnDef colDef = findColumnDefOrNull(colDefs, colName);
if (colDef == null) {
throw new ColumnNotFoundException(schemaName, tableName, null, colName.getColumnName());
}
if (colDef.getColumnDefName().getFamilyName() != null) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_WITH_FAMILY_NAME)
.setSchemaName(schemaName)
.setTableName(tableName)
.setColumnName(colDef.getColumnDefName().getColumnName() )
.setFamilyName(colDef.getColumnDefName().getFamilyName())
.build().buildException();
}
}
// The above should actually find the specific one, but just in case...
throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_PRIMARY_KEY_CONSTRAINT)
.setSchemaName(schemaName)
.setTableName(tableName)
.build().buildException();
}
if (!statement.getProps().isEmpty()) {
for (String familyName : statement.getProps().keySet()) {
if (!familyName.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY)) {
if (familyNames.get(familyName) == null) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.PROPERTIES_FOR_FAMILY)
.setFamilyName(familyName).build().buildException();
} else if (statement.getTableType() == PTableType.VIEW) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.VIEW_WITH_PROPERTIES).build().buildException();
}
}
}
}
throwIfInsufficientColumns(schemaName, tableName, pkColumns, saltBucketNum!=null, multiTenant);
List<Pair<byte[],Map<String,Object>>> familyPropList = Lists.newArrayListWithExpectedSize(familyNames.size());
populateFamilyPropsList(familyNames, commonFamilyProps, statement, defaultFamilyName, isLocalIndex, familyPropList);
// Bootstrapping for our SYSTEM.TABLE that creates itself before it exists
if (SchemaUtil.isMetaTable(schemaName,tableName)) {
// TODO: what about stats for system catalog?
PName newSchemaName = PNameFactory.newName(schemaName);
// Column names and qualifiers and hardcoded for system tables.
PTable table = new PTableImpl.Builder()
.setType(tableType)
.setTimeStamp(MetaDataProtocol.MIN_TABLE_TIMESTAMP)
.setIndexDisableTimestamp(0L)
.setSequenceNumber(PTable.INITIAL_SEQ_NUM)
.setImmutableRows(isImmutableRows)
.setDisableWAL(Boolean.TRUE.equals(disableWAL))
.setMultiTenant(false)
.setStoreNulls(false)
.setViewIndexIdType(viewIndexIdType)
.setIndexType(indexType)
.setUpdateCacheFrequency(0)
.setNamespaceMapped(isNamespaceMapped)
.setAutoPartitionSeqName(autoPartitionSeq)
.setAppendOnlySchema(isAppendOnlySchema)
.setImmutableStorageScheme(ONE_CELL_PER_COLUMN)
.setQualifierEncodingScheme(NON_ENCODED_QUALIFIERS)
.setBaseColumnCount(QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT)
.setEncodedCQCounter(PTable.EncodedCQCounter.NULL_COUNTER)
.setUseStatsForParallelization(true)
.setExcludedColumns(ImmutableList.<PColumn>of())
.setTenantId(tenantId)
.setSchemaName(newSchemaName)
.setTableName(PNameFactory.newName(tableName))
.setPkName(PNameFactory.newName(QueryConstants.SYSTEM_TABLE_PK_NAME))
.setDefaultFamilyName(defaultFamilyName == null ? null :
PNameFactory.newName(defaultFamilyName))
.setRowKeyOrderOptimizable(true)
.setIndexes(Collections.<PTable>emptyList())
.setPhysicalNames(ImmutableList.<PName>of())
.setColumns(columns.values())
.setLastDDLTimestamp(0L)
.setIndexWhere(statement.getWhereClause() == null ? null
: statement.getWhereClause().toString())
.setRowKeyMatcher(rowKeyMatcher)
.setTTL(TTL_EXPRESSION_NOT_DEFINED)
.build();
connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP);
}
// Update column qualifier counters
if (EncodedColumnsUtil.usesEncodedColumnNames(encodingScheme)) {
// Store the encoded column counter for phoenix entities that have their own hbase
// tables i.e. base tables and indexes.
String schemaNameToUse = tableType == VIEW ? viewPhysicalTable.getSchemaName().getString() : schemaName;
String tableNameToUse = tableType == VIEW ? viewPhysicalTable.getTableName().getString() : tableName;
boolean sharedIndex = tableType == PTableType.INDEX && (indexType == IndexType.LOCAL || parent.getType() == PTableType.VIEW);
// For local indexes and indexes on views, pass on the the tenant id since all their meta-data rows have
// tenant ids in there.
String tenantIdToUse = connection.getTenantId() != null && sharedIndex ? connection.getTenantId().getString() : null;
// When a view adds its own columns, then we need to increase the sequence number of the base table
// too since we want clients to get the latest PTable of the base table.
for (Entry<String, Integer> entry : changedCqCounters.entrySet()) {
try (PreparedStatement linkStatement = connection.prepareStatement(UPDATE_ENCODED_COLUMN_COUNTER)) {
linkStatement.setString(1, tenantIdToUse);
linkStatement.setString(2, schemaNameToUse);
linkStatement.setString(3, tableNameToUse);
linkStatement.setString(4, entry.getKey());
linkStatement.setInt(5, entry.getValue());
linkStatement.execute();
}
}
if (tableType == VIEW && !changedCqCounters.isEmpty()) {
try (PreparedStatement incrementStatement = connection.prepareStatement(INCREMENT_SEQ_NUM)) {
incrementStatement.setString(1, null);
incrementStatement.setString(2, viewPhysicalTable.getSchemaName().getString());
incrementStatement.setString(3, viewPhysicalTable.getTableName().getString());
incrementStatement.setLong(4, viewPhysicalTable.getSequenceNumber() + 1);
incrementStatement.execute();
}
}
if (connection.getMutationState().toMutations(timestamp).hasNext()) {
tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
connection.rollback();
}
}
short nextKeySeq = 0;
List<Mutation> columnMetadata = Lists.newArrayListWithExpectedSize(columns.size());
boolean isRegularView = (tableType == PTableType.VIEW && viewType!=ViewType.MAPPED);
for (Map.Entry<PColumn, PColumn> entry : columns.entrySet()) {
PColumn column = entry.getValue();
final int columnPosition = column.getPosition();
// For client-side cache, we need to update the column
// set the autoPartition column attributes
if (parent != null && parent.getAutoPartitionSeqName() != null
&& parent.getPKColumns().get(MetaDataUtil.getAutoPartitionColIndex(parent)).equals(column)) {
entry.setValue(column = new DelegateColumn(column) {
@Override
public byte[] getViewConstant() {
// set to non-null value so that we will generate a Put that
// will be set correctly on the server
return QueryConstants.EMPTY_COLUMN_VALUE_BYTES;
}
@Override
public boolean isViewReferenced() {
return true;
}
});
} else if (isViewColumnReferenced != null) {
if (viewColumnConstants != null && columnPosition < viewColumnConstants.length) {
entry.setValue(column = new DelegateColumn(column) {
@Override
public byte[] getViewConstant() {
return viewColumnConstants[columnPosition];
}
@Override
public boolean isViewReferenced() {
return isViewColumnReferenced.get(columnPosition);
}
});
} else {
entry.setValue(column = new DelegateColumn(column) {
@Override
public boolean isViewReferenced() {
return isViewColumnReferenced.get(columnPosition);
}
});
}
// if the base table column is referenced in the view
// or if we are adding a new column during view creation
if (isViewColumnReferenced.get(columnPosition) ||
viewNewColumnPositions.contains(
columnPosition)) {
// acquire the mutex using the global physical table
// name to prevent this column from being dropped
// while the view is being created or to prevent
// a conflicting column from being added to a parent
// in case the view creation adds new columns
boolean acquiredMutex = writeCell(
null,
parentPhysicalSchemaName,
parentPhysicalTableName,
column.toString());
if (!acquiredMutex) {
throw new ConcurrentTableMutationException(
parentPhysicalSchemaName,
parentPhysicalTableName);
}
acquiredColumnMutexSet.add(column.toString());
}
}
Short keySeq = SchemaUtil.isPKColumn(column) ? ++nextKeySeq : null;
// Prior to PHOENIX-3534 we were sending the parent table column metadata while creating a
// child view, now that we combine columns by resolving the parent table hierarchy we
// don't need to include the parent table columns while creating a view
// If QueryServices.ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK is true we continue
// to store the parent table column metadata along with the child view metadata
// so that we can rollback the upgrade if required.
if (allowSystemCatalogRollback || !isRegularView
|| columnPosition >= baseTableColumnCount) {
addColumnMutation(connection, schemaName, tableName, column, parentTableName,
pkName, keySeq, saltBucketNum != null);
columnMetadata.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
connection.rollback();
}
}
// add the columns in reverse order since we reverse the list later
Collections.reverse(columnMetadata);
tableMetaData.addAll(columnMetadata);
String dataTableName = parent == null || tableType == PTableType.VIEW ? null : parent.getTableName().getString();
PIndexState defaultCreateState;
String defaultCreateStateString = connection.getClientInfo(INDEX_CREATE_DEFAULT_STATE);
if (defaultCreateStateString == null) {
defaultCreateStateString = connection.getQueryServices().getConfiguration().get(
INDEX_CREATE_DEFAULT_STATE, QueryServicesOptions.DEFAULT_CREATE_INDEX_STATE);
}
defaultCreateState = PIndexState.valueOf(defaultCreateStateString);
if (defaultCreateState == PIndexState.CREATE_DISABLE) {
if (indexType == IndexType.LOCAL || sharedTable) {
defaultCreateState = PIndexState.BUILDING;
}
}
PIndexState indexState = parent == null ||
(tableType == PTableType.VIEW || tableType == PTableType.CDC) ?
null : defaultCreateState;
if (indexState == null && tableProps.containsKey(INDEX_STATE)) {
indexState = PIndexState.fromSerializedValue(tableProps.get(INDEX_STATE).toString());
}
PreparedStatement tableUpsert = connection.prepareStatement(CREATE_TABLE);
tableUpsert.setString(1, tenantIdStr);
tableUpsert.setString(2, schemaName);
tableUpsert.setString(3, tableName);
tableUpsert.setString(4, tableType.getSerializedValue());
tableUpsert.setLong(5, PTable.INITIAL_SEQ_NUM);
tableUpsert.setInt(6, position);
if (saltBucketNum != null) {
tableUpsert.setInt(7, saltBucketNum);
} else {
tableUpsert.setNull(7, Types.INTEGER);
}
tableUpsert.setString(8, pkName);
tableUpsert.setString(9, dataTableName);
tableUpsert.setString(10, indexState == null ? null : indexState.getSerializedValue());
tableUpsert.setBoolean(11, isImmutableRows);
tableUpsert.setString(12, defaultFamilyName);
if (parent != null && parent.getAutoPartitionSeqName() != null && viewStatement==null) {
// set to non-null value so that we will generate a Put that
// will be set correctly on the server
tableUpsert.setString(13, QueryConstants.EMPTY_COLUMN_VALUE);
}
else {
tableUpsert.setString(13, viewStatement);
}
tableUpsert.setBoolean(14, disableWAL);
tableUpsert.setBoolean(15, multiTenant);
if (viewType == null) {
tableUpsert.setNull(16, Types.TINYINT);
} else {
tableUpsert.setByte(16, viewType.getSerializedValue());
}
if (indexType == null) {
tableUpsert.setNull(17, Types.TINYINT);
} else {
tableUpsert.setByte(17, indexType.getSerializedValue());
}
tableUpsert.setBoolean(18, storeNulls);
if (parent != null && tableType == PTableType.VIEW) {
tableUpsert.setInt(19, parent.getColumns().size());
} else {
tableUpsert.setInt(19, BASE_TABLE_BASE_COLUMN_COUNT);
}
if (transactionProvider == null) {
tableUpsert.setNull(20, Types.TINYINT);
} else {
tableUpsert.setByte(20, transactionProvider.getCode());
}
tableUpsert.setLong(21, updateCacheFrequency);
tableUpsert.setBoolean(22, isNamespaceMapped);
if (autoPartitionSeq == null) {
tableUpsert.setNull(23, Types.VARCHAR);
} else {
tableUpsert.setString(23, autoPartitionSeq);
}
tableUpsert.setBoolean(24, isAppendOnlySchema);
if (guidePostsWidth == null) {
tableUpsert.setNull(25, Types.BIGINT);
} else {
tableUpsert.setLong(25, guidePostsWidth);
}
tableUpsert.setByte(26, immutableStorageScheme.getSerializedMetadataValue());
tableUpsert.setByte(27, encodingScheme.getSerializedMetadataValue());
if (useStatsForParallelizationProp == null) {
tableUpsert.setNull(28, Types.BOOLEAN);
} else {
tableUpsert.setBoolean(28, useStatsForParallelizationProp);
}
if (indexType == IndexType.LOCAL ||
(parent != null && parent.getType() == PTableType.VIEW
&& tableType == PTableType.INDEX)) {
tableUpsert.setInt(29, viewIndexIdType.getSqlType());
} else {
tableUpsert.setNull(29, Types.NULL);
}
if (isChangeDetectionEnabledProp == null) {
tableUpsert.setNull(30, Types.BOOLEAN);
} else {
tableUpsert.setBoolean(30, isChangeDetectionEnabledProp);
}
if (physicalTableName == null){
tableUpsert.setNull(31, Types.VARCHAR);
} else {
tableUpsert.setString(31, physicalTableName);
}
if (schemaVersion == null) {
tableUpsert.setNull(32, Types.VARCHAR);
} else {
tableUpsert.setString(32, schemaVersion);
}
if (streamingTopicName == null) {
tableUpsert.setNull(33, Types.VARCHAR);
} else {
tableUpsert.setString(33, streamingTopicName);
}
if (tableType == INDEX && statement.getWhereClause() != null) {
tableUpsert.setString(34, statement.getWhereClause().toString());
} else {
tableUpsert.setNull(34, Types.VARCHAR);
}
if (cdcIncludeScopesStr == null) {
tableUpsert.setNull(35, Types.VARCHAR);
} else {
tableUpsert.setString(35, cdcIncludeScopesStr);
}
if (ttl == null || ttl.equals(TTL_EXPRESSION_NOT_DEFINED)) {
tableUpsert.setNull(36, Types.VARCHAR);
} else {
tableUpsert.setString(36, ttl.getTTLExpression());
}
if ((rowKeyMatcher == null) ||
Bytes.compareTo(rowKeyMatcher, HConstants.EMPTY_BYTE_ARRAY) == 0) {
tableUpsert.setNull(37, PDataType.VARBINARY_ENCODED_TYPE);
} else {
tableUpsert.setBytes(37, rowKeyMatcher);
}
tableUpsert.execute();
if (asyncCreatedDate != null) {
try (PreparedStatement setAsync = connection.prepareStatement(SET_ASYNC_CREATED_DATE)) {
setAsync.setString(1, tenantIdStr);
setAsync.setString(2, schemaName);
setAsync.setString(3, tableName);
setAsync.setDate(4, asyncCreatedDate);
setAsync.execute();
}
} else {
Date syncCreatedDate = new Date(EnvironmentEdgeManager.currentTimeMillis());
try (PreparedStatement setSync = connection.prepareStatement(SET_INDEX_SYNC_CREATED_DATE)) {
setSync.setString(1, tenantIdStr);
setSync.setString(2, schemaName);
setSync.setString(3, tableName);
setSync.setDate(4, syncCreatedDate);
setSync.execute();
}
}
tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
connection.rollback();
/*
* The table metadata must be in the following order:
* 1) table header row
* 2) ordered column rows
* 3) parent table header row
*/
Collections.reverse(tableMetaData);
if (indexType != IndexType.LOCAL) {
splits = SchemaUtil.processSplits(splits, pkColumns, saltBucketNum, connection.getQueryServices().getProps().getBoolean(
QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, QueryServicesOptions.DEFAULT_FORCE_ROW_KEY_ORDER));
}
// Modularized this code for unit testing
PName parentName = physicalNames !=null && physicalNames.size() > 0 ? physicalNames.get(0) : null;
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("createTable tableName=" + tableName + " parent=" + (parent == null ? "" : parent.getTableName() + "-" + parent.getPhysicalName()) + " parent physical=" + parentName + "-" + (physicalNames.size() > 0 ? physicalNames.get(0) : "null") + " viewType " + viewType + allocateIndexId);
}
MetaDataMutationResult result = connection.getQueryServices().createTable(tableMetaData
,viewType == ViewType.MAPPED || allocateIndexId ? physicalNames.get(0).getBytes()
: null, tableType, tableProps, familyPropList, splits, isNamespaceMapped,
allocateIndexId, UpgradeUtil.isNoUpgradeSet(connection.getClientInfo()), parent);
MutationCode code = result.getMutationCode();
try {
if (code != MutationCode.TABLE_NOT_FOUND) {
boolean tableAlreadyExists = handleCreateTableMutationCode(result, code, statement,
schemaName, tableName, parent);
if (tableAlreadyExists) {
return null;
}
}
// If the parent table of the view has the auto partition sequence name attribute,
// set the view statement and relevant partition column attributes correctly
if (parent != null && parent.getAutoPartitionSeqName() != null) {
final PColumn autoPartitionCol = parent.getPKColumns().get(MetaDataUtil
.getAutoPartitionColIndex(parent));
final Long autoPartitionNum = Long.valueOf(result.getAutoPartitionNum());
columns.put(autoPartitionCol, new DelegateColumn(autoPartitionCol) {
@Override
public byte[] getViewConstant() {
PDataType dataType = autoPartitionCol.getDataType();
Object val = dataType.toObject(autoPartitionNum, PLong.INSTANCE);
byte[] bytes = new byte[dataType.getByteSize() + 1];
dataType.toBytes(val, bytes, 0);
return bytes;
}
@Override
public boolean isViewReferenced() {
return true;
}
});
String viewPartitionClause = QueryUtil.getViewPartitionClause(MetaDataUtil
.getAutoPartitionColumnName(parent), autoPartitionNum);
if (viewStatement != null) {
viewStatement = viewStatement + " AND " + viewPartitionClause;
} else {
viewStatement = QueryUtil.getViewStatement(parent.getSchemaName().getString(),
parent.getTableName().getString(), viewPartitionClause);
}
}
PName newSchemaName = PNameFactory.newName(schemaName);
/*
* It doesn't hurt for the PTable of views to have the cqCounter. However, views always
* rely on the parent table's counter to dole out encoded column qualifiers. So setting
* the counter as NULL_COUNTER for extra safety.
*/
EncodedCQCounter cqCounterToBe = tableType == PTableType.VIEW ? NULL_COUNTER : cqCounter;
PTable table = new PTableImpl.Builder()
.setType(tableType)
.setState(indexState)
.setTimeStamp(timestamp != null ? timestamp : result.getMutationTime())
.setIndexDisableTimestamp(0L)
.setSequenceNumber(PTable.INITIAL_SEQ_NUM)
.setImmutableRows(isImmutableRows)
.setViewStatement(viewStatement)
.setDisableWAL(Boolean.TRUE.equals(disableWAL))
.setMultiTenant(multiTenant)
.setStoreNulls(storeNulls)
.setViewType(viewType)
.setViewIndexIdType(viewIndexIdType)
.setViewIndexId(result.getViewIndexId())
.setIndexType(indexType)
.setTransactionProvider(transactionProvider)
.setUpdateCacheFrequency(updateCacheFrequency)
.setNamespaceMapped(isNamespaceMapped)
.setAutoPartitionSeqName(autoPartitionSeq)
.setAppendOnlySchema(isAppendOnlySchema)
.setImmutableStorageScheme(immutableStorageScheme)
.setQualifierEncodingScheme(encodingScheme)
.setBaseColumnCount(baseTableColumnCount)
.setEncodedCQCounter(cqCounterToBe)
.setUseStatsForParallelization(useStatsForParallelizationProp)
.setExcludedColumns(ImmutableList.<PColumn>of())
.setTenantId(tenantId)
.setSchemaName(newSchemaName)
.setTableName(PNameFactory.newName(tableName))
.setPkName(pkName == null ? null : PNameFactory.newName(pkName))
.setDefaultFamilyName(defaultFamilyName == null ?
null : PNameFactory.newName(defaultFamilyName))
.setRowKeyOrderOptimizable(rowKeyOrderOptimizable)
.setBucketNum(saltBucketNum)
.setIndexes(Collections.<PTable>emptyList())
.setParentSchemaName((parent == null) ? null : parent.getSchemaName())
.setParentTableName((parent == null) ? null : parent.getTableName())
.setPhysicalNames(ImmutableList.copyOf(physicalNames))
.setColumns(columns.values())
.setViewModifiedUpdateCacheFrequency(tableType == PTableType.VIEW &&
parent != null &&
parent.getUpdateCacheFrequency() != updateCacheFrequency)
.setViewModifiedUseStatsForParallelization(tableType == PTableType.VIEW &&
parent != null &&
parent.useStatsForParallelization()
!= useStatsForParallelizationProp)
.setLastDDLTimestamp(result.getTable() != null ?
result.getTable().getLastDDLTimestamp() : null)
.setIsChangeDetectionEnabled(isChangeDetectionEnabledProp)
.setSchemaVersion(schemaVersion)
.setExternalSchemaId(result.getTable() != null ?
result.getTable().getExternalSchemaId() : null)
.setStreamingTopicName(streamingTopicName)
.setIndexWhere(statement.getWhereClause() == null ? null
: statement.getWhereClause().toString())
.setCDCIncludeScopes(cdcIncludeScopes)
.setTTL(ttl == null || ttl.equals(TTL_EXPRESSION_NOT_DEFINED) ?
TTLExpressionFactory.create(ttlFromHierarchy) :
TTLExpressionFactory.create(ttl))
.setRowKeyMatcher(rowKeyMatcher)
.build();
result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
addTableToCache(result, false);
return table;
} catch (Throwable e) {
TableMetricsManager.updateMetricsForSystemCatalogTableMethod(tableNameNode.toString(),
NUM_METADATA_LOOKUP_FAILURES, 1);
throw e;
}
} finally {
connection.setAutoCommit(wasAutoCommit);
deleteMutexCells(parentPhysicalSchemaName, parentPhysicalTableName,
acquiredColumnMutexSet);
}
}