in cs/src/core/Index/FASTER/FASTERImpl.cs [98:296]
internal OperationStatus InternalRead<Input, Output, Context, Functions>(
ref Key key,
ref Input input,
ref Output output,
long startAddress,
ref Context userContext,
ref PendingContext<Input, Output, Context> pendingContext,
Functions fasterSession,
FasterExecutionContext<Input, Output, Context> sessionCtx,
long lsn)
where Functions : IFasterSession<Key, Value, Input, Output, Context>
{
var bucket = default(HashBucket*);
var slot = default(int);
var physicalAddress = default(long);
var heldOperation = LatchOperation.None;
var hash = comparer.GetHashCode64(ref key);
var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);
if (sessionCtx.phase != Phase.REST)
HeavyEnter(hash, sessionCtx, fasterSession);
#region Trace back for record in in-memory HybridLog
HashBucketEntry entry = default;
OperationStatus status;
long logicalAddress;
var useStartAddress = startAddress != Constants.kInvalidAddress && !pendingContext.HasMinAddress;
bool tagExists;
if (!useStartAddress)
{
tagExists = FindTag(hash, tag, ref bucket, ref slot, ref entry) && entry.Address >= pendingContext.minAddress;
}
else
{
tagExists = startAddress >= hlog.BeginAddress;
entry.Address = startAddress;
}
if (tagExists)
{
logicalAddress = entry.Address;
if (UseReadCache)
{
if (pendingContext.SkipReadCache || pendingContext.NoKey)
{
SkipReadCache(ref logicalAddress);
}
else if (ReadFromCache(ref key, ref logicalAddress, ref physicalAddress))
{
if (sessionCtx.phase == Phase.PREPARE && CheckBucketVersionNew(ref entry, sessionCtx))
{
status = OperationStatus.CPR_SHIFT_DETECTED;
goto CreatePendingContext; // Pivot thread
}
// This is not called when looking up by address, so we do not set pendingContext.recordInfo.
// ReadCache addresses are not valid for indexing etc. so pass kInvalidAddress.
fasterSession.SingleReader(ref key, ref input, ref readcache.GetValue(physicalAddress), ref output, Constants.kInvalidAddress);
return OperationStatus.SUCCESS;
}
}
if (logicalAddress >= hlog.HeadAddress)
{
physicalAddress = hlog.GetPhysicalAddress(logicalAddress);
if (!pendingContext.NoKey)
{
if (!comparer.Equals(ref key, ref hlog.GetKey(physicalAddress)))
{
logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress;
TraceBackForKeyMatch(ref key,
logicalAddress,
hlog.HeadAddress,
out logicalAddress,
out physicalAddress);
}
} else
{
// If NoKey, we do not have the key in the call and must use the key from the record.
key = ref hlog.GetKey(physicalAddress);
}
}
}
else
{
// no tag found
return OperationStatus.NOTFOUND;
}
#endregion
if (sessionCtx.phase == Phase.PREPARE && CheckBucketVersionNew(ref entry, sessionCtx))
{
status = OperationStatus.CPR_SHIFT_DETECTED;
goto CreatePendingContext; // Pivot thread
}
#region Normal processing
// Mutable region (even fuzzy region is included here)
if (logicalAddress >= hlog.SafeReadOnlyAddress)
{
ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress);
pendingContext.recordInfo = recordInfo;
if (!pendingContext.recordInfo.Tombstone)
{
fasterSession.ConcurrentReader(ref key, ref input, ref hlog.GetValue(physicalAddress), ref output, ref recordInfo, logicalAddress);
return OperationStatus.SUCCESS;
}
return OperationStatus.NOTFOUND;
}
// Immutable region
else if (logicalAddress >= hlog.HeadAddress)
{
pendingContext.recordInfo = hlog.GetInfo(physicalAddress);
if (!pendingContext.recordInfo.Tombstone)
{
fasterSession.SingleReader(ref key, ref input, ref hlog.GetValue(physicalAddress), ref output, logicalAddress);
if (CopyReadsToTail == CopyReadsToTail.FromReadOnly && !pendingContext.SkipCopyReadsToTail)
{
var container = hlog.GetValueContainer(ref hlog.GetValue(physicalAddress));
InternalTryCopyToTail(ref key, ref container.Get(), ref pendingContext.recordInfo, logicalAddress, fasterSession, sessionCtx);
container.Dispose();
}
return OperationStatus.SUCCESS;
}
return OperationStatus.NOTFOUND;
}
// On-Disk Region
else if (logicalAddress >= hlog.BeginAddress)
{
if (hlog.IsNullDevice)
return OperationStatus.NOTFOUND;
status = OperationStatus.RECORD_ON_DISK;
if (sessionCtx.phase == Phase.PREPARE)
{
Debug.Assert(heldOperation != LatchOperation.Exclusive);
if (useStartAddress)
{
Debug.Assert(heldOperation == LatchOperation.None);
}
else if (heldOperation == LatchOperation.Shared || HashBucket.TryAcquireSharedLatch(bucket))
{
heldOperation = LatchOperation.Shared;
}
else
{
status = OperationStatus.CPR_SHIFT_DETECTED;
}
if (RelaxedCPR) // don't hold on to shared latched during IO
{
if (heldOperation == LatchOperation.Shared)
HashBucket.ReleaseSharedLatch(bucket);
heldOperation = LatchOperation.None;
}
}
goto CreatePendingContext;
}
// No record found
else
{
return OperationStatus.NOTFOUND;
}
#endregion
#region Create pending context
CreatePendingContext:
{
pendingContext.type = OperationType.READ;
if (!pendingContext.NoKey && pendingContext.key == default) // If this is true, we don't have a valid key
pendingContext.key = hlog.GetKeyContainer(ref key);
if (pendingContext.input == default) pendingContext.input = fasterSession.GetHeapContainer(ref input);
pendingContext.output = output;
if (pendingContext.output is IHeapConvertible heapConvertible)
heapConvertible.ConvertToHeap();
pendingContext.userContext = userContext;
pendingContext.entry.word = entry.word;
pendingContext.logicalAddress = logicalAddress;
pendingContext.version = sessionCtx.version;
pendingContext.serialNum = lsn;
pendingContext.heldLatch = heldOperation;
pendingContext.recordInfo.PreviousAddress = startAddress;
}
#endregion
return status;
}