in phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java [598:778]
private Pair<Integer, ResultSet> executeMutation(final CompilableStatement stmt,
final boolean doRetryOnMetaNotFoundError, final AuditQueryLogger queryLogger,
final ReturnResult returnResult) throws SQLException {
if (connection.isReadOnly()) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.READ_ONLY_CONNECTION).
build().buildException();
}
GLOBAL_MUTATION_SQL_COUNTER.increment();
try {
return CallRunner
.run(
new CallRunner.CallableThrowable<Pair<Integer, ResultSet>, SQLException>() {
@Override
public Pair<Integer, ResultSet> call() throws SQLException {
boolean success = false;
String tableName = null;
boolean isUpsert = false;
boolean isAtomicUpsert = false;
boolean isDelete = false;
MutationState state = null;
MutationPlan plan = null;
final long startExecuteMutationTime = EnvironmentEdgeManager.currentTimeMillis();
clearResultSet();
try {
PhoenixConnection conn = getConnection();
if (conn.getQueryServices().isUpgradeRequired() && !conn.isRunningUpgrade()
&& stmt.getOperation() != Operation.UPGRADE) {
throw new UpgradeRequiredException();
}
state = connection.getMutationState();
isUpsert = stmt instanceof ExecutableUpsertStatement;
isDelete = stmt instanceof ExecutableDeleteStatement;
if (isDelete && connection.getAutoCommit() &&
returnResult == ReturnResult.ROW) {
// used only if single row deletion needs to atomically
// return row that is deleted.
plan = ((ExecutableDeleteStatement) stmt).compilePlan(
PhoenixStatement.this,
Sequence.ValueOp.VALIDATE_SEQUENCE, returnResult);
} else {
plan = stmt.compilePlan(PhoenixStatement.this,
Sequence.ValueOp.VALIDATE_SEQUENCE);
}
isAtomicUpsert = isUpsert && ((ExecutableUpsertStatement)stmt).getOnDupKeyPairs() != null;
if (plan.getTargetRef() != null && plan.getTargetRef().getTable() != null) {
if (!Strings.isNullOrEmpty(plan.getTargetRef().getTable().getPhysicalName().toString())) {
tableName = plan.getTargetRef().getTable().getPhysicalName().toString();
}
if (plan.getTargetRef().getTable().isTransactional()) {
state.startTransaction(plan.getTargetRef().getTable().getTransactionProvider());
}
}
Iterator<TableRef> tableRefs = plan.getSourceRefs().iterator();
state.sendUncommitted(tableRefs);
state.checkpointIfNeccessary(plan);
checkIfDDLStatementandMutationState(stmt, state);
MutationState lastState = plan.execute();
state.join(lastState);
// Unfortunately, JDBC uses an int for update count, so we
// just max out at Integer.MAX_VALUE
int lastUpdateCount = (int) Math.min(Integer.MAX_VALUE,
lastState.getUpdateCount());
Result result = null;
if (connection.getAutoCommit()) {
boolean singleRowUpdate = isSingleRowUpdatePlan(isUpsert,
isDelete, plan);
if (singleRowUpdate) {
state.setReturnResult(returnResult);
}
connection.commit();
if (isAtomicUpsert) {
lastUpdateCount = connection.getMutationState()
.getNumUpdatedRowsForAutoCommit();
}
result = connection.getMutationState().getResult();
connection.getMutationState().clearResult();
result = getResult(singleRowUpdate, isDelete, plan,
lastState, result);
}
setLastQueryPlan(null);
setLastUpdateCount(lastUpdateCount);
setLastUpdateOperation(stmt.getOperation());
setLastUpdateTable(tableName == null ? TABLE_UNKNOWN : tableName);
connection.incrementStatementExecutionCounter();
if (queryLogger.isAuditLoggingEnabled()) {
queryLogger.log(QueryLogInfo.TABLE_NAME_I, getTargetForAudit(stmt));
queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.COMPLETED.toString());
queryLogger.log(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, lastUpdateCount);
queryLogger.syncAudit();
}
success = true;
return new Pair<>(lastUpdateCount,
result == null || result.isEmpty() ?
null : TupleUtil.getResultSet(new ResultTuple(result),
tableName, connection));
}
//Force update cache and retry if meta not found error occurs
catch (MetaDataEntityNotFoundException e) {
if (doRetryOnMetaNotFoundError && e.getTableName() != null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Reloading table {} data from server", e.getTableName());
}
if (new MetaDataClient(connection).updateCache(connection.getTenantId(),
e.getSchemaName(), e.getTableName(), true).wasUpdated()) {
return executeMutation(stmt, false, queryLogger,
returnResult);
}
}
throw e;
}catch (RuntimeException e) {
// FIXME: Expression.evaluate does not throw SQLException
// so this will unwrap throws from that.
if (e.getCause() instanceof SQLException) {
throw (SQLException) e.getCause();
}
throw e;
} finally {
// Regardless of whether the mutation was successfully handled or not,
// update the time spent so far. If needed, we can separate out the
// success times and failure times.
if (tableName != null) {
// Counts for both ddl and dml
TableMetricsManager.updateMetricsMethod(tableName,
MUTATION_SQL_COUNTER, 1);
// Only count dml operations
if (isUpsert || isDelete) {
long executeMutationTimeSpent =
EnvironmentEdgeManager.currentTimeMillis() - startExecuteMutationTime;
TableMetricsManager.updateMetricsMethod(tableName, isUpsert ?
UPSERT_SQL_COUNTER : DELETE_SQL_COUNTER, 1);
TableMetricsManager.updateMetricsMethod(tableName, isUpsert ?
UPSERT_SQL_QUERY_TIME : DELETE_SQL_QUERY_TIME, executeMutationTimeSpent);
if (isAtomicUpsert) {
TableMetricsManager.updateMetricsMethod(tableName,
ATOMIC_UPSERT_SQL_COUNTER, 1);
TableMetricsManager.updateMetricsMethod(tableName,
ATOMIC_UPSERT_SQL_QUERY_TIME, executeMutationTimeSpent);
}
if (success) {
TableMetricsManager.updateMetricsMethod(tableName, isUpsert ?
UPSERT_SUCCESS_SQL_COUNTER : DELETE_SUCCESS_SQL_COUNTER, 1);
} else {
TableMetricsManager.updateMetricsMethod(tableName, isUpsert ?
UPSERT_FAILED_SQL_COUNTER : DELETE_FAILED_SQL_COUNTER, 1);
//Failures are updated for executeMutation phase and for autocommit=true case here.
TableMetricsManager.updateMetricsMethod(tableName, isUpsert ? UPSERT_AGGREGATE_FAILURE_SQL_COUNTER:
DELETE_AGGREGATE_FAILURE_SQL_COUNTER, 1);
}
if (plan instanceof DeleteCompiler.ServerSelectDeleteMutationPlan
|| plan instanceof UpsertCompiler.ServerUpsertSelectMutationPlan) {
TableMetricsManager.updateLatencyHistogramForMutations(
tableName, executeMutationTimeSpent, false);
// We won't have size histograms for delete mutations when auto commit is set to true and
// if plan is of ServerSelectDeleteMutationPlan or ServerUpsertSelectMutationPlan
// since the update happens on server.
} else {
state.addExecuteMutationTime(
executeMutationTimeSpent, tableName);
}
}
}
}
}
}, PhoenixContextExecutor.inContext(),
Tracing.withTracing(connection, this.toString()));
} catch (Exception e) {
if (queryLogger.isAuditLoggingEnabled()) {
queryLogger.log(QueryLogInfo.TABLE_NAME_I, getTargetForAudit(stmt));
queryLogger.log(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e));
queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.FAILED.toString());
queryLogger.syncAudit();
}
Throwables.propagateIfInstanceOf(e, SQLException.class);
Throwables.propagate(e);
throw new IllegalStateException(); // Can't happen as Throwables.propagate() always throws
}
}