private Future applyInternal()

in src/java/org/apache/cassandra/db/Keyspace.java [445:597]


    /**
     * Applies {@code mutation} to the tables of this keyspace, taking the materialized-view locks first
     * when {@code updateIndexes} is set and the view manager reports that the mutation affects a view.
     * <p>
     * One lock is taken per table id in the mutation, keyed by a hash of the partition key and the table id.
     * When a lock cannot be obtained:
     * <ul>
     *   <li>if the mutation is droppable and older than the write RPC timeout, the call fails with a
     *       {@link WriteTimeoutException} (reported through {@code future} when it is non-null, thrown otherwise);</li>
     *   <li>otherwise, if the mutation is deferrable, it is resubmitted to {@code Stage.MUTATION} and this call
     *       returns immediately;</li>
     *   <li>otherwise the current thread sleeps 10ms and retries.</li>
     * </ul>
     * Every view lock still held by this call is released before it exits, whether it returns or throws.
     *
     * @param mutation      the mutation to apply
     * @param makeDurable   forwarded to the write handler's {@code beginWrite} and to the view replica updates
     * @param updateIndexes forwarded to each table's write handler; view updates are only considered when true
     * @param isDroppable   whether the write may be timed out while waiting for a view lock; view lock
     *                      acquisition metrics are only recorded for droppable writes
     * @param isDeferrable  whether the write may be re-queued on the mutation stage instead of retrying in place
     * @param future        promise completed once the write has been applied or has failed; may be {@code null}
     * @return {@code future}, exactly as passed in (so possibly {@code null})
     * @throws WriteTimeoutException if a view lock could not be acquired in time and {@code future} is {@code null}
     */
    private Future<?> applyInternal(final Mutation mutation,
                                    final boolean makeDurable,
                                    boolean updateIndexes,
                                    boolean isDroppable,
                                    boolean isDeferrable,
                                    Promise<?> future)
    {
        if (TEST_FAIL_WRITES && getMetadata().name.equals(TEST_FAIL_WRITES_KS))
            throw new RuntimeException("Testing write failures");

        Lock[] locks = null;

        boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(mutation, false);

        // Lock acquisition and the write share one try/finally so that no exit path (including an interrupt
        // while waiting to retry, or any unexpected failure during acquisition) can leave a view lock held.
        try
        {
            if (requiresViewUpdate)
            {
                Mutation.viewLockAcquireStartUpdater.compareAndSet(mutation, 0L, currentTimeMillis());

                // the order of lock acquisition doesn't matter (from a deadlock perspective) because we only use tryLock()
                Collection<TableId> tableIds = mutation.getTableIds();
                Iterator<TableId> idIterator = tableIds.iterator();

                locks = new Lock[tableIds.size()];
                for (int i = 0; i < tableIds.size(); i++)
                {
                    TableId tableId = idIterator.next();
                    int lockKey = Objects.hash(mutation.key().getKey(), tableId);
                    while (true)
                    {
                        Lock lock = null;

                        // test hook: pretend the lock is unavailable for the configured number of attempts
                        if (TEST_FAIL_MV_LOCKS_COUNT == 0)
                            lock = ViewManager.acquireLockFor(lockKey);
                        else
                            TEST_FAIL_MV_LOCKS_COUNT--;

                        if (lock == null)
                        {
                            //throw WTE only if request is droppable
                            if (isDroppable && (approxTime.isAfter(mutation.approxCreatedAtNanos + DatabaseDescriptor.getWriteRpcTimeout(NANOSECONDS))))
                            {
                                // Release eagerly, before anyone is notified of the failure; slots are cleared
                                // so the finally block below does not unlock them a second time.
                                for (int j = 0; j < i; j++)
                                {
                                    locks[j].unlock();
                                    locks[j] = null;
                                }

                                if (logger.isTraceEnabled())
                                {
                                    // the table may no longer be known to this keyspace; fall back to its id
                                    ColumnFamilyStore blockedCfs = columnFamilyStores.get(tableId);
                                    logger.trace("Could not acquire lock for {} and table {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()), blockedCfs == null ? tableId : blockedCfs.name);
                                }
                                Tracing.trace("Could not acquire MV lock");
                                if (future != null)
                                {
                                    future.tryFailure(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
                                    return future;
                                }
                                else
                                    throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
                            }
                            else if (isDeferrable)
                            {
                                // Release before re-queueing so the retry is not blocked by locks we still hold.
                                for (int j = 0; j < i; j++)
                                {
                                    locks[j].unlock();
                                    locks[j] = null;
                                }

                                // This view update can't happen right now. so rather than keep this thread busy
                                // we will re-apply ourself to the queue and try again later
                                Stage.MUTATION.execute(() -> {
                                    try
                                    {
                                        applyInternal(mutation, makeDurable, true, isDroppable, true, future);
                                    }
                                    catch (Throwable t)
                                    {
                                        // Nobody is on the stack to observe this failure any more; report it
                                        // through the promise so that waiters are not left hanging, then let
                                        // the executor see it as before.
                                        if (future != null)
                                            future.tryFailure(t);
                                        throw t;
                                    }
                                });
                                return future;
                            }
                            else
                            {
                                // Retry lock on same thread, if mutation is not deferrable.
                                // Mutation is not deferrable, if applied from MutationStage and caller is waiting for future to finish
                                // If blocking caller defers future, this may lead to deadlock situation with all MutationStage workers
                                // being blocked by waiting for futures which will never be processed as all workers are blocked
                                try
                                {
                                    // Wait a little bit before retrying to lock
                                    Thread.sleep(10);
                                }
                                catch (InterruptedException e)
                                {
                                    // locks acquired so far are released by the enclosing finally
                                    throw new UncheckedInterruptedException(e);
                                }
                                continue;
                            }
                        }
                        else
                        {
                            locks[i] = lock;
                        }
                        break;
                    }
                }

                long acquireTime = currentTimeMillis() - Mutation.viewLockAcquireStartUpdater.get(mutation);
                // Metrics are only collected for droppable write operations
                // Bulk non-droppable operations (e.g. commitlog replay, hint delivery) are not measured
                if (isDroppable)
                {
                    for (TableId tableId : tableIds)
                    {
                        // skip tables that are no longer known, mirroring the null check in the write loop below
                        ColumnFamilyStore lockedCfs = columnFamilyStores.get(tableId);
                        if (lockedCfs != null)
                            lockedCfs.metric.viewLockAcquireTime.update(acquireTime, MILLISECONDS);
                    }
                }
            }

            try (WriteContext ctx = getWriteHandler().beginWrite(mutation, makeDurable))
            {
                ConsensusMigrationMutationHelper.validateSafeToExecuteNonTransactionally(mutation);
                for (PartitionUpdate upd : mutation.getPartitionUpdates())
                {
                    ColumnFamilyStore cfs = columnFamilyStores.get(upd.metadata().id);
                    if (cfs == null)
                    {
                        logger.error("Attempting to mutate non-existant table {} ({}.{})", upd.metadata().id, upd.metadata().keyspace, upd.metadata().name);
                        continue;
                    }
                    AtomicLong baseComplete = null;

                    if (requiresViewUpdate)
                    {
                        // stays at Long.MAX_VALUE until the base table write below has completed
                        baseComplete = new AtomicLong(Long.MAX_VALUE);
                        try
                        {
                            Tracing.trace("Creating materialized view mutations from base table replica");
                            viewManager.forTable(upd.metadata()).pushViewReplicaUpdates(upd, makeDurable, baseComplete);
                        }
                        catch (Throwable t)
                        {
                            JVMStabilityInspector.inspectThrowable(t);
                            logger.error(String.format("Unknown exception caught while attempting to update MaterializedView! %s",
                                                       upd.metadata().toString()), t);
                            throw t;
                        }
                    }

                    cfs.getWriteHandler().write(upd, ctx, updateIndexes);

                    if (requiresViewUpdate)
                        baseComplete.set(currentTimeMillis());
                }

                if (future != null)
                    future.trySuccess(null);

                return future;
            }
        }
        finally
        {
            // Runs after the write context (if any) has been closed. Slots are null for locks that were
            // never acquired or that were already released on an early-exit path above.
            if (locks != null)
            {
                for (Lock lock : locks)
                    if (lock != null)
                        lock.unlock();
            }
        }
    }