in src/core/fishstore.h [1552:1626]
inline Status FishStore<D, A>::HandleOperationStatus(ExecutionContext& ctx,
    pending_context_t& pending_context, OperationStatus internal_status, bool& async) {
  // Maps the internal status of a read / insert / scan onto the Status returned
  // to the caller, re-running or deferring the operation where required.
  //
  //   ctx              execution context forwarded to the retry / I/O helpers.
  //   pending_context  the operation being processed; its `type` selects the
  //                    concrete context it is down-cast to on an immediate retry.
  //   internal_status  status reported by the internal operation.
  //   async            reset to false on entry, then forwarded by reference to
  //                    whichever helper the status is delegated to.
  //
  // Returns Status::Ok or Status::NotFound for the terminal statuses handled
  // here, the delegated helper's result for deferred operations, and
  // Status::Corruption for a status or operation type that is not expected.
  async = false;

  // RETRY_NOW: re-run the operation immediately. This is a loop rather than a
  // recursive call so a long run of RETRY_NOW results cannot grow the stack.
  // We do not retry full scan, since it may be super slow.
  // Should we stop doing retry for hash chain scan as well???
  while(internal_status == OperationStatus::RETRY_NOW) {
    switch(pending_context.type) {
    case OperationType::Read: {
      auto& read_context = static_cast<async_pending_read_context_t&>(pending_context);
      internal_status = InternalRead(read_context);
      break;
    }
    case OperationType::Insert: {
      auto& insert_context = static_cast<async_pending_insert_context_t&>(pending_context);
      internal_status = InternalInsert(insert_context);
      break;
    }
    case OperationType::Scan: {
      auto& scan_context = static_cast<async_pending_scan_context_t&>(pending_context);
      internal_status =
        InternalScan(scan_context, pending_context.start_addr.control(),
                     pending_context.end_addr.control());
      break;
    }
    default:
      // Unexpected operation type. The explicit return matters when assert()
      // is compiled out (NDEBUG): internal_status would otherwise remain
      // RETRY_NOW and the retry would never terminate.
      assert(false);
      return Status::Corruption;
    }
    if(internal_status == OperationStatus::SUCCESS) {
      return Status::Ok;
    }
    // Any other status is handled below; another RETRY_NOW loops again.
  }

  switch(internal_status) {
  case OperationStatus::RETRY_LATER:
    return RetryLater(ctx, pending_context, async);
  case OperationStatus::RECORD_ON_DISK:
    if(thread_ctx().phase == Phase::PREPARE) {
      assert(pending_context.type == OperationType::Read);
      // Can I be marking an operation again and again?
      // If the old-version checkpoint lock cannot be taken, pivot and retry
      // instead of issuing the I/O.
      if(!checkpoint_locks_.get_lock(pending_context.get_hash()).try_lock_old()) {
        return PivotAndRetry(ctx, pending_context, async);
      }
    }
    return IssueAsyncIoRequest(ctx, pending_context, async);
  case OperationStatus::SCAN_IN_PROGRESS:
    // Only try mark or retry for hash chain scan.
    // FullScan is not protected against CPR. Should we do this to hash chain
    // as well?
    if(thread_ctx().phase == Phase::PREPARE &&
        pending_context.type == OperationType::Scan) {
      // Can I be marking an operation again and again?
      if(!checkpoint_locks_.get_lock(pending_context.get_hash()).try_lock_old()) {
        return PivotAndRetry(ctx, pending_context, async);
      }
    }
    return IssueAsyncScanRequest(ctx, pending_context, async);
  case OperationStatus::SUCCESS_UNMARK:
    // The operation finished; release the old-version lock for its hash.
    checkpoint_locks_.get_lock(pending_context.get_hash()).unlock_old();
    return Status::Ok;
  case OperationStatus::NOT_FOUND_UNMARK:
    // Same release as above, but the operation found nothing.
    checkpoint_locks_.get_lock(pending_context.get_hash()).unlock_old();
    return Status::NotFound;
  case OperationStatus::CPR_SHIFT_DETECTED:
    return PivotAndRetry(ctx, pending_context, async);
  default:
    break;
  }
  // not reached
  assert(false);
  return Status::Corruption;
}