internal OperationStatus InternalContinuePendingRMW()

in cs/src/core/Index/FASTER/FASTERImpl.cs [1441:1563]


        /// <summary>
        /// Continues a read-modify-write (RMW) for the key held in <paramref name="pendingContext"/>,
        /// using the record supplied by <paramref name="request"/>. A new record is allocated, filled by
        /// the session's InitialUpdater or CopyUpdater, and published by a compare-and-swap on the hash
        /// bucket entry. If the trace-back finds an address above the hash entry saved in
        /// <paramref name="pendingContext"/>, the work is handed to <c>InternalRMW</c> instead.
        /// </summary>
        /// <param name="opCtx">Execution context whose <c>version</c> is written into the new record header; also passed to <c>InternalRMW</c> on the fallback path.</param>
        /// <param name="request">I/O context providing <c>logicalAddress</c> and <c>record</c> for the key.</param>
        /// <param name="pendingContext">Saved operation state: key, input, output, user context, serial number and the previously observed hash entry.</param>
        /// <param name="fasterSession">Session callbacks used here: <c>NeedCopyUpdate</c>, <c>InitialUpdater</c> and <c>CopyUpdater</c>.</param>
        /// <param name="sessionCtx">Execution context passed to <c>BlockAllocate</c>.</param>
        /// <returns>
        /// <see cref="OperationStatus.SUCCESS"/> when <c>NeedCopyUpdate</c> returns false, or after a copy-updated record is published;
        /// <see cref="OperationStatus.NOTFOUND"/> after an initial-updated record is published;
        /// <see cref="OperationStatus.ALLOCATE_FAILED"/> when <c>BlockAllocate</c> yields address 0;
        /// otherwise the first status returned by <c>InternalRMW</c> that is not <see cref="OperationStatus.RETRY_NOW"/>.
        /// </returns>
        internal OperationStatus InternalContinuePendingRMW<Input, Output, Context, FasterSession>(
                                    FasterExecutionContext<Input, Output, Context> opCtx,
                                    AsyncIOContext<Key, Value> request,
                                    ref PendingContext<Input, Output, Context> pendingContext,
                                    FasterSession fasterSession,
                                    FasterExecutionContext<Input, Output, Context> sessionCtx)
            where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
        {
            var bucket = default(HashBucket*);
            var slot = default(int);
            var logicalAddress = Constants.kInvalidAddress;
            var physicalAddress = default(long);
            var status = default(OperationStatus);
            ref Key key = ref pendingContext.key.Get();

            var hash = comparer.GetHashCode64(ref key);
            var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);

            // Each iteration is one attempt to publish a new record; a lost CAS at the bottom retries.
            while (true)
            {
#region Trace Back for Record on In-Memory HybridLog
                var entry = default(HashBucketEntry);
                FindOrCreateTag(hash, tag, ref bucket, ref slot, ref entry, hlog.BeginAddress);
                logicalAddress = entry.Address;

                // For simplicity, we don't let RMW operations use read cache
                if (UseReadCache)
                    SkipReadCache(ref logicalAddress);

                // Remember the chain head before trace-back: it becomes the new record's previous address.
                var latestLogicalAddress = logicalAddress;

                if (logicalAddress >= hlog.HeadAddress)
                {
                    physicalAddress = hlog.GetPhysicalAddress(logicalAddress);
                    if (!comparer.Equals(ref key, ref hlog.GetKey(physicalAddress)))
                    {
                        // Head of the chain belongs to a different key: follow previous-address links.
                        logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress;
                        TraceBackForKeyMatch(ref key,
                                                logicalAddress,
                                                hlog.HeadAddress,
                                                out logicalAddress,
                                                out physicalAddress);
                    }
                }
#endregion

                // The traced address lies above the hash entry address saved in pendingContext
                // (presumably a newer record appeared meanwhile), so stop here and rerun the
                // operation through InternalRMW below.
                var previousFirstRecordAddress = pendingContext.entry.Address;
                if (logicalAddress > previousFirstRecordAddress)
                {
                    break;
                }

#region Create record in mutable region

                // Decide ONCE per attempt whether the record in 'request' is unusable (its address is
                // below hlog.BeginAddress, or it is a tombstone). The NeedCopyUpdate check, the size
                // computation and the updater choice must all agree on this answer. Previously the
                // predicate was re-evaluated at each of those three points, re-reading
                // hlog.BeginAddress every time (the last read happening after BlockAllocate), so a size
                // computed for the copy path could be paired with InitialUpdater.
                // NOTE(review): presumably hlog.BeginAddress can advance concurrently - confirm.
                // The short-circuit order is kept so request.record is not dereferenced when the
                // address check alone already decides.
                bool useInitialUpdate = (request.logicalAddress < hlog.BeginAddress)
                                        || hlog.GetInfoFromBytePointer(request.record.GetValidPointer()).Tombstone;

                if (!useInitialUpdate
                    && !fasterSession.NeedCopyUpdate(ref key, ref pendingContext.input.Get(), ref hlog.GetContextRecordValue(ref request), ref pendingContext.output))
                {
                    // The session declined the copy-update; nothing is allocated or published.
                    return OperationStatus.SUCCESS;
                }

                int actualSize, allocatedSize;
                if (useInitialUpdate)
                {
                    (actualSize, allocatedSize) = hlog.GetInitialRecordSize(ref key, ref pendingContext.input.Get(), fasterSession);
                }
                else
                {
                    physicalAddress = (long)request.record.GetValidPointer();
                    (actualSize, allocatedSize) = hlog.GetRecordSize(physicalAddress, ref pendingContext.input.Get(), fasterSession);
                }

                BlockAllocate(allocatedSize, out long newLogicalAddress, sessionCtx, fasterSession);
                if (newLogicalAddress == 0)
                    return OperationStatus.ALLOCATE_FAILED;

                var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress);
                RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), opCtx.version,
                               tombstone:false, invalidBit:false,
                               latestLogicalAddress);
                hlog.Serialize(ref key, newPhysicalAddress);

                if (useInitialUpdate)
                {
                    fasterSession.InitialUpdater(ref key,
                                             ref pendingContext.input.Get(), ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize),
                                             ref pendingContext.output);
                    status = OperationStatus.NOTFOUND;
                }
                else
                {
                    fasterSession.CopyUpdater(ref key,
                                          ref pendingContext.input.Get(), ref hlog.GetContextRecordValue(ref request),
                                          ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize),
                                          ref pendingContext.output);
                    status = OperationStatus.SUCCESS;
                }

                // Publish: swing the bucket slot from the entry read at the top of this attempt to the new record.
                var updatedEntry = default(HashBucketEntry);
                updatedEntry.Tag = tag;
                updatedEntry.Address = newLogicalAddress & Constants.kAddressMask;
                updatedEntry.Pending = entry.Pending;
                updatedEntry.Tentative = false;

                var foundEntry = default(HashBucketEntry);
                foundEntry.word = Interlocked.CompareExchange(
                                            ref bucket->bucket_entries[slot],
                                            updatedEntry.word, entry.word);

                if (foundEntry.word == entry.word)
                {
                    return status;
                }

                // The slot changed since it was read: mark the unpublished record invalid and retry.
                hlog.GetInfo(newPhysicalAddress).Invalid = true;
#endregion
            }

            // Fallback path: run the operation through InternalRMW, repeating while it asks for an immediate retry.
            OperationStatus internalStatus;
            do
            {
                internalStatus = InternalRMW(ref pendingContext.key.Get(), ref pendingContext.input.Get(), ref pendingContext.output, ref pendingContext.userContext, ref pendingContext, fasterSession, opCtx, pendingContext.serialNum);
            }
            while (internalStatus == OperationStatus.RETRY_NOW);
            return internalStatus;
        }