private void sendMutations()

in phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java [1386:1678]


    /**
     * Sends the pending mutations of every table yielded by {@code mutationsIterator},
     * one table at a time, split into batches produced by {@code getMutationBatchList}.
     *
     * <p>For each table the write is attempted inside a do/while loop that allows at most one
     * retry ({@code retryCount++ < 1}). A retry is requested in two cases:
     * <ul>
     *   <li>the server reports {@code INDEX_METADATA_NOT_FOUND} on the first attempt while an
     *       index metadata {@code ServerCache} is in use; the table region cache is cleared
     *       and the write is attempted again;</li>
     *   <li>the server reports {@code INDEX_WRITE_FAILURE} and an {@code IndexWriteException}
     *       can be derived from it; the first remaining batch is flagged with
     *       {@code REPLAY_ONLY_INDEX_WRITES} and re-sent through
     *       {@code PhoenixIndexFailurePolicyHelper.doBatchWithRetries}.</li>
     * </ul>
     * Batches are removed from the batch list as soon as they are applied, so a retry resumes
     * from the batch that failed rather than replaying the whole list.
     *
     * <p>After every attempt (successful or not) the {@code finally} block records commit
     * metrics, calls {@code resetAllMutationState()}, and closes the server cache and the
     * HBase table.
     *
     * @param mutationsIterator iterator over (table info, mutations) pairs to send
     * @param span parent tracing span; a child span is created from it for each table
     * @param indexMetaDataPtr pointer handed to {@code PTable.getIndexMaintainers} and to
     *                         {@code IndexMetaDataCacheClient.setMetaDataOnMutations}
     * @param isVerifiedPhase when {@code true}, failed mutations are additionally counted as
     *                        phase-3 failures and reported to
     *                        {@code GLOBAL_MUTATION_INDEX_COMMIT_FAILURE_COUNT}
     * @throws SQLException a {@code CommitException} carrying the uncommitted statement
     *                      indexes when a batch fails and is not retried, or the exception
     *                      produced while closing the HBase table
     */
    private void sendMutations(Iterator<Entry<TableInfo, List<Mutation>>> mutationsIterator, Span span, ImmutableBytesWritable indexMetaDataPtr, boolean isVerifiedPhase)
            throws SQLException {
        while (mutationsIterator.hasNext()) {
            Entry<TableInfo, List<Mutation>> pair = mutationsIterator.next();
            TableInfo tableInfo = pair.getKey();
            byte[] htableName = tableInfo.getHTableName().getBytes();
            String htableNameStr = tableInfo.getHTableName().getString();
            List<Mutation> mutationList = pair.getValue();
            List<List<Mutation>> mutationBatchList =
                    getMutationBatchList(batchSize, batchSizeBytes, mutationList);
            // Captured up front because batches are removed from the list as they are applied.
            int totalBatchCount = mutationBatchList.size();

            // create a span per target table
            // TODO maybe we can be smarter about the table name to string here?
            Span child = Tracing.child(span, "Writing mutation batch for table: " + Bytes.toString(htableName));

            int retryCount = 0;
            boolean shouldRetry = false;
            long numMutations = 0;
            long mutationSizeBytes = 0;
            long mutationCommitTime = 0;
            long numFailedMutations = 0;
            long numFailedPhase3Mutations = 0;

            long startTime = EnvironmentEdgeManager.currentTimeMillis();
            MutationBytes totalMutationBytesObject = null;
            boolean shouldRetryIndexedMutation = false;
            IndexWriteException iwe = null;
            do {
                TableRef origTableRef = tableInfo.getOrigTableRef();
                PTable table = origTableRef.getTable();
                table.getIndexMaintainers(indexMetaDataPtr, connection);
                // The index metadata cache is only set up when writing to a data table.
                final ServerCache cache = tableInfo.isDataTable() ?
                        IndexMetaDataCacheClient.setMetaDataOnMutations(connection, table,
                                mutationList, indexMetaDataPtr) : null;
                // no-op if table doesn't have Conditional TTL
                ScanUtil.annotateMutationWithConditionalTTL(connection, tableInfo.getPTable(),
                        mutationList);
                // If we haven't retried yet, retry for this case only, as it's possible that
                // a split will occur after we send the index metadata cache to all known
                // region servers.
                shouldRetry = cache != null;
                SQLException sqlE = null;
                Table hTable = connection.getQueryServices().getTable(htableName);
                List<Mutation> currentMutationBatch = null;
                boolean areAllBatchesSuccessful = false;
                Object[] resultObjects = null;

                try {
                    if (table.isTransactional()) {
                        // Track tables to which we've sent uncommitted data
                        if (tableInfo.isDataTable()) {
                            uncommittedPhysicalNames.add(table.getPhysicalName().getString());
                            phoenixTransactionContext.markDMLFence(table);
                        }
                        // Only pass true for last argument if the index is being written to on its own (i.e. initial
                        // index population), not if it's being written to for normal maintenance due to writes to
                        // the data table. This case is different because the initial index population does not need
                        // to be done transactionally since the index is only made active after all writes have
                        // occurred successfully.
                        hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, tableInfo.isDataTable() && table.getType() == PTableType.INDEX);
                    }
                    numMutations = mutationList.size();
                    GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
                    totalMutationBytesObject = calculateMutationSize(mutationList, true);

                    child.addTimelineAnnotation("Attempt " + retryCount);
                    Iterator<List<Mutation>> itrListMutation = mutationBatchList.iterator();
                    while (itrListMutation.hasNext()) {
                        final List<Mutation> mutationBatch = itrListMutation.next();
                        // Remembered so the finally block can attribute a failure to this batch.
                        currentMutationBatch = mutationBatch;
                        // A result array is only allocated for a single-mutation batch under
                        // auto-commit; otherwise resultObjects keeps whatever value it had.
                        if (connection.getAutoCommit() && mutationBatch.size() == 1) {
                            resultObjects = new Object[mutationBatch.size()];
                        }
                        if (shouldRetryIndexedMutation) {
                            // if there was an index write failure, retry the mutation in a loop
                            final Table finalHTable = hTable;
                            final ImmutableBytesWritable finalindexMetaDataPtr =
                                    indexMetaDataPtr;
                            final PTable finalPTable = table;
                            final Object[] finalResultObjects = resultObjects;
                            PhoenixIndexFailurePolicyHelper.doBatchWithRetries(new MutateCommand() {
                                @Override
                                public void doMutation() throws IOException {
                                    try {
                                        finalHTable.batch(mutationBatch, finalResultObjects);
                                    } catch (InterruptedException e) {
                                        // Restore the interrupt flag before converting to IOException.
                                        Thread.currentThread().interrupt();
                                        throw new IOException(e);
                                    } catch (IOException e) {
                                        e = updateTableRegionCacheIfNecessary(e);
                                        throw e;
                                    }
                                }

                                @Override
                                public List<Mutation> getMutationList() {
                                    return mutationBatch;
                                }

                                // On INDEX_METADATA_NOT_FOUND, clears the table region cache and
                                // re-attaches index metadata to this batch. Returns the exception
                                // to rethrow: the original one, or a wrapping IOException if the
                                // refresh itself failed.
                                private IOException
                                updateTableRegionCacheIfNecessary(IOException ioe) {
                                    SQLException sqlE =
                                            ClientUtil.parseLocalOrRemoteServerException(ioe);
                                    if (sqlE != null
                                            && sqlE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND
                                            .getErrorCode()) {
                                        try {
                                            connection.getQueryServices().clearTableRegionCache(
                                                    finalHTable.getName());
                                            IndexMetaDataCacheClient.setMetaDataOnMutations(
                                                    connection, finalPTable, mutationBatch,
                                                    finalindexMetaDataPtr);
                                        } catch (SQLException e) {
                                            return ClientUtil.createIOException(
                                                    "Exception during updating index meta data cache",
                                                    ioe);
                                        }
                                    }
                                    return ioe;
                                }
                            }, iwe, connection, connection.getQueryServices().getProps());
                            // Only the first remaining batch is replayed through the retry helper.
                            shouldRetryIndexedMutation = false;
                        } else {
                            hTable.batch(mutationBatch, resultObjects);
                        }

                        if (resultObjects != null) {
                            Result result = (Result) resultObjects[0];
                            if (result != null && !result.isEmpty()) {
                                // NOTE(review): cell is dereferenced without a null check; this
                                // presumes a non-empty result always carries the status cell —
                                // confirm against the server side.
                                Cell cell = result.getColumnLatestCell(
                                        Bytes.toBytes(UPSERT_CF), Bytes.toBytes(UPSERT_STATUS_CQ));
                                numUpdatedRowsForAutoCommit = PInteger.INSTANCE.getCodec()
                                        .decodeInt(cell.getValueArray(), cell.getValueOffset(),
                                                SortOrder.getDefault());
                                // Enum identity comparison is null-safe, so no separate null check.
                                if (this.returnResult == ReturnResult.ROW) {
                                    this.result = result;
                                }
                            } else {
                                numUpdatedRowsForAutoCommit = 1;
                            }
                        }

                        // remove each batch from the list once it gets applied
                        // so when failures happens for any batch we only start
                        // from that batch only instead of doing duplicate reply of already
                        // applied batches from entire list, also we can set
                        // REPLAY_ONLY_INDEX_WRITES for first batch
                        // only in case of 1121 SQLException
                        itrListMutation.remove();
                        batchCount++;
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("Sent batch of " + mutationBatch.size() + " for "
                                    + Bytes.toString(htableName));
                        }
                    }
                    // All batches applied: close the span for this table exactly once.
                    // NOTE(review): on the non-retried failure path below the span is never
                    // stopped — confirm whether that is intended.
                    child.stop();
                    shouldRetry = false;
                    numFailedMutations = 0;

                    // Remove batches as we process them
                    removeMutations(this.mutationsMap, origTableRef);
                    if (tableInfo.isDataTable()) {
                        numRows -= numMutations;
                        // recalculate the estimated size
                        estimatedSize = PhoenixKeyValueUtil.getEstimatedRowMutationSizeWithBatch(this.mutationsMap);
                    }
                    areAllBatchesSuccessful = true;
                } catch (Exception e) {
                    // NOTE(review): this also catches InterruptedException from hTable.batch
                    // without restoring the thread's interrupt flag — confirm whether callers
                    // rely on that.
                    long serverTimestamp = ClientUtil.parseServerTimestamp(e);
                    SQLException inferredE = ClientUtil.parseServerExceptionOrNull(e);
                    if (inferredE != null) {
                        if (shouldRetry
                                && retryCount == 0
                                && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND
                                .getErrorCode()) {
                            // Swallow this exception once, as it's possible that we split after sending the index
                            // metadata
                            // and one of the region servers doesn't have it. This will cause it to have it the next
                            // go around.
                            // If it fails again, we don't retry.
                            String msg = "Swallowing exception and retrying after clearing meta cache on connection. "
                                    + inferredE;
                            LOGGER.warn(LogUtil.addCustomAnnotations(msg, connection));
                            connection.getQueryServices().clearTableRegionCache(TableName.valueOf(htableName));

                            // add a new child span as this one failed
                            child.addTimelineAnnotation(msg);
                            child.stop();
                            child = Tracing.child(span, "Failed batch, attempting retry");

                            // The finally block still runs before the do/while condition is
                            // evaluated; sqlE is null here so nothing is thrown from it.
                            continue;
                        } else if (inferredE.getErrorCode() == SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) {
                            iwe = PhoenixIndexFailurePolicyHelper.getIndexWriteException(inferredE);
                            if (iwe != null && !shouldRetryIndexedMutation) {
                                // For an index write failure, the data table write succeeded,
                                // so when we retry we need to set REPLAY_WRITES
                                // for first batch in list only.
                                for (Mutation m : mutationBatchList.get(0)) {
                                    if (!PhoenixIndexMetaData.isIndexRebuild(
                                            m.getAttributesMap())){
                                        m.setAttribute(BaseScannerRegionObserverConstants.REPLAY_WRITES,
                                                BaseScannerRegionObserverConstants.REPLAY_ONLY_INDEX_WRITES
                                        );
                                    }
                                    PhoenixKeyValueUtil.setTimestamp(m, serverTimestamp);
                                }
                                shouldRetry = true;
                                shouldRetryIndexedMutation = true;
                                // As above: finally runs, then the loop condition decides
                                // whether another attempt is made.
                                continue;
                            }
                        }
                        e = inferredE;
                    }
                    // Throw to client an exception that indicates the statements that
                    // were not committed successfully.
                    int[] uncommittedStatementIndexes = getUncommittedStatementIndexes();
                    sqlE = new CommitException(e, uncommittedStatementIndexes, serverTimestamp);

                    numFailedMutations = uncommittedStatementIndexes.length;

                    if (isVerifiedPhase) {
                        numFailedPhase3Mutations = numFailedMutations;
                        GLOBAL_MUTATION_INDEX_COMMIT_FAILURE_COUNT.update(numFailedPhase3Mutations);
                    }
                } finally {
                    // Measured from before the first attempt, so it includes any earlier attempt.
                    mutationCommitTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
                    GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
                    MutationMetric failureMutationMetrics = MutationMetric.EMPTY_METRIC;
                    if (!areAllBatchesSuccessful) {
                        failureMutationMetrics =
                                updateMutationBatchFailureMetrics(currentMutationBatch,
                                        htableNameStr, numFailedMutations,
                                        table.isTransactional());
                    }

                    MutationMetric committedMutationsMetric =
                            getCommittedMutationsMetric(
                                    totalMutationBytesObject,
                                    mutationBatchList,
                                    numMutations,
                                    numFailedMutations,
                                    numFailedPhase3Mutations,
                                    mutationCommitTime, totalBatchCount);
                    // Combine failure mutation metrics with committed ones for the final picture
                    committedMutationsMetric.combineMetric(failureMutationMetrics);
                    mutationMetricQueue.addMetricsForTable(htableNameStr, committedMutationsMetric);

                    // Aggregate counters are only updated when exactly one of the two flags is set.
                    if (allUpsertsMutations ^ allDeletesMutations) {
                        //success cases are updated for both cases autoCommit=true and conn.commit explicit
                        if (areAllBatchesSuccessful){
                            TableMetricsManager
                                    .updateMetricsMethod(htableNameStr, allUpsertsMutations ? UPSERT_AGGREGATE_SUCCESS_SQL_COUNTER :
                                            DELETE_AGGREGATE_SUCCESS_SQL_COUNTER, 1);
                        }
                        //Failures cases are updated only for conn.commit explicit case.
                        if (!areAllBatchesSuccessful && !connection.getAutoCommit()){
                            TableMetricsManager.updateMetricsMethod(htableNameStr, allUpsertsMutations ? UPSERT_AGGREGATE_FAILURE_SQL_COUNTER :
                                    DELETE_AGGREGATE_FAILURE_SQL_COUNTER, 1);
                        }
                        // Update size and latency histogram metrics.
                        TableMetricsManager.updateSizeHistogramMetricsForMutations(htableNameStr,
                                committedMutationsMetric.getTotalMutationsSizeBytes().getValue(), allUpsertsMutations);
                        Long latency = timeInExecuteMutationMap.get(htableNameStr);
                        if (latency == null) {
                            latency = 0L;
                        }
                        latency += mutationCommitTime;
                        TableMetricsManager.updateLatencyHistogramForMutations(htableNameStr,
                                latency, allUpsertsMutations);
                    }
                    resetAllMutationState();

                    try {
                        if (cache != null) {
                            cache.close();
                        }
                    } finally {
                        try {
                            hTable.close();
                        } catch (IOException e) {
                            // Chain the close failure onto the commit failure when there is
                            // one; otherwise the close failure becomes the exception to throw.
                            if (sqlE != null) {
                                sqlE.setNextException(ClientUtil.parseServerException(e));
                            } else {
                                sqlE = ClientUtil.parseServerException(e);
                            }
                        }
                        if (sqlE != null) { throw sqlE; }
                    }
                }
                // At most one retry per table.
                // NOTE(review): if the first attempt was retried for INDEX_METADATA_NOT_FOUND and
                // the second attempt hits the INDEX_WRITE_FAILURE retry path, retryCount is
                // already 1, so the loop ends here without a further attempt and without
                // throwing — confirm this is intended.
            } while (shouldRetry && retryCount++ < 1);
        }
    }