in src/device/file_system_disk.h [289:348]
Status OpenSegment(uint64_t segment) {
class Context : public IAsyncContext {
public:
Context(void* files_)
: files{ files_ } {
}
/// The deep-copy constructor.
Context(const Context& other)
: files{ other.files} {
}
protected:
Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
return IAsyncContext::DeepCopy_Internal(*this, context_copy);
}
public:
void* files;
};
auto callback = [](IAsyncContext* ctxt) {
CallbackContext<Context> context{ ctxt };
std::free(context->files);
};
// Only one thread can modify the list of files at a given time.
std::lock_guard<std::mutex> lock{ mutex_ };
bundle_t* files = files_.load();
if(segment < begin_segment_) {
// The requested segment has been truncated.
return Status::IOError;
}
if(files && files->exists(segment)) {
// Some other thread already opened this segment for us.
return Status::Ok;
}
if(!files) {
// First segment opened.
void* buffer = std::malloc(bundle_t::size(1));
bundle_t* new_files = new(buffer) bundle_t{ filename_, file_options_, handler_,
segment, segment + 1 };
files_.store(new_files);
return Status::Ok;
}
// Expand the list of files_.
uint64_t new_begin_segment = std::min(files->begin_segment, segment);
uint64_t new_end_segment = std::max(files->end_segment, segment + 1);
void* buffer = std::malloc(bundle_t::size(new_end_segment - new_begin_segment));
bundle_t* new_files = new(buffer) bundle_t{ handler_, new_begin_segment, new_end_segment,
*files };
files_.store(new_files);
// Delete the old list only after all threads have finished looking at it.
Context context{ files };
IAsyncContext* context_copy;
Status result = context.DeepCopy(context_copy);
assert(result == Status::Ok);
epoch_->BumpCurrentEpoch(callback, context_copy);
return Status::Ok;
}