in cs/src/core/Index/FASTER/FASTERImpl.cs [1441:1563]
/// <summary>
/// Continues a read-modify-write (RMW) that was previously left pending, using the
/// record carried by <paramref name="request"/>. The method re-walks the hash chain
/// for the key, and either appends a new record (built with InitialUpdater or
/// CopyUpdater) and publishes it with a compare-and-swap on the hash bucket slot, or
/// gives up on this path and re-runs <c>InternalRMW</c> from scratch.
/// </summary>
/// <typeparam name="Input">Input type handed to the session callbacks.</typeparam>
/// <typeparam name="Output">Output type handed to the session callbacks.</typeparam>
/// <typeparam name="Context">User context type forwarded to <c>InternalRMW</c>.</typeparam>
/// <typeparam name="FasterSession">Session callback type; must implement IFasterSession.</typeparam>
/// <param name="opCtx">Execution context whose <c>version</c> is written into the new record header; also forwarded to <c>InternalRMW</c>.</param>
/// <param name="request">I/O context supplying <c>logicalAddress</c> and the <c>record</c> buffer of the previously requested record.</param>
/// <param name="pendingContext">State of the pending operation: key, input, output, user context, the hash entry saved earlier, and serial number.</param>
/// <param name="fasterSession">Provides NeedCopyUpdate, InitialUpdater and CopyUpdater; also passed to the allocator and size helpers.</param>
/// <param name="sessionCtx">Execution context passed to <c>BlockAllocate</c>.</param>
/// <returns>
/// <c>SUCCESS</c> when NeedCopyUpdate declines the update or a CopyUpdater record is published;
/// <c>NOTFOUND</c> when an InitialUpdater record is published;
/// <c>ALLOCATE_FAILED</c> when <c>BlockAllocate</c> yields address 0;
/// otherwise the first non-<c>RETRY_NOW</c> status returned by <c>InternalRMW</c>.
/// </returns>
internal OperationStatus InternalContinuePendingRMW<Input, Output, Context, FasterSession>(
FasterExecutionContext<Input, Output, Context> opCtx,
AsyncIOContext<Key, Value> request,
ref PendingContext<Input, Output, Context> pendingContext,
FasterSession fasterSession,
FasterExecutionContext<Input, Output, Context> sessionCtx)
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
{
var bucket = default(HashBucket*);
var slot = default(int);
var logicalAddress = Constants.kInvalidAddress;
var physicalAddress = default(long);
var status = default(OperationStatus);
ref Key key = ref pendingContext.key.Get();
// Hash and tag are derived once from the key; they do not change across loop iterations.
var hash = comparer.GetHashCode64(ref key);
var tag = (ushort)((ulong)hash >> Constants.kHashTagShift);
// Each iteration re-reads the bucket entry, builds a fresh record and tries to publish it.
// The loop repeats only when the compare-and-swap at the bottom loses to a concurrent update.
while (true)
{
#region Trace Back for Record on In-Memory HybridLog
// Look up (or create) the bucket entry for this tag and start from the address it holds.
var entry = default(HashBucketEntry);
FindOrCreateTag(hash, tag, ref bucket, ref slot, ref entry, hlog.BeginAddress);
logicalAddress = entry.Address;
// For simplicity, we don't let RMW operations use read cache
if (UseReadCache)
SkipReadCache(ref logicalAddress);
// Saved before any trace-back below modifies logicalAddress; it is later passed as the
// last argument of RecordInfo.WriteInfo for the new record
// (presumably the record's previous-address link -- TODO confirm against WriteInfo).
var latestLogicalAddress = logicalAddress;
// Only addresses at or above HeadAddress are dereferenced here.
if (logicalAddress >= hlog.HeadAddress)
{
physicalAddress = hlog.GetPhysicalAddress(logicalAddress);
if (!comparer.Equals(ref key, ref hlog.GetKey(physicalAddress)))
{
// First record belongs to a different key: continue from its PreviousAddress,
// bounded below by HeadAddress.
logicalAddress = hlog.GetInfo(physicalAddress).PreviousAddress;
TraceBackForKeyMatch(ref key,
logicalAddress,
hlog.HeadAddress,
out logicalAddress,
out physicalAddress);
}
}
#endregion
// Compare the address found now with the address stored in the entry saved in pendingContext.
// If the current one is greater, this continuation is abandoned and the operation is
// re-executed through InternalRMW after the loop.
// NOTE(review): presumably this means a newer record for the key appeared while the
// operation was pending, so the fetched record may be stale -- confirm.
var previousFirstRecordAddress = pendingContext.entry.Address;
if (logicalAddress > previousFirstRecordAddress)
{
break;
}
#region Create record in mutable region
// A usable old value exists when the requested address is at or above BeginAddress and the
// fetched record is not a tombstone. In that case the session may decline the copy-update,
// and the operation completes as SUCCESS without allocating anything.
if ((request.logicalAddress >= hlog.BeginAddress) && !hlog.GetInfoFromBytePointer(request.record.GetValidPointer()).Tombstone)
{
if (!fasterSession.NeedCopyUpdate(ref key, ref pendingContext.input.Get(), ref hlog.GetContextRecordValue(ref request), ref pendingContext.output))
{
return OperationStatus.SUCCESS;
}
}
// Size the new record: from the input alone when there is no usable old value,
// otherwise from the fetched record (addressed via the request's record buffer).
// NOTE(review): the "no usable old value" condition is evaluated again below and reads
// hlog.BeginAddress each time; confirm whether that value can change in between.
int actualSize, allocatedSize;
if ((request.logicalAddress < hlog.BeginAddress) || (hlog.GetInfoFromBytePointer(request.record.GetValidPointer()).Tombstone))
{
(actualSize, allocatedSize) = hlog.GetInitialRecordSize(ref key, ref pendingContext.input.Get(), fasterSession);
}
else
{
physicalAddress = (long)request.record.GetValidPointer();
(actualSize, allocatedSize) = hlog.GetRecordSize(physicalAddress, ref pendingContext.input.Get(), fasterSession);
}
BlockAllocate(allocatedSize, out long newLogicalAddress, sessionCtx, fasterSession);
// An address of 0 is treated as allocation failure.
if (newLogicalAddress == 0)
return OperationStatus.ALLOCATE_FAILED;
var newPhysicalAddress = hlog.GetPhysicalAddress(newLogicalAddress);
// Write the header (current version, not tombstone, not invalid) and then the key.
RecordInfo.WriteInfo(ref hlog.GetInfo(newPhysicalAddress), opCtx.version,
tombstone:false, invalidBit:false,
latestLogicalAddress);
hlog.Serialize(ref key, newPhysicalAddress);
// Fill in the value: InitialUpdater when there is no usable old value (reported as NOTFOUND),
// CopyUpdater from the fetched value otherwise (reported as SUCCESS).
if ((request.logicalAddress < hlog.BeginAddress) || (hlog.GetInfoFromBytePointer(request.record.GetValidPointer()).Tombstone))
{
fasterSession.InitialUpdater(ref key,
ref pendingContext.input.Get(), ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize),
ref pendingContext.output);
status = OperationStatus.NOTFOUND;
}
else
{
fasterSession.CopyUpdater(ref key,
ref pendingContext.input.Get(), ref hlog.GetContextRecordValue(ref request),
ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize),
ref pendingContext.output);
status = OperationStatus.SUCCESS;
}
// Build the replacement bucket entry pointing at the new record; the Pending bit is
// carried over from the entry read at the top of this iteration.
var updatedEntry = default(HashBucketEntry);
updatedEntry.Tag = tag;
updatedEntry.Address = newLogicalAddress & Constants.kAddressMask;
updatedEntry.Pending = entry.Pending;
updatedEntry.Tentative = false;
// Publish atomically: the slot is replaced only if it still holds the entry read earlier.
var foundEntry = default(HashBucketEntry);
foundEntry.word = Interlocked.CompareExchange(
ref bucket->bucket_entries[slot],
updatedEntry.word, entry.word);
if (foundEntry.word == entry.word)
{
return status;
}
else
{
// The slot changed underneath us: mark the record just written as invalid
// and go around the loop again.
hlog.GetInfo(newPhysicalAddress).Invalid = true;
}
#endregion
}
// Reached only via the break above: re-run the RMW from scratch, repeating for as long as
// InternalRMW asks for an immediate retry, and return its final status.
OperationStatus internalStatus;
do
internalStatus = InternalRMW(ref pendingContext.key.Get(), ref pendingContext.input.Get(), ref pendingContext.output, ref pendingContext.userContext, ref pendingContext, fasterSession, opCtx, pendingContext.serialNum);
while (internalStatus == OperationStatus.RETRY_NOW);
return internalStatus;
}