internal OperationStatus InternalRead()

in cs/src/core/Index/FASTER/FASTERImpl.cs [98:296]


        internal OperationStatus InternalRead<Input, Output, Context, Functions>(
                                    ref Key key,
                                    ref Input input,
                                    ref Output output,
                                    long startAddress,
                                    ref Context userContext,
                                    ref PendingContext<Input, Output, Context> pendingContext,
                                    Functions fasterSession,
                                    FasterExecutionContext<Input, Output, Context> sessionCtx,
                                    long lsn)
            where Functions : IFasterSession<Key, Value, Input, Output, Context>
        {
            var bucket = default(HashBucket*);
            var slot = default(int);
            var physicalAddress = default(long);
            var heldOperation = LatchOperation.None;

            var hash = comparer.GetHashCode64(ref key);
            var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);

            if (sessionCtx.phase != Phase.REST)
                HeavyEnter(hash, sessionCtx, fasterSession);

            #region Trace back for record in in-memory HybridLog
            HashBucketEntry entry = default;

            OperationStatus status;
            long logicalAddress;
            var useStartAddress = startAddress != Constants.kInvalidAddress && !pendingContext.HasMinAddress;
            bool tagExists;
            if (!useStartAddress)
            {
                tagExists = FindTag(hash, tag, ref bucket, ref slot, ref entry) && entry.Address >= pendingContext.minAddress;
            }
            else
            {
                tagExists = startAddress >= hlog.BeginAddress;
                entry.Address = startAddress;
            }

            if (tagExists)
            {
                logicalAddress = entry.Address;

                if (UseReadCache)
                {
                    if (pendingContext.SkipReadCache || pendingContext.NoKey)
                    {
                        SkipReadCache(ref logicalAddress);
                    }
                    else if (ReadFromCache(ref key, ref logicalAddress, ref physicalAddress))
                    {
                        if (sessionCtx.phase == Phase.PREPARE && CheckBucketVersionNew(ref entry, sessionCtx))
                        {
                            status = OperationStatus.CPR_SHIFT_DETECTED;
                            goto CreatePendingContext; // Pivot thread
                        }

                        // This is not called when looking up by address, so we do not set pendingContext.recordInfo.
                        // ReadCache addresses are not valid for indexing etc. so pass kInvalidAddress.
                        fasterSession.SingleReader(ref key, ref input, ref readcache.GetValue(physicalAddress), ref output, Constants.kInvalidAddress);
                        return OperationStatus.SUCCESS;
                    }
                }

                if (logicalAddress >= hlog.HeadAddress)
                {
                    physicalAddress = hlog.GetPhysicalAddress(logicalAddress);

                    if (!pendingContext.NoKey)
                    {
                        if (!comparer.Equals(ref key, ref hlog.GetKey(physicalAddress)))
                        {
                            logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress;
                            TraceBackForKeyMatch(ref key,
                                                    logicalAddress,
                                                    hlog.HeadAddress,
                                                    out logicalAddress,
                                                    out physicalAddress);
                        }
                    } else
                    {
                        // If NoKey, we do not have the key in the call and must use the key from the record.
                        key = ref hlog.GetKey(physicalAddress);
                    }
                }
            }
            else
            {
                // no tag found
                return OperationStatus.NOTFOUND;
            }
            #endregion

            if (sessionCtx.phase == Phase.PREPARE && CheckBucketVersionNew(ref entry, sessionCtx))
            {
                status = OperationStatus.CPR_SHIFT_DETECTED;
                goto CreatePendingContext; // Pivot thread
            }

#region Normal processing

            // Mutable region (even fuzzy region is included here)
            if (logicalAddress >= hlog.SafeReadOnlyAddress)
            {
                ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress);
                pendingContext.recordInfo = recordInfo;
                if (!pendingContext.recordInfo.Tombstone)
                {
                    fasterSession.ConcurrentReader(ref key, ref input, ref hlog.GetValue(physicalAddress), ref output, ref recordInfo, logicalAddress);
                    return OperationStatus.SUCCESS;
                }
                return OperationStatus.NOTFOUND;
            }

            // Immutable region
            else if (logicalAddress >= hlog.HeadAddress)
            {
                pendingContext.recordInfo = hlog.GetInfo(physicalAddress);
                if (!pendingContext.recordInfo.Tombstone)
                {
                    fasterSession.SingleReader(ref key, ref input, ref hlog.GetValue(physicalAddress), ref output, logicalAddress);
                    if (CopyReadsToTail == CopyReadsToTail.FromReadOnly && !pendingContext.SkipCopyReadsToTail)
                    {
                        var container = hlog.GetValueContainer(ref hlog.GetValue(physicalAddress));
                        InternalTryCopyToTail(ref key, ref container.Get(), ref pendingContext.recordInfo, logicalAddress, fasterSession, sessionCtx);
                        container.Dispose();
                    }
                    return OperationStatus.SUCCESS;
                }
                return OperationStatus.NOTFOUND;
            }

            // On-Disk Region
            else if (logicalAddress >= hlog.BeginAddress)
            {
                if (hlog.IsNullDevice)
                    return OperationStatus.NOTFOUND;

                status = OperationStatus.RECORD_ON_DISK;
                if (sessionCtx.phase == Phase.PREPARE)
                {
                    Debug.Assert(heldOperation != LatchOperation.Exclusive);
                    if (useStartAddress)
                    {
                        Debug.Assert(heldOperation == LatchOperation.None);
                    }
                    else if (heldOperation == LatchOperation.Shared || HashBucket.TryAcquireSharedLatch(bucket))
                    {
                        heldOperation = LatchOperation.Shared;
                    }
                    else
                    {
                        status = OperationStatus.CPR_SHIFT_DETECTED;
                    }

                    if (RelaxedCPR) // don't hold on to shared latched during IO
                    {
                        if (heldOperation == LatchOperation.Shared)
                            HashBucket.ReleaseSharedLatch(bucket);
                        heldOperation = LatchOperation.None;
                    }
                }

                goto CreatePendingContext;
            }

            // No record found
            else
            {
                return OperationStatus.NOTFOUND;
            }

#endregion

#region Create pending context
        CreatePendingContext:
            {
                pendingContext.type = OperationType.READ;
                if (!pendingContext.NoKey && pendingContext.key == default)    // If this is true, we don't have a valid key
                    pendingContext.key = hlog.GetKeyContainer(ref key);
                if (pendingContext.input == default) pendingContext.input = fasterSession.GetHeapContainer(ref input);

                pendingContext.output = output;
                if (pendingContext.output is IHeapConvertible heapConvertible)
                    heapConvertible.ConvertToHeap();

                pendingContext.userContext = userContext;
                pendingContext.entry.word = entry.word;
                pendingContext.logicalAddress = logicalAddress;
                pendingContext.version = sessionCtx.version;
                pendingContext.serialNum = lsn;
                pendingContext.heldLatch = heldOperation;
                pendingContext.recordInfo.PreviousAddress = startAddress;
            }
#endregion

            return status;
        }