HMODULE GetLibHandle()

in src/core/fishstore.h [278:501]


#ifdef _WIN32
  HMODULE GetLibHandle(size_t lib_id) {
#else
  void* GetLibHandle(size_t lib_id) {
#endif
    return libs.at(lib_id).handle;
  }
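
  /// Usage note (a hedged sketch; `store`, `lib_id`, and `ParseFunc` are
  /// placeholder names, not part of this header). The returned handle is the
  /// platform's native library handle, so it can be passed to GetProcAddress()
  /// on Windows or dlsym() elsewhere:
  ///
  ///   #ifdef _WIN32
  ///     auto parse = (ParseFunc)GetProcAddress(store.GetLibHandle(lib_id), "parse");
  ///   #else
  ///     auto parse = (ParseFunc)dlsym(store.GetLibHandle(lib_id), "parse");
  ///   #endif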


  /// Checkpoint/recovery operations.
  bool Checkpoint(const std::function<void(Status)>& index_persistent_callback,
                  const std::function<void(Status, uint64_t, uint32_t)>& hybrid_log_persistence_callback,
                  Guid& token);
  bool CheckpointIndex(const std::function<void(Status)>& index_persistent_callback, Guid& token);
  bool CheckpointHybridLog(const std::function<void(Status, uint64_t, uint32_t)>&
                           hybrid_log_persistence_callback,
                           Guid& token);
  Status Recover(const Guid& index_token, const Guid& hybrid_log_token, uint32_t& version,
                 std::vector<Guid>& session_ids);
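
  /// Example (a hedged sketch, assuming a fully constructed store instance
  /// named `store`; callback bodies and parameter names are placeholders):
  ///
  ///   Guid token;
  ///   bool accepted = store.Checkpoint(
  ///       [](Status result) { /* index is now persistent */ },
  ///       [](Status result, uint64_t serial_num, uint32_t version) {
  ///         /* hybrid log is now persistent */
  ///       },
  ///       token);
  ///   // `token` identifies the checkpoint for a later Recover() call.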

  /// Truncates the head of the log.
  bool ShiftBeginAddress(Address address, GcState::truncate_callback_t truncate_callback,
                         GcState::complete_callback_t complete_callback);
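
  /// Example (a hedged sketch; assumes the GcState callback typedefs accept
  /// plain functions, and `store` / `new_begin` are placeholder names):
  ///
  ///   static void OnTruncate(uint64_t offset) { /* log file truncated */ }
  ///   static void OnComplete() { /* garbage collection finished */ }
  ///   store.ShiftBeginAddress(new_begin, OnTruncate, OnComplete);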

  /// Make the hash table larger.
  bool GrowIndex(GrowState::callback_t caller_callback);
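
  /// Example (a hedged sketch; assumes GrowState::callback_t accepts a plain
  /// function that receives the new table size):
  ///
  ///   static void OnGrowComplete(uint64_t new_size) { /* index resized */ }
  ///   store.GrowIndex(OnGrowComplete);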

  /// Applies a list of actions to the parser. Returns the safe deregistration
  /// boundary as the return value and the safe registration boundary through
  /// the callback.
  uint64_t ApplyParserShift(const std::vector<ParserAction>& actions,
                            const std::function<void(uint64_t)>& callback);
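
  /// Example (a hedged sketch of the call pattern; `REGISTER_GENERAL_PSF` and
  /// `psf_id` follow FishStore's ParserAction conventions and are assumptions
  /// here, as is `store`):
  ///
  ///   std::vector<ParserAction> actions;
  ///   actions.push_back({ REGISTER_GENERAL_PSF, psf_id });
  ///   uint64_t safe_register_address;
  ///   uint64_t safe_deregister_address = store.ApplyParserShift(
  ///       actions, [&safe_register_address](uint64_t safe_address) {
  ///         safe_register_address = safe_address;
  ///       });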

  /// Statistics
  inline uint64_t Size() const {
    return hlog.GetTailAddress().control();
  }
  inline void DumpDistribution() {
    state_[resize_info_.version].DumpDistribution(
      overflow_buckets_allocator_[resize_info_.version]);
  }
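
  /// Example (a hedged monitoring sketch; `store` is a placeholder):
  ///
  ///   uint64_t tail = store.Size();  // tail address of the hybrid log
  ///   store.DumpDistribution();      // dump hash-table occupancy statistics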

 private:
  typedef Record record_t;

  typedef PendingContext pending_context_t;

  uint16_t AcquireFieldID(const std::string& field_name);

  template <class C>
  inline OperationStatus InternalRead(C& pending_context) const;

  template <class C>
  inline OperationStatus InternalInsert(C& pending_context);

  template <class C>
  inline OperationStatus InternalScan(C& pending_context, uint64_t start_addr,
                                      uint64_t end_addr) const;

  template <class C>
  inline OperationStatus InternalFullScan(C& pending_context, uint64_t start_addr, uint64_t end_addr);

  OperationStatus InternalContinuePendingRead(ExecutionContext& ctx,
      AsyncIOContext& io_context);
  OperationStatus InternalContinuePendingScan(ExecutionContext& ctx,
      AsyncIOContext& io_context);

  // Find the hash bucket entry, if any, corresponding to the specified hash.
  inline const AtomicHashBucketEntry* FindEntry(KeyHash hash) const;
  // If a hash bucket entry corresponding to the specified hash exists, return it; otherwise,
  // create a new entry. The caller can use the "expected_entry" to CAS its desired address into
  // the entry.
  inline AtomicHashBucketEntry* FindOrCreateEntry(KeyHash hash, HashBucketEntry& expected_entry,
      HashBucket*& bucket);
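
  // Intended call pattern (a hedged internal sketch; `desired_address` is a
  // placeholder, and AtomicHashBucketEntry is assumed to expose a FASTER-style
  // compare_exchange_strong):
  //
  //   HashBucketEntry expected_entry;
  //   HashBucket* bucket;
  //   AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry, bucket);
  //   HashBucketEntry desired{ desired_address, hash.tag(), false };
  //   if (!atomic_entry->compare_exchange_strong(expected_entry, desired)) {
  //     // Lost the race; expected_entry now holds the winning value, so retry.
  //   }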

  template <class C>
  inline Address TraceBackForMatch(C& pending_context, Address from_address,
                                   Address min_offset) const;

  Address TraceBackForOtherChainStart(uint64_t old_size, uint64_t new_size, Address from_address,
                                      Address min_address, uint8_t side);

  // If a hash bucket entry corresponding to the specified hash exists, return it; otherwise,
  // return an unused bucket entry.
  inline AtomicHashBucketEntry* FindTentativeEntry(KeyHash hash, HashBucket* bucket,
      uint8_t version, HashBucketEntry& expected_entry);
  // Looks for an entry that has the same tag as the given tentative entry;
  // used to detect conflicting concurrent insertions.
  inline bool HasConflictingEntry(KeyHash hash, const HashBucket* bucket, uint8_t version,
                                  const AtomicHashBucketEntry* atomic_entry) const;
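
  // A hedged sketch of the two-phase tentative insert these helpers support
  // (FASTER-style; the entry constructor and kInvalidEntry are assumptions):
  //
  //   HashBucketEntry entry{ Address::kInvalidAddress, hash.tag(), true /*tentative*/ };
  //   if (atomic_entry->compare_exchange_strong(expected_entry, entry)) {
  //     if (HasConflictingEntry(hash, bucket, version, atomic_entry)) {
  //       atomic_entry->store(HashBucketEntry::kInvalidEntry);  // back off and retry
  //     } else {
  //       entry = HashBucketEntry{ Address::kInvalidAddress, hash.tag(), false };
  //       atomic_entry->store(entry);  // clear the tentative bit
  //     }
  //   }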

  inline Address BlockAllocate(uint32_t record_size);

  inline Status HandleOperationStatus(ExecutionContext& ctx,
                                      pending_context_t& pending_context,
                                      OperationStatus internal_status, bool& async);
  inline Status PivotAndRetry(ExecutionContext& ctx, pending_context_t& pending_context,
                              bool& async);
  inline Status RetryLater(ExecutionContext& ctx, pending_context_t& pending_context,
                           bool& async);
  inline constexpr uint32_t MinIoRequestSize() const;

  inline Status IssueAsyncIoRequest(ExecutionContext& ctx,
                                    pending_context_t& pending_context,
                                    bool& async);
  inline Status IssueAsyncScanRequest(ExecutionContext& ctx,
                                      pending_context_t& pending_context,
                                      bool& async);

  void AsyncGetFromDisk(Address address, uint32_t num_records, AsyncIOCallback callback,
                        AsyncIOContext& context);
  static void AsyncGetFromDiskCallback(IAsyncContext* ctxt, Status result,
                                       size_t bytes_transferred);
  static void AsyncScanFromDiskCallback(IAsyncContext* ctxt, Status result,
                                        size_t bytes_transferred);
  static void AsyncFullScanFromDiskCallback(IAsyncContext* ctxt, Status result,
      size_t bytes_transferred);

  void CompleteIoPendingRequests(ExecutionContext& context);
  void CompleteRetryRequests(ExecutionContext& context);

  void InitializeCheckpointLocks();

  /// Checkpoint/recovery methods.
  void HandleSpecialPhases();
  bool GlobalMoveToNextState(SystemState current_state);

  Status CheckpointFuzzyIndex();
  Status CheckpointFuzzyIndexComplete();
  Status RecoverFuzzyIndex();
  Status RecoverFuzzyIndexComplete(bool wait);

  Status WriteIndexMetadata();
  Status ReadIndexMetadata(const Guid& token);
  Status WriteCprMetadata();
  Status ReadCprMetadata(const Guid& token);
  Status WriteCprContext();
  Status ReadCprContexts(const Guid& token, const Guid* guids);

  Status RecoverHybridLog();
  Status RecoverHybridLogFromSnapshotFile();
  Status RecoverFromPage(Address from_address, Address to_address);
  Status RestoreHybridLog();

  void MarkAllPendingRequests();

  inline void HeavyEnter();
  bool CleanHashTableBuckets();
  void SplitHashTableBuckets();
  void AddHashEntry(HashBucket*& bucket, uint32_t& next_idx, uint8_t version,
                    HashBucketEntry entry);

  void ParserStateApplyActions(ParserState& state, const std::vector<ParserAction>& actions);

  /// Access the current and previous (thread-local) execution contexts.
  const ExecutionContext& thread_ctx() const {
    return thread_contexts_[Thread::id()].cur();
  }
  ExecutionContext& thread_ctx() {
    return thread_contexts_[Thread::id()].cur();
  }
  ExecutionContext& prev_thread_ctx() {
    return thread_contexts_[Thread::id()].prev();
  }
  ParserContext<A>& parser_ctx() {
    return thread_contexts_[Thread::id()].parser_context();
  }

 private:
  LightEpoch epoch_;

 public:
  disk_t disk;
  hlog_t hlog;

 private:
  static constexpr bool kCopyReadsToTail = false;
  static constexpr uint64_t kGcHashTableChunkSize = 16384;
  static constexpr uint64_t kGrowHashTableChunkSize = 16384;

  bool fold_over_snapshot = true;

  /// Initial size of the table
  uint64_t min_table_size_;

  // Allocator for the hash buckets that don't fit in the hash table.
  MallocFixedPageSize<HashBucket, disk_t> overflow_buckets_allocator_[2];

  // An array of size two that contains the old and new versions of the hash table.
  InternalHashTable<disk_t> state_[2];

  CheckpointLocks checkpoint_locks_;

  ResizeInfo resize_info_;

  AtomicSystemState system_state_;

  /// Checkpoint/recovery state.
  CheckpointState<file_t> checkpoint_;
  /// Garbage collection state.
  GcState gc_;
  /// Grow (hash table) state.
  GrowState grow_;
  PSState ps_;

  /// Global count of pending I/Os, used for throttling.
  std::atomic<uint64_t> num_pending_ios;

  /// Space for two contexts per thread, stored inline.
  ThreadContext<A> thread_contexts_[Thread::kMaxNumThreads];

  // FishStore metadata.
  // Mutex is used to protect the field_lookup_map during field and
  // predicate registration, which is not on the execution hot path.
  std::mutex mutex;
  // Field and predicate global registry.
  std::unordered_map<std::string, uint16_t> field_lookup_map;
  std::vector<LibraryHandle> libs;
  concurrent_vector<std::string> field_names;

  concurrent_vector<InlinePSF<A>> inline_psf_map;
  concurrent_vector<GeneralPSF<A>> general_psf_map;

  ParserState parser_states[2];
  std::atomic_int8_t system_parser_no_;
};