internal OperationStatus InternalUpsert()

in cs/src/core/Index/FASTER/FASTERImpl.cs [340:473]


        internal OperationStatus InternalUpsert<Input, Output, Context, FasterSession>(
                            ref Key key, ref Value value,
                            ref Context userContext,
                            ref PendingContext<Input, Output, Context> pendingContext,
                            FasterSession fasterSession,
                            FasterExecutionContext<Input, Output, Context> sessionCtx,
                            long lsn)
            where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
        {
            var bucket = default(HashBucket*);
            var slot = default(int);
            var status = default(OperationStatus);
            var latchOperation = LatchOperation.None;
            var latchDestination = LatchDestination.NormalProcessing;

            var hash = comparer.GetHashCode64(ref key);
            var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);

            if (sessionCtx.phase != Phase.REST)
                HeavyEnter(hash, sessionCtx, fasterSession);

#region Trace back for record in in-memory HybridLog
            var entry = default(HashBucketEntry);
            FindOrCreateTag(hash, tag, ref bucket, ref slot, ref entry, hlog.BeginAddress);
            var logicalAddress = entry.Address;
            var physicalAddress = default(long);

            if (UseReadCache)
                SkipAndInvalidateReadCache(ref logicalAddress, ref key);
            var latestLogicalAddress = logicalAddress;

            if (logicalAddress >= hlog.ReadOnlyAddress)
            {
                physicalAddress = hlog.GetPhysicalAddress(logicalAddress);
                if (!comparer.Equals(ref key, ref hlog.GetKey(physicalAddress)))
                {
                    logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress;
                    TraceBackForKeyMatch(ref key,
                                        logicalAddress,
                                        hlog.ReadOnlyAddress,
                                        out logicalAddress,
                                        out physicalAddress);
                }
            }
            #endregion

            // Optimization for the most common case
            if (sessionCtx.phase == Phase.REST && logicalAddress >= hlog.ReadOnlyAddress)
            {
                ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress);
                if (!recordInfo.Tombstone
                    && fasterSession.ConcurrentWriter(ref key, ref value, ref hlog.GetValue(physicalAddress), ref recordInfo, logicalAddress))
                {
                    hlog.MarkPage(logicalAddress, sessionCtx.version);
                    return OperationStatus.SUCCESS;
                }
                goto CreateNewRecord;
            }

#region Entry latch operation
            if (sessionCtx.phase != Phase.REST)
            {
                latchDestination = AcquireLatchUpsert(sessionCtx, bucket, ref status, ref latchOperation, ref entry, logicalAddress);
            }
            #endregion


            #region Normal processing

            // Mutable Region: Update the record in-place
            if (latchDestination == LatchDestination.NormalProcessing)
            {
                if (logicalAddress >= hlog.ReadOnlyAddress)
                {
                    ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress);
                    if (!recordInfo.Tombstone
                        && fasterSession.ConcurrentWriter(ref key, ref value, ref hlog.GetValue(physicalAddress), ref recordInfo, logicalAddress))
                    {
                        if (sessionCtx.phase == Phase.REST) hlog.MarkPage(logicalAddress, sessionCtx.version);
                        else hlog.MarkPageAtomic(logicalAddress, sessionCtx.version);
                        status = OperationStatus.SUCCESS;
                        goto LatchRelease; // Release shared latch (if acquired)
                    }
                }
            }

        // All other regions: Create a record in the mutable region
#endregion

#region Create new record in the mutable region
        CreateNewRecord:
            if (latchDestination != LatchDestination.CreatePendingContext)
            {
                // Immutable region or new record
                status = CreateNewRecordUpsert(ref key, ref value, ref pendingContext, fasterSession, sessionCtx, bucket, slot, tag, entry, latestLogicalAddress);
                if (status != OperationStatus.ALLOCATE_FAILED)
                    goto LatchRelease;
                latchDestination = LatchDestination.CreatePendingContext;
            }
            #endregion

            #region Create pending context
            Debug.Assert(latchDestination == LatchDestination.CreatePendingContext, $"Upsert CreatePendingContext encountered latchDest == {latchDestination}");
            {
                pendingContext.type = OperationType.UPSERT;
                if (pendingContext.key == default) pendingContext.key = hlog.GetKeyContainer(ref key);
                if (pendingContext.value == default) pendingContext.value = hlog.GetValueContainer(ref value);
                pendingContext.userContext = userContext;
                pendingContext.entry.word = entry.word;
                pendingContext.logicalAddress = logicalAddress;
                pendingContext.version = sessionCtx.version;
                pendingContext.serialNum = lsn;
            }
#endregion

#region Latch release
        LatchRelease:
            {
                switch (latchOperation)
                {
                    case LatchOperation.Shared:
                        HashBucket.ReleaseSharedLatch(bucket);
                        break;
                    case LatchOperation.Exclusive:
                        HashBucket.ReleaseExclusiveLatch(bucket);
                        break;
                    default:
                        break;
                }
            }
#endregion

            return status;
        }