private Pair executeMutation()

in phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java [598:778]


    /**
     * Compiles and executes a single mutation statement, then records the outcome on this
     * statement (last update count, operation and table), in the table-level metrics and,
     * when audit logging is enabled, in the audit log.
     *
     * <p>The work runs inside {@code CallRunner.run(...)}. When the connection is in
     * auto-commit mode the mutation is committed before this method returns and the
     * {@code Result} held by the connection's mutation state is fetched and cleared; when
     * auto-commit is off no commit happens here and the returned result set is always
     * {@code null}.
     *
     * @param stmt the statement to compile and execute
     * @param doRetryOnMetaNotFoundError if {@code true}, a
     *        {@code MetaDataEntityNotFoundException} carrying a table name triggers a forced
     *        metadata cache update and, if the cache was updated, one retry (the retry is
     *        invoked with this flag set to {@code false})
     * @param queryLogger audit logger; only written to when
     *        {@code isAuditLoggingEnabled()} returns {@code true}
     * @param returnResult passed to delete-plan compilation for auto-commit deletes when it
     *        equals {@code ReturnResult.ROW}, and set on the mutation state before commit for
     *        single-row update plans
     * @return a pair of the update count (capped at {@code Integer.MAX_VALUE}) and a result
     *         set built from the mutation result, or {@code null} as the second element when
     *         there is no result or it is empty
     * @throws SQLException if the connection is read-only, or for any {@code SQLException}
     *         raised while running the mutation, including one that was the cause of a
     *         {@code RuntimeException}
     */
    private Pair<Integer, ResultSet> executeMutation(final CompilableStatement stmt,
        final boolean doRetryOnMetaNotFoundError, final AuditQueryLogger queryLogger,
        final ReturnResult returnResult) throws SQLException {
        // Mutations are rejected up front on read-only connections.
        if (connection.isReadOnly()) {
            throw new SQLExceptionInfo.Builder(
                SQLExceptionCode.READ_ONLY_CONNECTION).
                build().buildException();
        }
        // Incremented before the attempt, so it is counted whether or not execution succeeds.
	    GLOBAL_MUTATION_SQL_COUNTER.increment();
        try {
            return CallRunner
                   .run(
                           new CallRunner.CallableThrowable<Pair<Integer, ResultSet>, SQLException>() {
                        @Override
                            public Pair<Integer, ResultSet> call() throws SQLException {
                            // These locals are declared outside the inner try so that the
                            // finally block can report metrics based on how far execution got.
                            boolean success = false;
                            String tableName = null;
                            boolean isUpsert = false;
                            boolean isAtomicUpsert = false;
                            boolean isDelete = false;
                            MutationState state = null;
                            MutationPlan plan = null;
                            final long startExecuteMutationTime = EnvironmentEdgeManager.currentTimeMillis();
                            clearResultSet();
                            try {
                                PhoenixConnection conn = getConnection();
                                // Only UPGRADE statements may run while an upgrade is required
                                // and this connection is not the one running it.
                                if (conn.getQueryServices().isUpgradeRequired() && !conn.isRunningUpgrade()
                                        && stmt.getOperation() != Operation.UPGRADE) {
                                    throw new UpgradeRequiredException();
                                }
                                state = connection.getMutationState();
                                isUpsert = stmt instanceof ExecutableUpsertStatement;
                                isDelete = stmt instanceof ExecutableDeleteStatement;
                                if (isDelete && connection.getAutoCommit() &&
                                        returnResult == ReturnResult.ROW) {
                                    // used only if single row deletion needs to atomically
                                    // return row that is deleted.
                                    plan = ((ExecutableDeleteStatement) stmt).compilePlan(
                                            PhoenixStatement.this,
                                            Sequence.ValueOp.VALIDATE_SEQUENCE, returnResult);
                                } else {
                                    plan = stmt.compilePlan(PhoenixStatement.this,
                                            Sequence.ValueOp.VALIDATE_SEQUENCE);
                                }
                                // An upsert is treated as atomic when it carries ON DUPLICATE KEY pairs.
                                isAtomicUpsert = isUpsert && ((ExecutableUpsertStatement)stmt).getOnDupKeyPairs() != null;
                                if (plan.getTargetRef() != null && plan.getTargetRef().getTable() != null) {
                                    // tableName stays null when the physical name is empty; the
                                    // finally block then skips all per-table metrics.
                                    if (!Strings.isNullOrEmpty(plan.getTargetRef().getTable().getPhysicalName().toString())) {
                                        tableName = plan.getTargetRef().getTable().getPhysicalName().toString();
                                    }
                                    if (plan.getTargetRef().getTable().isTransactional()) {
                                        state.startTransaction(plan.getTargetRef().getTable().getTransactionProvider());
                                    }
                                }
                                // The calls below run in this order: send uncommitted data for
                                // the plan's source tables, checkpoint, DDL/mutation-state
                                // check, execute, then merge the produced state into the
                                // connection's mutation state.
                                Iterator<TableRef> tableRefs = plan.getSourceRefs().iterator();
                                state.sendUncommitted(tableRefs);
                                state.checkpointIfNeccessary(plan);
                                checkIfDDLStatementandMutationState(stmt, state);
                                MutationState lastState = plan.execute();
                                state.join(lastState);
                                // Unfortunately, JDBC uses an int for update count, so we
                                // just max out at Integer.MAX_VALUE
                                int lastUpdateCount = (int) Math.min(Integer.MAX_VALUE,
                                        lastState.getUpdateCount());
                                // result is only populated on the auto-commit path below.
                                Result result = null;
                                if (connection.getAutoCommit()) {
                                    boolean singleRowUpdate = isSingleRowUpdatePlan(isUpsert,
                                            isDelete, plan);
                                    // The requested return mode is set on the mutation state
                                    // before commit, and only for single-row update plans.
                                    if (singleRowUpdate) {
                                        state.setReturnResult(returnResult);
                                    }
                                    connection.commit();
                                    // For atomic upserts the count reported by the mutation
                                    // state after commit replaces the plan's update count.
                                    if (isAtomicUpsert) {
                                        lastUpdateCount = connection.getMutationState()
                                                .getNumUpdatedRowsForAutoCommit();
                                    }
                                    // Read the result and clear it from the mutation state
                                    // before handing it to getResult(...).
                                    result = connection.getMutationState().getResult();
                                    connection.getMutationState().clearResult();
                                    result = getResult(singleRowUpdate, isDelete, plan,
                                            lastState, result);
                                }
                                // Record the outcome on this statement; a mutation leaves no
                                // last query plan.
                                setLastQueryPlan(null);
                                setLastUpdateCount(lastUpdateCount);
                                setLastUpdateOperation(stmt.getOperation());
                                setLastUpdateTable(tableName == null ? TABLE_UNKNOWN : tableName);
                                connection.incrementStatementExecutionCounter();
                                if (queryLogger.isAuditLoggingEnabled()) {
                                    queryLogger.log(QueryLogInfo.TABLE_NAME_I, getTargetForAudit(stmt));
                                    queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.COMPLETED.toString());
                                    queryLogger.log(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, lastUpdateCount);
                                    queryLogger.syncAudit();
                                }
                                // Set only after everything above succeeded; the finally block
                                // uses this flag to choose success vs. failure counters.
                                success = true;
                                return new Pair<>(lastUpdateCount,
                                    result == null || result.isEmpty() ?
                                        null : TupleUtil.getResultSet(new ResultTuple(result),
                                        tableName, connection));
                            }
                            //Force update cache and retry if meta not found error occurs
                            catch (MetaDataEntityNotFoundException e) {
                                if (doRetryOnMetaNotFoundError && e.getTableName() != null) {
                                    if (LOGGER.isDebugEnabled()) {
                                        LOGGER.debug("Reloading table {} data from server",  e.getTableName());
                                    }
                                    if (new MetaDataClient(connection).updateCache(connection.getTenantId(),
                                        e.getSchemaName(), e.getTableName(), true).wasUpdated()) {
                                        // Retry with retries disabled so this recursion happens
                                        // at most once. The finally block below still runs for
                                        // this first attempt with success == false.
                                        // NOTE(review): if the retried call fails, its own
                                        // outer catch writes a FAILED audit entry and the
                                        // exception then appears to reach this invocation's
                                        // outer catch as well - confirm whether the duplicate
                                        // audit entry is intended.
                                        return executeMutation(stmt, false, queryLogger,
                                                returnResult);
                                    }
                                }
                                throw e;
                            }catch (RuntimeException e) {
                                // FIXME: Expression.evaluate does not throw SQLException
                                // so this will unwrap throws from that.
                                if (e.getCause() instanceof SQLException) {
                                    throw (SQLException) e.getCause();
                                }
                                throw e;
                            } finally {
                                // Regardless of whether the mutation was successfully handled or not,
                                // update the time spent so far. If needed, we can separate out the
                                // success times and failure times.
                                // Nothing is recorded when tableName was never resolved, e.g.
                                // when the failure happened before or during plan compilation.
                                if (tableName != null) {
                                    // Counts for both ddl and dml
                                    TableMetricsManager.updateMetricsMethod(tableName,
                                            MUTATION_SQL_COUNTER, 1);
                                    // Only count dml operations
                                    if (isUpsert || isDelete) {
                                        long executeMutationTimeSpent =
                                                EnvironmentEdgeManager.currentTimeMillis() - startExecuteMutationTime;

                                        TableMetricsManager.updateMetricsMethod(tableName, isUpsert ?
                                                UPSERT_SQL_COUNTER : DELETE_SQL_COUNTER, 1);
                                        TableMetricsManager.updateMetricsMethod(tableName, isUpsert ?
                                                UPSERT_SQL_QUERY_TIME : DELETE_SQL_QUERY_TIME, executeMutationTimeSpent);
                                        // Atomic upserts are counted in addition to, not
                                        // instead of, the plain upsert counters above.
                                        if (isAtomicUpsert) {
                                            TableMetricsManager.updateMetricsMethod(tableName,
                                                ATOMIC_UPSERT_SQL_COUNTER, 1);
                                            TableMetricsManager.updateMetricsMethod(tableName,
                                                ATOMIC_UPSERT_SQL_QUERY_TIME, executeMutationTimeSpent);
                                        }

                                        if (success) {
                                            TableMetricsManager.updateMetricsMethod(tableName, isUpsert ?
                                                    UPSERT_SUCCESS_SQL_COUNTER : DELETE_SUCCESS_SQL_COUNTER, 1);
                                        } else {
                                            TableMetricsManager.updateMetricsMethod(tableName, isUpsert ?
                                                    UPSERT_FAILED_SQL_COUNTER : DELETE_FAILED_SQL_COUNTER, 1);
                                            //Failures are updated for executeMutation phase and for autocommit=true case here.
                                            TableMetricsManager.updateMetricsMethod(tableName, isUpsert ? UPSERT_AGGREGATE_FAILURE_SQL_COUNTER:
                                                    DELETE_AGGREGATE_FAILURE_SQL_COUNTER, 1);
                                        }
                                        if (plan instanceof DeleteCompiler.ServerSelectDeleteMutationPlan
                                                || plan instanceof UpsertCompiler.ServerUpsertSelectMutationPlan) {
                                            TableMetricsManager.updateLatencyHistogramForMutations(
                                                    tableName, executeMutationTimeSpent, false);
                                            // We won't have size histograms for delete mutations when auto commit is set to true and
                                            // if plan is of ServerSelectDeleteMutationPlan or ServerUpsertSelectMutationPlan
                                            // since the update happens on server.
                                        } else {
                                            // tableName is assigned only after state has been
                                            // assigned in the try body, so that assignment has
                                            // already run when this branch is reached.
                                            state.addExecuteMutationTime(
                                                    executeMutationTimeSpent, tableName);
                                        }
                                    }
                                }

                            }
                        }
                    }, PhoenixContextExecutor.inContext(),
                        // "this" is evaluated outside the anonymous class body, so it is the
                        // enclosing statement, not the callable.
                        Tracing.withTracing(connection, this.toString()));
        } catch (Exception e) {
            // Record the failure in the audit log (when enabled) before rethrowing.
            if (queryLogger.isAuditLoggingEnabled()) {
                queryLogger.log(QueryLogInfo.TABLE_NAME_I, getTargetForAudit(stmt));
                queryLogger.log(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e));
                queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.FAILED.toString());
                queryLogger.syncAudit();
            }
            // SQLExceptions are rethrown as-is; anything else goes through Throwables.propagate.
            Throwables.propagateIfInstanceOf(e, SQLException.class);
            Throwables.propagate(e);
            throw new IllegalStateException(); // Can't happen as Throwables.propagate() always throws
        }
    }