in cs/src/core/Index/FASTER/FASTERImpl.cs [340:473]
internal OperationStatus InternalUpsert<Input, Output, Context, FasterSession>(
ref Key key, ref Value value,
ref Context userContext,
ref PendingContext<Input, Output, Context> pendingContext,
FasterSession fasterSession,
FasterExecutionContext<Input, Output, Context> sessionCtx,
long lsn)
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
var bucket = default(HashBucket*);
var slot = default(int);
var status = default(OperationStatus);
var latchOperation = LatchOperation.None;
var latchDestination = LatchDestination.NormalProcessing;
var hash = comparer.GetHashCode64(ref key);
var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);
if (sessionCtx.phase != Phase.REST)
HeavyEnter(hash, sessionCtx, fasterSession);
#region Trace back for record in in-memory HybridLog
var entry = default(HashBucketEntry);
FindOrCreateTag(hash, tag, ref bucket, ref slot, ref entry, hlog.BeginAddress);
var logicalAddress = entry.Address;
var physicalAddress = default(long);
if (UseReadCache)
SkipAndInvalidateReadCache(ref logicalAddress, ref key);
var latestLogicalAddress = logicalAddress;
if (logicalAddress >= hlog.ReadOnlyAddress)
{
physicalAddress = hlog.GetPhysicalAddress(logicalAddress);
if (!comparer.Equals(ref key, ref hlog.GetKey(physicalAddress)))
{
logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress;
TraceBackForKeyMatch(ref key,
logicalAddress,
hlog.ReadOnlyAddress,
out logicalAddress,
out physicalAddress);
}
}
#endregion
// Optimization for the most common case
if (sessionCtx.phase == Phase.REST && logicalAddress >= hlog.ReadOnlyAddress)
{
ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress);
if (!recordInfo.Tombstone
&& fasterSession.ConcurrentWriter(ref key, ref value, ref hlog.GetValue(physicalAddress), ref recordInfo, logicalAddress))
{
hlog.MarkPage(logicalAddress, sessionCtx.version);
return OperationStatus.SUCCESS;
}
goto CreateNewRecord;
}
#region Entry latch operation
if (sessionCtx.phase != Phase.REST)
{
latchDestination = AcquireLatchUpsert(sessionCtx, bucket, ref status, ref latchOperation, ref entry, logicalAddress);
}
#endregion
#region Normal processing
// Mutable Region: Update the record in-place
if (latchDestination == LatchDestination.NormalProcessing)
{
if (logicalAddress >= hlog.ReadOnlyAddress)
{
ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress);
if (!recordInfo.Tombstone
&& fasterSession.ConcurrentWriter(ref key, ref value, ref hlog.GetValue(physicalAddress), ref recordInfo, logicalAddress))
{
if (sessionCtx.phase == Phase.REST) hlog.MarkPage(logicalAddress, sessionCtx.version);
else hlog.MarkPageAtomic(logicalAddress, sessionCtx.version);
status = OperationStatus.SUCCESS;
goto LatchRelease; // Release shared latch (if acquired)
}
}
}
// All other regions: Create a record in the mutable region
#endregion
#region Create new record in the mutable region
CreateNewRecord:
if (latchDestination != LatchDestination.CreatePendingContext)
{
// Immutable region or new record
status = CreateNewRecordUpsert(ref key, ref value, ref pendingContext, fasterSession, sessionCtx, bucket, slot, tag, entry, latestLogicalAddress);
if (status != OperationStatus.ALLOCATE_FAILED)
goto LatchRelease;
latchDestination = LatchDestination.CreatePendingContext;
}
#endregion
#region Create pending context
Debug.Assert(latchDestination == LatchDestination.CreatePendingContext, $"Upsert CreatePendingContext encountered latchDest == {latchDestination}");
{
pendingContext.type = OperationType.UPSERT;
if (pendingContext.key == default) pendingContext.key = hlog.GetKeyContainer(ref key);
if (pendingContext.value == default) pendingContext.value = hlog.GetValueContainer(ref value);
pendingContext.userContext = userContext;
pendingContext.entry.word = entry.word;
pendingContext.logicalAddress = logicalAddress;
pendingContext.version = sessionCtx.version;
pendingContext.serialNum = lsn;
}
#endregion
#region Latch release
LatchRelease:
{
switch (latchOperation)
{
case LatchOperation.Shared:
HashBucket.ReleaseSharedLatch(bucket);
break;
case LatchOperation.Exclusive:
HashBucket.ReleaseExclusiveLatch(bucket);
break;
default:
break;
}
}
#endregion
return status;
}