in cs/src/core/Index/FASTER/FASTERImpl.cs [641:834]
internal OperationStatus InternalRMW<Input, Output, Context, FasterSession>(
ref Key key, ref Input input, ref Output output,
ref Context userContext,
ref PendingContext<Input, Output, Context> pendingContext,
FasterSession fasterSession,
FasterExecutionContext<Input, Output, Context> sessionCtx,
long lsn)
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
var bucket = default(HashBucket*);
var slot = default(int);
var physicalAddress = default(long);
var status = default(OperationStatus);
var latchOperation = LatchOperation.None;
var heldOperation = LatchOperation.None;
var latchDestination = LatchDestination.NormalProcessing;
var hash = comparer.GetHashCode64(ref key);
var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);
if (sessionCtx.phase != Phase.REST)
HeavyEnter(hash, sessionCtx, fasterSession);
#region Trace back for record in in-memory HybridLog
var entry = default(HashBucketEntry);
FindOrCreateTag(hash, tag, ref bucket, ref slot, ref entry, hlog.BeginAddress);
var logicalAddress = entry.Address;
// For simplicity, we don't let RMW operations use read cache
if (UseReadCache)
SkipReadCache(ref logicalAddress);
var latestLogicalAddress = logicalAddress;
if (logicalAddress >= hlog.HeadAddress)
{
physicalAddress = hlog.GetPhysicalAddress(logicalAddress);
if (!comparer.Equals(ref key, ref hlog.GetKey(physicalAddress)))
{
logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress;
TraceBackForKeyMatch(ref key,
logicalAddress,
hlog.HeadAddress,
out logicalAddress,
out physicalAddress);
}
}
#endregion
// Optimization for the most common case
if (sessionCtx.phase == Phase.REST && logicalAddress >= hlog.ReadOnlyAddress)
{
ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress);
if (!recordInfo.Tombstone
&& fasterSession.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress), ref output, ref recordInfo, logicalAddress))
{
hlog.MarkPage(logicalAddress, sessionCtx.version);
return OperationStatus.SUCCESS;
}
goto CreateNewRecord;
}
#region Entry latch operation
if (sessionCtx.phase != Phase.REST)
{
latchDestination = AcquireLatchRMW(pendingContext, sessionCtx, bucket, ref status, ref latchOperation, ref entry, logicalAddress);
}
#endregion
#region Normal processing
// Mutable Region: Update the record in-place
if (latchDestination == LatchDestination.NormalProcessing)
{
if (logicalAddress >= hlog.ReadOnlyAddress)
{
ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress);
if (!recordInfo.Tombstone)
{
if (FoldOverSnapshot)
{
Debug.Assert(recordInfo.Version == sessionCtx.version);
}
if (fasterSession.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress), ref output, ref recordInfo, logicalAddress))
{
if (sessionCtx.phase == Phase.REST) hlog.MarkPage(logicalAddress, sessionCtx.version);
else hlog.MarkPageAtomic(logicalAddress, sessionCtx.version);
status = OperationStatus.SUCCESS;
goto LatchRelease; // Release shared latch (if acquired)
}
}
}
// Fuzzy Region: Must go pending due to lost-update anomaly
else if (logicalAddress >= hlog.SafeReadOnlyAddress && !hlog.GetInfo(physicalAddress).Tombstone)
{
status = OperationStatus.RETRY_LATER;
// Do not retain latch for pendings ops in relaxed CPR
if (!RelaxedCPR)
{
// Retain the shared latch (if acquired)
if (latchOperation == LatchOperation.Shared)
{
heldOperation = latchOperation;
latchOperation = LatchOperation.None;
}
}
latchDestination = LatchDestination.CreatePendingContext; // Go pending
}
// Safe Read-Only Region: Create a record in the mutable region
else if (logicalAddress >= hlog.HeadAddress)
{
goto CreateNewRecord;
}
// Disk Region: Need to issue async io requests
else if (logicalAddress >= hlog.BeginAddress)
{
status = OperationStatus.RECORD_ON_DISK;
// Do not retain latch for pendings ops in relaxed CPR
if (!RelaxedCPR)
{
// Retain the shared latch (if acquired)
if (latchOperation == LatchOperation.Shared)
{
heldOperation = latchOperation;
latchOperation = LatchOperation.None;
}
}
latchDestination = LatchDestination.CreatePendingContext; // Go pending
}
// No record exists - create new
else
{
goto CreateNewRecord;
}
}
#endregion
#region Create new record
CreateNewRecord:
if (latchDestination != LatchDestination.CreatePendingContext)
{
status = CreateNewRecordRMW(ref key, ref input, ref output, ref pendingContext, fasterSession, sessionCtx, bucket, slot, logicalAddress, physicalAddress, tag, entry, latestLogicalAddress);
if (status != OperationStatus.ALLOCATE_FAILED)
goto LatchRelease;
latchDestination = LatchDestination.CreatePendingContext;
}
#endregion
#region Create failure context
Debug.Assert(latchDestination == LatchDestination.CreatePendingContext, $"RMW CreatePendingContext encountered latchDest == {latchDestination}");
{
pendingContext.type = OperationType.RMW;
if (pendingContext.key == default) pendingContext.key = hlog.GetKeyContainer(ref key);
if (pendingContext.input == default) pendingContext.input = fasterSession.GetHeapContainer(ref input);
pendingContext.output = output;
if (pendingContext.output is IHeapConvertible heapConvertible)
heapConvertible.ConvertToHeap();
pendingContext.userContext = userContext;
pendingContext.entry.word = entry.word;
pendingContext.logicalAddress = logicalAddress;
pendingContext.version = sessionCtx.version;
pendingContext.serialNum = lsn;
pendingContext.heldLatch = heldOperation;
}
#endregion
#region Latch release
LatchRelease:
{
switch (latchOperation)
{
case LatchOperation.Shared:
HashBucket.ReleaseSharedLatch(bucket);
break;
case LatchOperation.Exclusive:
HashBucket.ReleaseExclusiveLatch(bucket);
break;
default:
break;
}
}
#endregion
return status;
}