in src/core/fishstore.h [278:501]
HMODULE GetLibHandle(size_t lib_id) {
#else
void* GetLibHandle(size_t lib_id) {
#endif
return libs.at(lib_id).handle;
}
/// Checkpoint/recovery operations.
bool Checkpoint(const std::function<void(Status)>& index_persistent_callback,
const std::function<void(Status, uint64_t, uint32_t)>& hybrid_log_persistence_callback,
Guid& token);
bool CheckpointIndex(const std::function<void(Status)>& index_persistent_callback, Guid& token);
bool CheckpointHybridLog(const std::function<void(Status, uint64_t, uint32_t)>&
hybrid_log_persistence_callback,
Guid& token);
Status Recover(const Guid& index_token, const Guid& hybrid_log_token, uint32_t& version,
std::vector<Guid>& session_ids);
/// Truncating the head of the log.
bool ShiftBeginAddress(Address address, GcState::truncate_callback_t truncate_callback,
GcState::complete_callback_t complete_callback);
/// Make the hash table larger.
bool GrowIndex(GrowState::callback_t caller_callback);
// Applying a list of actions to the parser.
// Returns safe deregister boundary through returned value
// Returns safe register boudnary through callback
uint64_t ApplyParserShift(const std::vector<ParserAction>& actions,
const std::function<void(uint64_t)>& callback);
/// Statistics
inline uint64_t Size() const {
return hlog.GetTailAddress().control();
}
inline void DumpDistribution() {
state_[resize_info_.version].DumpDistribution(
overflow_buckets_allocator_[resize_info_.version]);
}
private:
typedef Record record_t;
typedef PendingContext pending_context_t;
uint16_t AcquireFieldID(const std::string& field_name);
template <class C>
inline OperationStatus InternalRead(C& pending_context) const;
template <class C>
inline OperationStatus InternalInsert(C& pending_context);
template <class C>
inline OperationStatus InternalScan(C& pending_context, uint64_t start_addr,
uint64_t end_addr) const;
template <class C>
inline OperationStatus InternalFullScan(C& pending_context, uint64_t start_addr, uint64_t end_addr);
OperationStatus InternalContinuePendingRead(ExecutionContext& ctx,
AsyncIOContext& io_context);
OperationStatus InternalContinuePendingScan(ExecutionContext& ctx,
AsyncIOContext& io_context);
// Find the hash bucket entry, if any, corresponding to the specified hash.
inline const AtomicHashBucketEntry* FindEntry(KeyHash hash) const;
// If a hash bucket entry corresponding to the specified hash exists, return it; otherwise,
// create a new entry. The caller can use the "expected_entry" to CAS its desired address into
// the entry.
inline AtomicHashBucketEntry* FindOrCreateEntry(KeyHash hash, HashBucketEntry& expected_entry,
HashBucket*& bucket);
template <class C>
inline Address TraceBackForMatch(C& pending_context, Address from_address,
Address min_offset) const;
Address TraceBackForOtherChainStart(uint64_t old_size, uint64_t new_size, Address from_address,
Address min_address, uint8_t side);
// If a hash bucket entry corresponding to the specified hash exists, return it; otherwise,
// return an unused bucket entry.
inline AtomicHashBucketEntry* FindTentativeEntry(KeyHash hash, HashBucket* bucket,
uint8_t version, HashBucketEntry& expected_entry);
// Looks for an entry that has the same
inline bool HasConflictingEntry(KeyHash hash, const HashBucket* bucket, uint8_t version,
const AtomicHashBucketEntry* atomic_entry) const;
inline Address BlockAllocate(uint32_t record_size);
inline Status HandleOperationStatus(ExecutionContext& ctx,
pending_context_t& pending_context,
OperationStatus internal_status, bool& async);
inline Status PivotAndRetry(ExecutionContext& ctx, pending_context_t& pending_context,
bool& async);
inline Status RetryLater(ExecutionContext& ctx, pending_context_t& pending_context,
bool& async);
inline constexpr uint32_t MinIoRequestSize() const;
inline Status IssueAsyncIoRequest(ExecutionContext& ctx,
pending_context_t& pending_context,
bool& async);
inline Status IssueAsyncScanRequest(ExecutionContext& ctx,
pending_context_t& pending_context,
bool& async);
void AsyncGetFromDisk(Address address, uint32_t num_records, AsyncIOCallback callback,
AsyncIOContext& context);
static void AsyncGetFromDiskCallback(IAsyncContext* ctxt, Status result,
size_t bytes_transferred);
static void AsyncScanFromDiskCallback(IAsyncContext* ctxt, Status result,
size_t bytes_transferred);
static void AsyncFullScanFromDiskCallback(IAsyncContext* ctxt, Status result,
size_t bytes_transferred);
void CompleteIoPendingRequests(ExecutionContext& context);
void CompleteRetryRequests(ExecutionContext& context);
void InitializeCheckpointLocks();
/// Checkpoint/recovery methods.
void HandleSpecialPhases();
bool GlobalMoveToNextState(SystemState current_state);
Status CheckpointFuzzyIndex();
Status CheckpointFuzzyIndexComplete();
Status RecoverFuzzyIndex();
Status RecoverFuzzyIndexComplete(bool wait);
Status WriteIndexMetadata();
Status ReadIndexMetadata(const Guid& token);
Status WriteCprMetadata();
Status ReadCprMetadata(const Guid& token);
Status WriteCprContext();
Status ReadCprContexts(const Guid& token, const Guid* guids);
Status RecoverHybridLog();
Status RecoverHybridLogFromSnapshotFile();
Status RecoverFromPage(Address from_address, Address to_address);
Status RestoreHybridLog();
void MarkAllPendingRequests();
inline void HeavyEnter();
bool CleanHashTableBuckets();
void SplitHashTableBuckets();
void AddHashEntry(HashBucket*& bucket, uint32_t& next_idx, uint8_t version,
HashBucketEntry entry);
void ParserStateApplyActions(ParserState& state, const std::vector<ParserAction>& actions);
/// Access the current and previous (thread-local) execution contexts.
const ExecutionContext& thread_ctx() const {
return thread_contexts_[Thread::id()].cur();
}
ExecutionContext& thread_ctx() {
return thread_contexts_[Thread::id()].cur();
}
ExecutionContext& prev_thread_ctx() {
return thread_contexts_[Thread::id()].prev();
}
ParserContext<A>& parser_ctx() {
return thread_contexts_[Thread::id()].parser_context();
}
private:
LightEpoch epoch_;
public:
disk_t disk;
hlog_t hlog;
private:
static constexpr bool kCopyReadsToTail = false;
static constexpr uint64_t kGcHashTableChunkSize = 16384;
static constexpr uint64_t kGrowHashTableChunkSize = 16384;
bool fold_over_snapshot = true;
/// Initial size of the table
uint64_t min_table_size_;
// Allocator for the hash buckets that don't fit in the hash table.
MallocFixedPageSize<HashBucket, disk_t> overflow_buckets_allocator_[2];
// An array of size two, that contains the old and new versions of the hash-table
InternalHashTable<disk_t> state_[2];
CheckpointLocks checkpoint_locks_;
ResizeInfo resize_info_;
AtomicSystemState system_state_;
/// Checkpoint/recovery state.
CheckpointState<file_t> checkpoint_;
/// Garbage collection state.
GcState gc_;
/// Grow (hash table) state.
GrowState grow_;
PSState ps_;
/// Global count of pending I/Os, used for throttling.
std::atomic<uint64_t> num_pending_ios;
/// Space for two contexts per thread, stored inline.
ThreadContext<A> thread_contexts_[Thread::kMaxNumThreads];
// FishStore Meta-data.
// Mutex is used to protect the field_lookup_map during field and
// predicate registration, which is not on the execution hot path.
std::mutex mutex;
// Field and predicate global registry.
std::unordered_map<std::string, uint16_t> field_lookup_map;
std::vector<LibraryHandle> libs;
concurrent_vector<std::string> field_names;
concurrent_vector<InlinePSF<A>> inline_psf_map;
concurrent_vector<GeneralPSF<A>> general_psf_map;
ParserState parser_states[2];
std::atomic_int8_t system_parser_no_;
};