in cs/src/core/Index/FASTER/FASTERImpl.cs [1021:1256]
internal OperationStatus InternalDelete<Input, Output, Context, FasterSession>(
ref Key key,
ref Context userContext,
ref PendingContext<Input, Output, Context> pendingContext,
FasterSession fasterSession,
FasterExecutionContext<Input, Output, Context> sessionCtx,
long lsn)
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
var status = default(OperationStatus);
var bucket = default(HashBucket*);
var slot = default(int);
var logicalAddress = Constants.kInvalidAddress;
var physicalAddress = default(long);
var latchOperation = default(LatchOperation);
var hash = comparer.GetHashCode64(ref key);
var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);
if (sessionCtx.phase != Phase.REST)
HeavyEnter(hash, sessionCtx, fasterSession);
#region Trace back for record in in-memory HybridLog
var entry = default(HashBucketEntry);
var tagExists = FindTag(hash, tag, ref bucket, ref slot, ref entry);
if (!tagExists)
return OperationStatus.NOTFOUND;
logicalAddress = entry.Address;
if (UseReadCache)
SkipAndInvalidateReadCache(ref logicalAddress, ref key);
var latestLogicalAddress = logicalAddress;
if (logicalAddress >= hlog.ReadOnlyAddress)
{
physicalAddress = hlog.GetPhysicalAddress(logicalAddress);
if (!comparer.Equals(ref key, ref hlog.GetKey(physicalAddress)))
{
logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress;
TraceBackForKeyMatch(ref key,
logicalAddress,
hlog.ReadOnlyAddress,
out logicalAddress,
out physicalAddress);
}
}
#endregion
#region Entry latch operation
if (sessionCtx.phase != Phase.REST)
{
switch (sessionCtx.phase)
{
case Phase.PREPARE:
{
if (HashBucket.TryAcquireSharedLatch(bucket))
{
// Set to release shared latch (default)
latchOperation = LatchOperation.Shared;
if (CheckBucketVersionNew(ref entry, sessionCtx))
{
status = OperationStatus.CPR_SHIFT_DETECTED;
goto CreatePendingContext; // Pivot Thread
}
break; // Normal Processing
}
else
{
status = OperationStatus.CPR_SHIFT_DETECTED;
goto CreatePendingContext; // Pivot Thread
}
}
case Phase.IN_PROGRESS:
{
if (!CheckEntryVersionNew(logicalAddress, sessionCtx))
{
if (HashBucket.TryAcquireExclusiveLatch(bucket))
{
// Set to release exclusive latch (default)
latchOperation = LatchOperation.Exclusive;
goto CreateNewRecord; // Create a (v+1) record
}
else
{
status = OperationStatus.RETRY_LATER;
goto CreatePendingContext; // Go Pending
}
}
break; // Normal Processing
}
case Phase.WAIT_PENDING:
{
if (!CheckEntryVersionNew(logicalAddress, sessionCtx))
{
if (HashBucket.NoSharedLatches(bucket))
{
goto CreateNewRecord; // Create a (v+1) record
}
else
{
status = OperationStatus.RETRY_LATER;
goto CreatePendingContext; // Go Pending
}
}
break; // Normal Processing
}
case Phase.WAIT_FLUSH:
{
if (!CheckEntryVersionNew(logicalAddress, sessionCtx))
{
goto CreateNewRecord; // Create a (v+1) record
}
break; // Normal Processing
}
default:
break;
}
}
#endregion
#region Normal processing
// Mutable Region: Update the record in-place
if (logicalAddress >= hlog.ReadOnlyAddress)
{
ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress);
ref Value value = ref hlog.GetValue(physicalAddress);
fasterSession.ConcurrentDeleter(ref hlog.GetKey(physicalAddress), ref value, ref recordInfo, logicalAddress);
if (sessionCtx.phase == Phase.REST) hlog.MarkPage(logicalAddress, sessionCtx.version);
else hlog.MarkPageAtomic(logicalAddress, sessionCtx.version);
if (WriteDefaultOnDelete)
value = default;
// Try to update hash chain and completely elide record only if previous address points to invalid address
if (entry.Address == logicalAddress && recordInfo.PreviousAddress < hlog.BeginAddress)
{
var updatedEntry = default(HashBucketEntry);
updatedEntry.Tag = 0;
if (recordInfo.PreviousAddress == Constants.kTempInvalidAddress)
updatedEntry.Address = Constants.kInvalidAddress;
else
updatedEntry.Address = recordInfo.PreviousAddress;
updatedEntry.Pending = entry.Pending;
updatedEntry.Tentative = false;
// Ignore return value; this is a performance optimization to keep the hash table clean if we can, so if we fail it just means
// the hashtable entry has already been updated by someone else.
Interlocked.CompareExchange(ref bucket->bucket_entries[slot], updatedEntry.word, entry.word);
}
status = OperationStatus.SUCCESS;
goto LatchRelease; // Release shared latch (if acquired)
}
// All other regions: Create a record in the mutable region
#endregion
#region Create new record in the mutable region
CreateNewRecord:
{
var value = default(Value);
// Immutable region or new record
// Allocate default record size for tombstone
var (actualSize, allocateSize) = hlog.GetRecordSize(ref key, ref value);
BlockAllocate(allocateSize, out long newLogicalAddress, sessionCtx, fasterSession, pendingContext.IsAsync);
if (newLogicalAddress == 0)
{
status = OperationStatus.ALLOCATE_FAILED;
goto CreatePendingContext;
}
var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress);
RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress),
sessionCtx.version, tombstone:true, invalidBit:false,
latestLogicalAddress);
hlog.Serialize(ref key, newPhysicalAddress);
var updatedEntry = default(HashBucketEntry);
updatedEntry.Tag = tag;
updatedEntry.Address = newLogicalAddress & Constants.kAddressMask;
updatedEntry.Pending = entry.Pending;
updatedEntry.Tentative = false;
var foundEntry = default(HashBucketEntry);
foundEntry.word = Interlocked.CompareExchange(
ref bucket->bucket_entries[slot],
updatedEntry.word, entry.word);
if (foundEntry.word == entry.word)
{
pendingContext.logicalAddress = newLogicalAddress;
status = OperationStatus.SUCCESS;
goto LatchRelease;
}
else
{
hlog.GetInfo(newPhysicalAddress).Invalid = true;
status = OperationStatus.RETRY_NOW;
goto LatchRelease;
}
}
#endregion
#region Create pending context
CreatePendingContext:
{
pendingContext.type = OperationType.DELETE;
if (pendingContext.key == default) pendingContext.key = hlog.GetKeyContainer(ref key);
pendingContext.userContext = userContext;
pendingContext.entry.word = entry.word;
pendingContext.logicalAddress = logicalAddress;
pendingContext.version = sessionCtx.version;
pendingContext.serialNum = lsn;
}
#endregion
#region Latch release
LatchRelease:
{
switch (latchOperation)
{
case LatchOperation.Shared:
HashBucket.ReleaseSharedLatch(bucket);
break;
case LatchOperation.Exclusive:
HashBucket.ReleaseExclusiveLatch(bucket);
break;
default:
break;
}
}
#endregion
return status;
}