in phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewTTLIT.java [2184:2522]
/**
 * Verifies view-level TTL masking and major compaction against a multi-tenant base table
 * whose leading tenant-id PK column ({@code OID}) has the supplied type and which is created
 * with three split points.
 *
 * <p>The test builds the cross product of four global views and four tenant views. A TTL of
 * {@code VIEW_TTL_10_SECS} is placed on the global view when its number is in
 * {@code hasGlobalTTLSet}; otherwise it is placed on the tenant view when the tenant number
 * is in {@code hasTenantTTLSet}. Two indexes are created on every global view and on every
 * tenant view, {@code DEFAULT_NUM_ROWS} rows are upserted per tenant view, and query results
 * are validated. Finally the row counts expected after major compaction are checked for the
 * base table and for its index table.
 *
 * @param tenantType Phoenix data type of the tenant-id column; the switch statements below
 *                   handle the VARCHAR, CHAR, INTEGER and BIGINT SQL types only
 * @throws Exception if schema creation, data loading or any validation step fails
 */
protected void testMajorCompactWithVariousTenantIdTypesAndRegions(PDataType tenantType) throws Exception {
    resetEnvironmentEdgeManager();
    boolean isIndex1Local = false;
    boolean isIndex2Local = false;
    // View TTL is set in seconds (for e.g 10 secs)
    int viewTTL = VIEW_TTL_10_SECS;
    String tenantTypeName = tenantType.getSqlTypeName();

    TableOptions tableOptions = TableOptions.withDefaults();
    tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true,DEFAULT_COLUMN_FAMILY='0'");
    tableOptions.setTablePKColumns(Arrays.asList("OID", "KP"));
    tableOptions.setTablePKColumnTypes(Arrays.asList(tenantTypeName, "CHAR(3)"));

    GlobalViewOptions globalViewOptions = GlobalViewOptions.withDefaults();

    TenantViewOptions tenantViewOptions = new TenantViewOptions();
    tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
    tenantViewOptions
            .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR"));

    DataOptions dataOptions = DataOptions.withDefaults();

    OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
    testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
    testCaseWhenAllCFMatchAndAllDefault
            .setTableCFs(Lists.newArrayList((String) null, null, null));
    testCaseWhenAllCFMatchAndAllDefault
            .setGlobalViewCFs(Lists.newArrayList((String) null, null, null));
    testCaseWhenAllCFMatchAndAllDefault
            .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null));

    final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
    schemaBuilder
            .withTableOptions(tableOptions)
            .withGlobalViewOptions(globalViewOptions)
            .withTenantViewOptions(tenantViewOptions)
            .withDataOptions(dataOptions)
            .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault);

    int tenantNum = dataOptions.getTenantNumber();

    // The base table is created by hand (rather than by the schema builder) so that the
    // split points can be bound with values of the tenant-id type under test.
    try (Connection globalConnection = DriverManager.getConnection(getUrl())) {
        String entityTableName = SchemaUtil.getTableName(
                dataOptions.getSchemaName(), dataOptions.getTableName());
        String CO_BASE_TBL_TEMPLATE =
                "CREATE TABLE IF NOT EXISTS %s " +
                "(OID %s NOT NULL,KP CHAR(3) NOT NULL, " +
                "COL1 VARCHAR, COL2 VARCHAR, COL3 VARCHAR " +
                "CONSTRAINT pk PRIMARY KEY (OID,KP)) " +
                "MULTI_TENANT=true,COLUMN_ENCODED_BYTES=0,DEFAULT_COLUMN_FAMILY='0' " +
                "SPLIT ON (?, ?, ?)";
        String createBaseTableSQL = String.format(CO_BASE_TBL_TEMPLATE,
                entityTableName, tenantTypeName);
        try (PreparedStatement pstmt = globalConnection.prepareStatement(createBaseTableSQL)) {
            // NOTE: there is no default branch; for any other SQL type the three
            // split-point parameters are left unbound.
            switch (tenantType.getSqlType()) {
            case Types.VARCHAR:
            case Types.CHAR:
                pstmt.setString(1, String.format("00D0t%04d", tenantNum + 3));
                pstmt.setString(2, String.format("00D0t%04d", tenantNum + 5));
                pstmt.setString(3, String.format("00D0t%04d", tenantNum + 7));
                break;
            case Types.INTEGER:
                pstmt.setInt(1, 300000);
                pstmt.setInt(2, 500000);
                pstmt.setInt(3, 700000);
                break;
            case Types.BIGINT:
                pstmt.setLong(1, 30000000000L);
                pstmt.setLong(2, 50000000000L);
                pstmt.setLong(3, 70000000000L);
                break;
            }
            pstmt.execute();
        }
        // Tell the builder the table already exists and hand it the resolved PTable.
        schemaBuilder.setTableCreated();
        PTableKey tableKey =
                new PTableKey(null, SchemaUtil.normalizeFullTableName(entityTableName));
        schemaBuilder.setBaseTable(
                globalConnection.unwrap(PhoenixConnection.class).getTable(tableKey));
    }

    OtherOptions otherOptions = OtherOptions.withDefaults();
    otherOptions.setTenantViewCFs(asList(null, null, null, null));

    Set<Integer> globalSet = new HashSet<>(Arrays.asList(1, 2, 3, 4));
    Set<Integer> hasGlobalTTLSet = new HashSet<>(Arrays.asList(2, 3));
    Set<Integer> tenantSet = new HashSet<>(Arrays.asList(1, 4, 6, 8));
    Set<Integer> hasTenantTTLSet = new HashSet<>(Arrays.asList(1, 6));
    int numGlobalIndex = 2;
    int numTenantIndex = 2;

    // Expected survivors of major compaction: only views with no TTL at either level.
    int nonCompactedGlobalSet = globalSet.size() - hasGlobalTTLSet.size(); // == 2;
    int nonCompactedTenantSet = (globalSet.size() * tenantSet.size()) // Total # views
            - (hasGlobalTTLSet.size() * tenantSet.size()) // # global views compacted
            - (hasTenantTTLSet.size() * nonCompactedGlobalSet); // #tenant views compacted
    int nonCompactedTableRows = nonCompactedTenantSet * DEFAULT_NUM_ROWS;
    int nonCompactedTenantIndexRows =
            nonCompactedTenantSet * DEFAULT_NUM_ROWS * numTenantIndex;
    int nonCompactedGlobalIndexRows =
            nonCompactedGlobalSet * DEFAULT_NUM_ROWS * numGlobalIndex * tenantSet.size();

    // Shared DDL shape for all four index definitions created per iteration.
    final String indexDdlTemplate = "CREATE %s INDEX IF NOT EXISTS "
            + "%s ON %s (%s) INCLUDE (%s)";

    String baseGlobalViewName = dataOptions.getGlobalViewName();
    long earliestTimestamp = EnvironmentEdgeManager.currentTimeMillis();

    for (int globalView : globalSet) {
        String globalViewName = String.format("%s_%d", baseGlobalViewName, globalView);
        boolean globalViewHasTTL = hasGlobalTTLSet.contains(globalView);

        for (int tenant : tenantSet) {
            // True when rows written through this tenant view are subject to a TTL,
            // whether it was declared on the global view or on the tenant view.
            boolean viewHasTTL = globalViewHasTTL || hasTenantTTLSet.contains(tenant);

            dataOptions.setGlobalViewName(globalViewName);
            dataOptions.setKeyPrefix(String.format("KP%d", globalView));
            dataOptions.setTenantViewName(String.format("Z%d%d", globalView, tenant));

            globalViewOptions.setTableProps("");
            tenantViewOptions.setTableProps("");
            // Set TTL only when hasGlobalTTLSet OR hasTenantTTLSet
            // View TTL is set to 10s => 10000 ms
            // The TTL is declared at exactly one level: the global view takes precedence.
            if (globalViewHasTTL) {
                globalViewOptions.setTableProps(String.format("TTL='%s'",
                        getTTLExpression(viewTTL)));
            } else if (hasTenantTTLSet.contains(tenant)) {
                tenantViewOptions.setTableProps(String.format("TTL='%s'",
                        getTTLExpression(viewTTL)));
            }

            if (schemaBuilder.getDataOptions() != null) {
                // build schema for tenant
                // ensure a new tenantid is generated for every iteration of the test
                switch (tenantType.getSqlType()) {
                case Types.VARCHAR:
                case Types.CHAR:
                    schemaBuilder.getDataOptions().setTenantId(dataOptions.getNextTenantId());
                    break;
                case Types.INTEGER:
                    schemaBuilder.getDataOptions().setTenantId(Integer.toString(
                            dataOptions.getNextTenantNumber() * 100000));
                    break;
                case Types.BIGINT:
                    schemaBuilder.getDataOptions().setTenantId(Long.toString(
                            dataOptions.getNextTenantNumber() * 10000000000L));
                    break;
                }
            }

            schemaBuilder
                    .withTableOptions(tableOptions)
                    .withGlobalViewOptions(globalViewOptions)
                    .withTenantViewOptions(tenantViewOptions)
                    .withDataOptions(dataOptions)
                    .withOtherOptions(otherOptions)
                    .buildWithNewTenant();

            // Two indexes on the global view (IF NOT EXISTS makes repeats across the
            // tenant loop harmless).
            try (Connection globalConn = DriverManager.getConnection(getUrl());
                    final Statement statement = globalConn.createStatement()) {
                String index1Name = String.format("IDX_%s_%s",
                        schemaBuilder.getEntityGlobalViewName().replaceAll("\\.", "_"),
                        "COL4");
                final String index1Str = String.format(indexDdlTemplate,
                        isIndex1Local ? "LOCAL" : "", index1Name,
                        schemaBuilder.getEntityGlobalViewName(), "COL4", "COL5");
                statement.execute(index1Str);

                String index2Name = String.format("IDX_%s_%s",
                        schemaBuilder.getEntityGlobalViewName().replaceAll("\\.", "_"),
                        "COL5");
                final String index2Str = String.format(indexDdlTemplate,
                        isIndex2Local ? "LOCAL" : "", index2Name,
                        schemaBuilder.getEntityGlobalViewName(), "COL5", "COL6");
                statement.execute(index2Str);
            }

            String tenantConnectUrl =
                    getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
                            schemaBuilder.getDataOptions().getTenantId();

            // Two indexes on the tenant view, created over a tenant-specific connection.
            try (Connection tenantConn = DriverManager.getConnection(tenantConnectUrl);
                    final Statement statement = tenantConn.createStatement()) {
                String index1Name = String.format("IDX_%s_%s",
                        schemaBuilder.getEntityTenantViewName().replaceAll("\\.", "_"),
                        "COL9");
                final String index1Str = String.format(indexDdlTemplate,
                        isIndex1Local ? "LOCAL" : "", index1Name,
                        schemaBuilder.getEntityTenantViewName(), "COL9", "COL8");
                statement.execute(index1Str);

                String index2Name = String.format("IDX_%s_%s",
                        schemaBuilder.getEntityTenantViewName().replaceAll("\\.", "_"),
                        "COL7");
                final String index2Str = String.format(indexDdlTemplate,
                        isIndex2Local ? "LOCAL" : "", index2Name,
                        schemaBuilder.getEntityTenantViewName(), "COL7", "COL8");
                statement.execute(index2Str);
            }

            // Define the test data.
            DataSupplier dataSupplier = new DataSupplier() {
                // One generator per supplier instead of one per generated row.
                private final Random rnd = new Random();

                @Override public List<Object> getValues(int rowIndex) {
                    String id = String.format(ID_FMT, rowIndex);
                    String zid = String.format(ZID_FMT, rowIndex);
                    String col1 = String.format(COL1_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
                    String col2 = String.format(COL2_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
                    String col3 = String.format(COL3_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
                    String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
                    String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
                    String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
                    String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
                    String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
                    String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
                    return Lists.newArrayList(
                            new Object[] { id, col1, col2, col3, col4, col5, col6,
                                    zid, col7, col8, col9 });
                }
            };

            // Create a test data reader/writer for the above schema.
            DataWriter dataWriter = new BasicDataWriter();
            DataReader dataReader = new BasicDataReader();

            List<String> columns =
                    Lists.newArrayList("ID",
                            "COL1", "COL2", "COL3", "COL4", "COL5",
                            "COL6", "ZID", "COL7", "COL8", "COL9");
            List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID");

            try (Connection writeConnection = DriverManager
                    .getConnection(tenantConnectUrl)) {
                writeConnection.setAutoCommit(true);
                dataWriter.setConnection(writeConnection);
                dataWriter.setDataSupplier(dataSupplier);
                dataWriter.setUpsertColumns(columns);
                dataWriter.setRowKeyColumns(rowKeyColumns);
                dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
                org.apache.phoenix.thirdparty.com.google.common.collect.Table<String, String, Object>
                        upsertedData =
                        upsertData(dataWriter, DEFAULT_NUM_ROWS);

                // Case : full-row select, compared against the upserted data
                dataReader.setValidationColumns(columns);
                dataReader.setRowKeyColumns(rowKeyColumns);
                dataReader.setDML(String
                        .format("SELECT %s from %s", Joiner.on(",").join(columns),
                                schemaBuilder.getEntityTenantViewName()));
                dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
                long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis();
                if (viewHasTTL) {
                    LOGGER.debug("Validating {}, {}, {}", schemaBuilder.getDataOptions().getTenantId(), globalView, tenant);
                    // Validate data before and after ttl expiration.
                    validateExpiredRowsAreNotReturnedUsingData(viewTTL, upsertedData,
                            dataReader, schemaBuilder);
                } else {
                    validateRowsAreNotMaskedUsingCounts(
                            scnTimestamp,
                            dataReader,
                            schemaBuilder);
                }

                // Case : count(1) sql (uses index)
                dataReader.setValidationColumns(Arrays.asList("num_rows"));
                dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
                dataReader.setDML(String
                        .format("SELECT count(1) as num_rows from %s HAVING count(1) > 0",
                                schemaBuilder.getEntityTenantViewName()));
                dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
                // Validate data before and after ttl expiration.
                if (viewHasTTL) {
                    LOGGER.debug("Validating {}, {}, {}", schemaBuilder.getDataOptions().getTenantId(), globalView, tenant);
                    validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader, schemaBuilder);
                } else {
                    validateRowsAreNotMaskedUsingCounts(scnTimestamp, dataReader, schemaBuilder);
                }

                // Case : group by sql (does not use index)
                dataReader.setValidationColumns(Arrays.asList("num_rows"));
                dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
                dataReader.setDML(String
                        .format("SELECT count(1) as num_rows from %s GROUP BY ID HAVING count(1) > 0",
                                schemaBuilder.getEntityTenantViewName()));
                dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
                // Validate data before and after ttl expiration.
                if (viewHasTTL) {
                    LOGGER.debug("Validating {}, {}, {}", schemaBuilder.getDataOptions().getTenantId(), globalView, tenant);
                    validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader, schemaBuilder);
                } else {
                    validateRowsAreNotMaskedUsingCounts(scnTimestamp, dataReader, schemaBuilder);
                }
            }
        }
    }

    PTable table = schemaBuilder.getBaseTable();
    // validate multi-tenanted base table
    validateAfterMajorCompaction(
            table.getSchemaName().toString(),
            table.getTableName().toString(),
            false,
            earliestTimestamp,
            VIEW_TTL_10_SECS,
            false,
            nonCompactedTableRows
    );
    // validate multi-tenanted index table
    validateAfterMajorCompaction(
            table.getSchemaName().toString(),
            table.getTableName().toString(),
            true,
            earliestTimestamp,
            VIEW_TTL_10_SECS,
            false,
            (nonCompactedTenantIndexRows + nonCompactedGlobalIndexRows)
    );
}