in phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java [351:560]
/**
 * Compiles, optimizes and executes a query statement and returns a result set over the
 * resulting iterator. The work runs inside a {@code CallRunner} callback that is handed
 * {@code PhoenixContextExecutor.inContext()}.
 *
 * <p>Two failures trigger a recursive retry after force-updating the client metadata cache:
 * <ul>
 *   <li>{@code MetaDataEntityNotFoundException}: retried only when
 *       {@code doRetryOnMetaNotFoundError} is set, the exception carries a table name and
 *       the cache update reports that it was updated. The retry passes {@code false} for
 *       {@code doRetryOnMetaNotFoundError}.</li>
 *   <li>{@code StaleMetadataCacheException}: always retried, with last-DDL-timestamp
 *       validation switched off for the retry.</li>
 * </ul>
 * Each invocation, retries included, increments {@code GLOBAL_SELECT_SQL_COUNTER}. Query
 * time and per-table metrics are recorded in the {@code finally} block unless the attempt
 * handed over to a retry.
 *
 * @param stmt statement whose query plan is compiled and executed
 * @param doRetryOnMetaNotFoundError whether a metadata-not-found error may trigger a cache
 *        refresh followed by a retry
 * @param queryLogger logger attached to the statement context; when its debug level is on it
 *        receives the explain plan and scan details, and the failure trace and status
 * @param noCommit when {@code true}, suppresses the commit otherwise issued after execution
 *        on an auto-commit connection
 * @param shouldValidateLastDdlTimestamp whether to validate the last DDL timestamp of the
 *        plan's table reference before creating the iterator
 * @return the result set created for the optimized plan
 * @throws SQLException if execution fails; a {@code RuntimeException} whose cause is a
 *         {@code SQLException} is unwrapped to that cause
 */
private PhoenixResultSet executeQuery(final CompilableStatement stmt,
        final boolean doRetryOnMetaNotFoundError,
        final QueryLogger queryLogger, final boolean noCommit,
        boolean shouldValidateLastDdlTimestamp)
        throws SQLException {
    GLOBAL_SELECT_SQL_COUNTER.increment();
    try {
        return CallRunner
                .run(new CallRunner.CallableThrowable<PhoenixResultSet, SQLException>() {
                    @Override public PhoenixResultSet call() throws SQLException {
                        final long startTime = EnvironmentEdgeManager.currentTimeMillis();
                        // Flags consumed by the finally block when recording metrics.
                        boolean success = false;
                        boolean updateMetrics = true;
                        boolean pointLookup = false;
                        String tableName = null;
                        clearResultSet();
                        PhoenixResultSet rs = null;
                        QueryPlan plan = null;
                        try {
                            PhoenixConnection conn = getConnection();
                            conn.checkOpen();
                            // Only UPGRADE statements may run while an upgrade is required
                            // and this connection is not the one running it.
                            if (conn.getQueryServices().isUpgradeRequired() && !conn
                                    .isRunningUpgrade()
                                    && stmt.getOperation() != Operation.UPGRADE) {
                                throw new UpgradeRequiredException();
                            }
                            plan = stmt.compilePlan(PhoenixStatement.this,
                                    Sequence.ValueOp.VALIDATE_SEQUENCE);
                            // Send mutations to hbase, so they are visible to subsequent
                            // reads. Use original plan for data table so that data and
                            // immutable indexes will be sent.
                            // TODO: for joins, we need to iterate through all tables, but we
                            // need the original table, not the projected table, so
                            // plan.getContext().getResolver().getTables() won't work.
                            if (plan.getContext().getScanRanges().isPointLookup()) {
                                pointLookup = true;
                            }
                            Iterator<TableRef> tableRefs = plan.getSourceRefs().iterator();
                            connection.getMutationState().sendUncommitted(tableRefs);
                            plan =
                                    connection.getQueryServices().getOptimizer()
                                            .optimize(PhoenixStatement.this, plan);
                            setLastQueryPlan(plan);
                            // Verify metadata for the table/view/index in the query plan.
                            // plan.getTableRef can be null in some cases like EXPLAIN <query>.
                            if (shouldValidateLastDdlTimestamp && plan.getTableRef() != null) {
                                ValidateLastDDLTimestampUtil.validateLastDDLTimestamp(
                                        connection, Arrays.asList(plan.getTableRef()), true);
                            }
                            // Remember the physical table name (when one is available) so
                            // the finally block can attribute per-table metrics to it.
                            if (plan.getTableRef() != null
                                    && plan.getTableRef().getTable() != null && !Strings
                                    .isNullOrEmpty(
                                            plan.getTableRef().getTable().getPhysicalName()
                                                    .toString())) {
                                tableName = plan.getTableRef().getTable().getPhysicalName()
                                        .toString();
                            }
                            // this will create its own trace internally, so we don't wrap
                            // this whole thing in tracing
                            ResultIterator resultIterator = plan.iterator();
                            if (LOGGER.isDebugEnabled()) {
                                String explainPlan = QueryUtil.getExplainPlan(resultIterator);
                                LOGGER.debug(LogUtil.addCustomAnnotations(
                                        "Explain plan: " + explainPlan, connection));
                            }
                            StatementContext context = plan.getContext();
                            context.setQueryLogger(queryLogger);
                            if (queryLogger.isDebugEnabled()) {
                                queryLogger.log(QueryLogInfo.EXPLAIN_PLAN_I,
                                        QueryUtil.getExplainPlan(resultIterator));
                                queryLogger.log(QueryLogInfo.GLOBAL_SCAN_DETAILS_I,
                                        context.getScan() != null ?
                                                context.getScan().toString() :
                                                null);
                            }
                            context.getOverallQueryMetrics().startQuery();
                            rs =
                                    newResultSet(resultIterator, plan.getProjector(),
                                            plan.getContext());
                            // newResultSet sets lastResultSet.
                            // ExecutableShowCreateTable/ExecutableShowTablesStatement/
                            // ExecutableShowSchemasStatement use a delegateStmt to compile a
                            // queryPlan, so the resultSet is set on the delegateStmt; set it
                            // on the original statement as well.
                            setLastResultSet(rs);
                            setLastQueryPlan(plan);
                            setLastUpdateCount(NO_UPDATE);
                            setLastUpdateTable(tableName == null ? TABLE_UNKNOWN : tableName);
                            setLastUpdateOperation(stmt.getOperation());
                            // If transactional, this will move the read pointer forward
                            if (connection.getAutoCommit() && !noCommit) {
                                connection.commit();
                            }
                            connection.incrementStatementExecutionCounter();
                            success = true;
                        }
                        // Force update cache and retry if meta not found error occurs
                        catch (MetaDataEntityNotFoundException e) {
                            if (doRetryOnMetaNotFoundError && e.getTableName() != null) {
                                String sName = e.getSchemaName();
                                String tName = e.getTableName();
                                // when the query plan uses the local index PTable,
                                // the TNFE can still be for the base table
                                if (plan != null && plan.getTableRef() != null) {
                                    PTable queryPlanTable = plan.getTableRef().getTable();
                                    if (queryPlanTable != null
                                            && queryPlanTable.getIndexType()
                                            == IndexType.LOCAL) {
                                        sName = queryPlanTable.getSchemaName().getString();
                                        tName = queryPlanTable.getTableName().getString();
                                    }
                                }
                                if (LOGGER.isDebugEnabled()) {
                                    LOGGER.debug("Reloading table {} data from server",
                                            tName);
                                }
                                if (new MetaDataClient(connection)
                                        .updateCache(connection.getTenantId(),
                                                sName, tName, true)
                                        .wasUpdated()) {
                                    // The retry records its own metrics; skip them here.
                                    updateMetrics = false;
                                    //TODO we can log retry count and error for debugging in LOG table
                                    return executeQuery(stmt, false, queryLogger, noCommit,
                                            shouldValidateLastDdlTimestamp);
                                }
                            }
                            throw e;
                        } catch (StaleMetadataCacheException e) {
                            GlobalClientMetrics
                                    .GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER
                                    .increment();
                            // The retry records its own metrics; skip them here.
                            updateMetrics = false;
                            // NOTE(review): this dereferences the lastQueryPlan field, which
                            // is presumably the plan stored by setLastQueryPlan(plan) above.
                            // If this exception can be raised before that call, or for a
                            // plan whose table ref is null, this would fail - confirm.
                            PTable pTable = lastQueryPlan.getTableRef().getTable();
                            String schemaN = pTable.getSchemaName().toString();
                            String tableN = pTable.getTableName().toString();
                            PName tenantId = connection.getTenantId();
                            // Guarded so the info string is only built when it is logged.
                            if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug("Force updating client metadata cache for {}",
                                        ValidateLastDDLTimestampUtil.getInfoString(tenantId,
                                                Arrays.asList(
                                                        getLastQueryPlan().getTableRef())));
                            }
                            // force update client metadata cache for the table/view
                            // this also updates the cache for all ancestors in case of a view
                            new MetaDataClient(connection)
                                    .updateCache(tenantId, schemaN, tableN, true);
                            // skip last ddl timestamp validation in the retry
                            return executeQuery(stmt, doRetryOnMetaNotFoundError, queryLogger,
                                    noCommit, false);
                        }
                        catch (RuntimeException e) {
                            // FIXME: Expression.evaluate does not throw SQLException
                            // so this will unwrap throws from that.
                            if (e.getCause() instanceof SQLException) {
                                throw (SQLException) e.getCause();
                            }
                            throw e;
                        } finally {
                            if (updateMetrics) {
                                // Regardless of whether the query was successfully handled or
                                // not, update the time spent so far. If needed, we can
                                // separate out the success times and failure times.
                                // The elapsed time is sampled once so that the global metric,
                                // the table metric and the result set all report the same
                                // duration.
                                final long executeQueryTimeSpent =
                                        EnvironmentEdgeManager.currentTimeMillis() - startTime;
                                GLOBAL_QUERY_TIME.update(executeQueryTimeSpent);
                                if (tableName != null) {
                                    TableMetricsManager
                                            .updateMetricsMethod(tableName, SELECT_SQL_COUNTER, 1);
                                    TableMetricsManager
                                            .updateMetricsMethod(tableName, SELECT_SQL_QUERY_TIME,
                                                    executeQueryTimeSpent);
                                    if (success) {
                                        TableMetricsManager.updateMetricsMethod(tableName,
                                                SELECT_SUCCESS_SQL_COUNTER, 1);
                                        TableMetricsManager.updateMetricsMethod(tableName,
                                                pointLookup ?
                                                        SELECT_POINTLOOKUP_SUCCESS_SQL_COUNTER :
                                                        SELECT_SCAN_SUCCESS_SQL_COUNTER, 1);
                                    } else {
                                        TableMetricsManager.updateMetricsMethod(tableName,
                                                SELECT_FAILED_SQL_COUNTER, 1);
                                        TableMetricsManager.updateMetricsMethod(tableName,
                                                SELECT_AGGREGATE_FAILURE_SQL_COUNTER, 1);
                                        TableMetricsManager.updateMetricsMethod(tableName,
                                                pointLookup ?
                                                        SELECT_POINTLOOKUP_FAILED_SQL_COUNTER :
                                                        SELECT_SCAN_FAILED_SQL_COUNTER, 1);
                                    }
                                }
                                if (rs != null) {
                                    rs.setQueryTime(executeQueryTimeSpent);
                                }
                            }
                        }
                        return rs;
                    }
                }, PhoenixContextExecutor.inContext());
    } catch (Exception e) {
        // Record the failure in the query log before rethrowing.
        if (queryLogger.isDebugEnabled()) {
            queryLogger
                    .log(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e));
            queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.FAILED.toString());
            queryLogger.sync(null, null);
        }
        Throwables.propagateIfInstanceOf(e, SQLException.class);
        Throwables.propagate(e);
        throw new IllegalStateException(); // Can't happen as Throwables.propagate() always throws
    }
}