internal OperationStatus InternalRMW()

in cs/src/core/Index/FASTER/FASTERImpl.cs [641:834]


        internal OperationStatus InternalRMW<Input, Output, Context, FasterSession>(
                                   ref Key key, ref Input input, ref Output output,
                                   ref Context userContext,
                                   ref PendingContext<Input, Output, Context> pendingContext,
                                   FasterSession fasterSession,
                                   FasterExecutionContext<Input, Output, Context> sessionCtx,
                                   long lsn)
            where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
        {
            var bucket = default(HashBucket*);
            var slot = default(int);
            var physicalAddress = default(long);
            var status = default(OperationStatus);
            var latchOperation = LatchOperation.None;
            var heldOperation = LatchOperation.None;
            var latchDestination = LatchDestination.NormalProcessing;

            var hash = comparer.GetHashCode64(ref key);
            var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);

            if (sessionCtx.phase != Phase.REST)
                HeavyEnter(hash, sessionCtx, fasterSession);

#region Trace back for record in in-memory HybridLog
            var entry = default(HashBucketEntry);
            FindOrCreateTag(hash, tag, ref bucket, ref slot, ref entry, hlog.BeginAddress);
            var logicalAddress = entry.Address;

            // For simplicity, we don't let RMW operations use read cache
            if (UseReadCache)
                SkipReadCache(ref logicalAddress);
            var latestLogicalAddress = logicalAddress;

            if (logicalAddress >= hlog.HeadAddress)
            {
                physicalAddress = hlog.GetPhysicalAddress(logicalAddress);

                if (!comparer.Equals(ref key, ref hlog.GetKey(physicalAddress)))
                {
                    logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress;
                    TraceBackForKeyMatch(ref key,
                                        logicalAddress,
                                        hlog.HeadAddress,
                                        out logicalAddress,
                                        out physicalAddress);
                }
            }
#endregion

            // Optimization for the most common case
            if (sessionCtx.phase == Phase.REST && logicalAddress >= hlog.ReadOnlyAddress)
            {
                ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress);
                if (!recordInfo.Tombstone
                    && fasterSession.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress), ref output, ref recordInfo, logicalAddress))
                {
                    hlog.MarkPage(logicalAddress, sessionCtx.version);
                    return OperationStatus.SUCCESS;
                }
                goto CreateNewRecord;
            }

#region Entry latch operation
            if (sessionCtx.phase != Phase.REST)
            {
                latchDestination = AcquireLatchRMW(pendingContext, sessionCtx, bucket, ref status, ref latchOperation, ref entry, logicalAddress);
            }
            #endregion


            #region Normal processing

            // Mutable Region: Update the record in-place
            if (latchDestination == LatchDestination.NormalProcessing)
            {
                if (logicalAddress >= hlog.ReadOnlyAddress)
                {
                    ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress);
                    if (!recordInfo.Tombstone)
                    { 
                        if (FoldOverSnapshot)
                        {
                            Debug.Assert(recordInfo.Version == sessionCtx.version);
                        }

                        if (fasterSession.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress), ref output, ref recordInfo, logicalAddress))
                        {
                            if (sessionCtx.phase == Phase.REST) hlog.MarkPage(logicalAddress, sessionCtx.version);
                            else hlog.MarkPageAtomic(logicalAddress, sessionCtx.version);
                            status = OperationStatus.SUCCESS;
                            goto LatchRelease; // Release shared latch (if acquired)
                        }
                    }
                }

                // Fuzzy Region: Must go pending due to lost-update anomaly
                else if (logicalAddress >= hlog.SafeReadOnlyAddress && !hlog.GetInfo(physicalAddress).Tombstone)
                {
                    status = OperationStatus.RETRY_LATER;
                    // Do not retain latch for pendings ops in relaxed CPR
                    if (!RelaxedCPR)
                    {
                        // Retain the shared latch (if acquired)
                        if (latchOperation == LatchOperation.Shared)
                        {
                            heldOperation = latchOperation;
                            latchOperation = LatchOperation.None;
                        }
                    }
                    latchDestination = LatchDestination.CreatePendingContext; // Go pending
                }

                // Safe Read-Only Region: Create a record in the mutable region
                else if (logicalAddress >= hlog.HeadAddress)
                {
                    goto CreateNewRecord;
                }

                // Disk Region: Need to issue async io requests
                else if (logicalAddress >= hlog.BeginAddress)
                {
                    status = OperationStatus.RECORD_ON_DISK;
                    // Do not retain latch for pendings ops in relaxed CPR
                    if (!RelaxedCPR)
                    {
                        // Retain the shared latch (if acquired)
                        if (latchOperation == LatchOperation.Shared)
                        {
                            heldOperation = latchOperation;
                            latchOperation = LatchOperation.None;
                        }
                    }
                    latchDestination = LatchDestination.CreatePendingContext; // Go pending
                }

                // No record exists - create new
                else
                {
                    goto CreateNewRecord;
                }
            }

#endregion

#region Create new record
        CreateNewRecord:
            if (latchDestination != LatchDestination.CreatePendingContext)
            {
                status = CreateNewRecordRMW(ref key, ref input, ref output, ref pendingContext, fasterSession, sessionCtx, bucket, slot, logicalAddress, physicalAddress, tag, entry, latestLogicalAddress);
                if (status != OperationStatus.ALLOCATE_FAILED)
                    goto LatchRelease;
                latchDestination = LatchDestination.CreatePendingContext;
            }
            #endregion

            #region Create failure context
            Debug.Assert(latchDestination == LatchDestination.CreatePendingContext, $"RMW CreatePendingContext encountered latchDest == {latchDestination}");
            {
                pendingContext.type = OperationType.RMW;
                if (pendingContext.key == default) pendingContext.key = hlog.GetKeyContainer(ref key);
                if (pendingContext.input == default) pendingContext.input = fasterSession.GetHeapContainer(ref input);

                pendingContext.output = output;
                if (pendingContext.output is IHeapConvertible heapConvertible)
                    heapConvertible.ConvertToHeap();

                pendingContext.userContext = userContext;
                pendingContext.entry.word = entry.word;
                pendingContext.logicalAddress = logicalAddress;
                pendingContext.version = sessionCtx.version;
                pendingContext.serialNum = lsn;
                pendingContext.heldLatch = heldOperation;
            }
#endregion

#region Latch release
        LatchRelease:
            {
                switch (latchOperation)
                {
                    case LatchOperation.Shared:
                        HashBucket.ReleaseSharedLatch(bucket);
                        break;
                    case LatchOperation.Exclusive:
                        HashBucket.ReleaseExclusiveLatch(bucket);
                        break;
                    default:
                        break;
                }
            }
#endregion

            return status;
        }