be/src/runtime/mem-pool.h:

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef IMPALA_RUNTIME_MEM_POOL_H
#define IMPALA_RUNTIME_MEM_POOL_H

#include <stdio.h>
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

#include "common/logging.h"
#include "gutil/dynamic_annotations.h"
#include "gutil/threading/thread_collision_warner.h"
#include "util/bit-util.h"

namespace impala {

class MemTracker;

/// Similar to SummaryStatsCounter, but without thread-safety support, so no locks
/// need to be acquired.
struct SummaryStats {
  /// The total number of values seen so far.
  int32_t total_num_values_ = 0;

  /// Summary statistics of values seen so far.
  int64_t min_ = INT64_MAX;
  int64_t max_ = INT64_MIN;
  int64_t sum_ = 0;

  void UpdateCounter(int64_t new_value) {
    ++total_num_values_;
    sum_ += new_value;
    if (new_value < min_) min_ = new_value;
    if (new_value > max_) max_ = new_value;
  }
};
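
/// Illustrative sketch, not part of the original header: how UpdateCounter() folds a
/// stream of values into the running summary. After the three calls below,
/// total_num_values_ == 3, min_ == 2, max_ == 9 and sum_ == 16.
///
///   SummaryStats stats;
///   stats.UpdateCounter(5);
///   stats.UpdateCounter(2);
///   stats.UpdateCounter(9);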

struct MemPoolCounters {
  /// Stats of duration in malloc()
  SummaryStats sys_alloc_duration;

  /// Stats of duration in free()
  SummaryStats sys_free_duration;

  /// Stats of allocated bytes in malloc()
  SummaryStats allocated_bytes;

  /// Stats of freed bytes in free()
  SummaryStats freed_bytes;
};

/// A MemPool maintains a list of memory chunks from which it allocates memory in
/// response to Allocate() calls; chunks stay around for the lifetime of the mempool
/// or until they are passed on to another mempool.
///
/// The caller registers a MemTracker with the pool; chunk allocations are counted
/// against that tracker and all of its ancestors. If chunks get moved between pools
/// during AcquireData() calls, the respective MemTrackers are updated accordingly.
/// Chunks freed up in the d'tor are subtracted from the registered trackers.
///
/// An Allocate() call will attempt to allocate memory from the chunk that was most
/// recently added; if that chunk doesn't have enough memory to satisfy the
/// allocation request, the free chunks are searched for one that is big enough;
/// otherwise a new chunk is added to the list.
/// In order to keep allocation overhead low, chunk sizes double with each new one
/// added, until they hit a maximum size. But if the required size is greater than
/// the next chunk size, then a new chunk of the required size is allocated and the
/// next chunk size is set to min(2 * required size, max chunk size). However, if the
/// 'enforce_binary_chunk_sizes' flag passed to the c'tor is true, then all chunk
/// sizes allocated will be rounded up to the next power of two.
///
/// Allocated chunks can be reused for new allocations if Clear() is called to free
/// all allocations, or if ReturnPartialAllocation() is called to return part of the
/// last allocation.
///
/// All chunks before 'current_chunk_idx_' have allocated memory, while all chunks
/// after 'current_chunk_idx_' are free. The chunk at 'current_chunk_idx_' may or may
/// not have allocated memory.
///
/// Example:
///   MemPool* p = new MemPool();
///   for (int i = 0; i < 1024; ++i) {
///     // returns 8-byte aligned memory (effectively 24 bytes):
///     .. = p->Allocate(17);
///   }
///   // At this point, 17K have been handed out in response to Allocate() calls and
///   // 28K of chunks have been allocated (chunk sizes: 4K, 8K, 16K).
///   // We track total and peak allocated bytes; at this point they would be the
///   // same: 28K bytes. A call to Clear() returns the allocated memory, so
///   // total_allocated_bytes_ becomes 0.
///   p->Clear();
///   // the entire 1st chunk is returned:
///   .. = p->Allocate(4 * 1024);
///   // 4K of the 2nd chunk are returned:
///   .. = p->Allocate(4 * 1024);
///   // a new 20K chunk is created:
///   .. = p->Allocate(20 * 1024);
///
///   MemPool* p2 = new MemPool();
///   // The new mempool receives all chunks containing data from p.
///   p2->AcquireData(p, false);
///   // At this point p->total_allocated_bytes_ would be 0.
///   // The one remaining (empty) chunk is released:
///   delete p;
///
/// This class is not thread-safe. A DFAKE_MUTEX is used to help enforce correct
/// usage.
class MemPool {
 public:
  /// 'tracker' tracks the amount of memory allocated by this pool. Must not be NULL.
  /// If 'enforce_binary_chunk_sizes' is set to true, then all chunk sizes allocated
  /// will be rounded up to the next power of two.
  MemPool(MemTracker* mem_tracker, bool enforce_binary_chunk_sizes = false);

  /// Frees all chunks of memory and subtracts the total allocated bytes
  /// from the registered limits.
  ~MemPool();

  /// Allocates a section of memory of 'size' bytes with DEFAULT_ALIGNMENT at the end
  /// of the current chunk. Creates a new chunk if there aren't any chunks
  /// with enough capacity.
  uint8_t* Allocate(int64_t size) noexcept {
    DFAKE_SCOPED_LOCK(mutex_);
    return Allocate<false>(size, DEFAULT_ALIGNMENT);
  }

  /// Same as Allocate() except the mem limit is checked before the allocation and
  /// this call will fail (returns NULL) if the limit would be exceeded.
  /// The caller must handle the NULL case. This should be used for allocations
  /// where the size can be very big, to bound the amount by which we exceed mem
  /// limits.
  uint8_t* TryAllocate(int64_t size) noexcept {
    DFAKE_SCOPED_LOCK(mutex_);
    return Allocate<true>(size, DEFAULT_ALIGNMENT);
  }

  /// Same as TryAllocate() except a non-default alignment can be specified. It
  /// should be a power-of-two in [1, alignof(std::max_align_t)].
  uint8_t* TryAllocateAligned(int64_t size, int alignment) noexcept {
    DFAKE_SCOPED_LOCK(mutex_);
    DCHECK_GE(alignment, 1);
    DCHECK_LE(alignment, alignof(std::max_align_t));
    DCHECK_EQ(BitUtil::RoundUpToPowerOfTwo(alignment), alignment);
    return Allocate<true>(size, alignment);
  }

  /// Same as TryAllocate() except the returned memory is not aligned at all.
  uint8_t* TryAllocateUnaligned(int64_t size) noexcept {
    DFAKE_SCOPED_LOCK(mutex_);
    // Call the templated implementation directly so that it is inlined here and the
    // alignment logic can be optimised out.
    return Allocate<true>(size, 1);
  }
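
  /// Illustrative sketch, not part of the original header: the intended calling
  /// convention for the limit-checked variants. TryAllocate() returns NULL when the
  /// allocation would exceed the mem limit, so callers must check the result before
  /// using it. 'tracker' is a hypothetical MemTracker* and the 16MB size is
  /// arbitrary.
  ///
  ///   MemPool pool(tracker);
  ///   uint8_t* buf = pool.TryAllocate(16 * 1024 * 1024);
  ///   if (buf == NULL) {
  ///     // Allocation failed; degrade gracefully, e.g. spill or return an error.
  ///   }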

  /// Returns 'byte_size' bytes of the current chunk back to the mem pool. This can
  /// only be used to return either all or part of the previous allocation returned
  /// by Allocate().
  void ReturnPartialAllocation(int64_t byte_size) {
    DFAKE_SCOPED_LOCK(mutex_);
    DCHECK_GE(byte_size, 0);
    DCHECK(current_chunk_idx_ != -1);
    ChunkInfo& info = chunks_[current_chunk_idx_];
    DCHECK_GE(info.allocated_bytes, byte_size);
    info.allocated_bytes -= byte_size;
    ASAN_POISON_MEMORY_REGION(info.data + info.allocated_bytes, byte_size);
    total_allocated_bytes_ -= byte_size;
  }
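
  /// Illustrative sketch, not part of the original header: a typical use of
  /// ReturnPartialAllocation() is to allocate a conservative upper bound, write a
  /// variable-length value, then hand the unused tail of the allocation back to the
  /// pool. 'max_size', 'value' and SerializeValue() are hypothetical.
  ///
  ///   uint8_t* buf = pool.Allocate(max_size);
  ///   int64_t actual_size = SerializeValue(value, buf);
  ///   pool.ReturnPartialAllocation(max_size - actual_size);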

  /// Return a dummy pointer for zero-length allocations.
  static uint8_t* EmptyAllocPtr() {
    return reinterpret_cast<uint8_t*>(&zero_length_region_);
  }

  /// Makes all allocated chunks available for re-use, but doesn't delete any chunks.
  void Clear();

  /// Deletes all allocated chunks. FreeAll() or AcquireData() must be called for
  /// each mem pool.
  void FreeAll();

  /// Absorb all chunks that hold data from src. If keep_current is true, let src
  /// hold on to its last allocated chunk that contains data.
  /// All offsets handed out by calls to GetCurrentOffset() for 'src' become invalid.
  void AcquireData(MemPool* src, bool keep_current);

  /// Change the MemTracker, updating consumption on the current and new tracker.
  void SetMemTracker(MemTracker* new_tracker);

  std::string DebugString();

  int64_t total_allocated_bytes() const { return total_allocated_bytes_; }
  int64_t total_reserved_bytes() const { return total_reserved_bytes_; }
  MemTracker* mem_tracker() { return mem_tracker_; }

  /// Return the sum of the sizes of all chunks in 'chunks_'.
  int64_t GetTotalChunkSizes() const;

  MemPoolCounters GetMemPoolCounters() const { return counters_; }

  /// TODO: make a macro for doing this
  /// For C++/IR interop, we need to be able to look up types by name.
  static const char* LLVM_CLASS_NAME;

  static const int DEFAULT_ALIGNMENT = 8;

 private:
  friend class MemPoolTest;

  static const int INITIAL_CHUNK_SIZE = 4 * 1024;

  /// The maximum size of chunk that should be allocated. Allocations larger than
  /// this size will get their own individual chunk. Chosen to be small enough that
  /// it gets a freelist in TCMalloc's central cache.
  static const int MAX_CHUNK_SIZE = 512 * 1024;

  struct ChunkInfo {
    uint8_t* data; // Owned by the ChunkInfo.
    int64_t size;  // in bytes

    /// bytes allocated via Allocate() in this chunk
    int64_t allocated_bytes;

    explicit ChunkInfo(int64_t size, uint8_t* buf);

    ChunkInfo() : data(NULL), size(0), allocated_bytes(0) {}
  };

  /// A static field used as a non-NULL pointer for zero-length allocations. NULL is
  /// reserved for allocation failures. It must be as aligned as max_align_t for
  /// TryAllocateAligned().
  static uint32_t zero_length_region_ alignas(std::max_align_t);

  /// Ensures a MemPool is not used by two threads concurrently.
  DFAKE_MUTEX(mutex_);

  /// chunk from which we served the last Allocate() call;
  /// always points to the last chunk that contains allocated data;
  /// chunks 0..current_chunk_idx_ - 1 are guaranteed to contain data
  /// (chunks_[i].allocated_bytes > 0 for i: 0..current_chunk_idx_ - 1);
  /// chunks after 'current_chunk_idx_' are "free chunks" that contain no data;
  /// -1 if no chunks are present
  int current_chunk_idx_;

  /// The size of the next chunk to allocate.
  int next_chunk_size_;

  /// sum of allocated_bytes across all chunks
  int64_t total_allocated_bytes_;

  /// sum of all bytes allocated in chunks_
  int64_t total_reserved_bytes_;

  std::vector<ChunkInfo> chunks_;

  /// Tracks the current and peak memory footprint of this pool. This is different
  /// from total_allocated_bytes_ since it includes bytes in chunks that are not used.
  MemTracker* mem_tracker_;

  /// If set to true, all chunk sizes allocated will be rounded up to the next power
  /// of two.
  const bool enforce_binary_chunk_sizes_;

  MemPoolCounters counters_;

  /// Find or allocate a chunk with at least 'min_size' spare capacity and update
  /// current_chunk_idx_. Also updates chunks_, the chunk sizes and the allocated
  /// byte counts if a new chunk needs to be created.
  /// If check_limits is true, this call can fail (returns false) if adding a
  /// new chunk exceeds the mem limits.
  bool FindChunk(int64_t min_size, bool check_limits) noexcept;

  /// Check the integrity of the supporting data structures; always returns true but
  /// DCHECKs all invariants.
  /// If 'check_current_chunk_empty' is true, checks that the current chunk contains
  /// no data. Otherwise the current chunk can be either empty or full.
  bool CheckIntegrity(bool check_current_chunk_empty);

  /// Return the offset to unoccupied space in the current chunk.
  int64_t GetFreeOffset() const {
    if (current_chunk_idx_ == -1) return 0;
    return chunks_[current_chunk_idx_].allocated_bytes;
  }

  template <bool CHECK_LIMIT_FIRST>
  uint8_t* ALWAYS_INLINE Allocate(int64_t size, int alignment) noexcept {
    DCHECK_GE(size, 0);
    if (UNLIKELY(size == 0)) return reinterpret_cast<uint8_t*>(&zero_length_region_);

    if (current_chunk_idx_ != -1) {
      ChunkInfo& info = chunks_[current_chunk_idx_];
      int64_t aligned_allocated_bytes = BitUtil::RoundUpToPowerOf2(
          info.allocated_bytes, alignment);
      if (aligned_allocated_bytes + size <= info.size) {
        // Ensure the requested alignment is respected, padding if necessary.
        int64_t padding = aligned_allocated_bytes - info.allocated_bytes;
        uint8_t* result = info.data + aligned_allocated_bytes;
        ASAN_UNPOISON_MEMORY_REGION(result, size);
        DCHECK_LE(info.allocated_bytes + size, info.size);
        info.allocated_bytes += padding + size;
        total_allocated_bytes_ += padding + size;
        DCHECK_LE(current_chunk_idx_, chunks_.size() - 1);
        return result;
      }
    }

    // If we couldn't allocate a new chunk, return NULL. malloc() guarantees
    // alignment of alignof(std::max_align_t), so we do not need to do anything
    // additional to guarantee alignment.
    static_assert(
        INITIAL_CHUNK_SIZE >= alignof(std::max_align_t), "Min chunk size too low");
    if (UNLIKELY(!FindChunk(size, CHECK_LIMIT_FIRST))) return NULL;

    ChunkInfo& info = chunks_[current_chunk_idx_];
    uint8_t* result = info.data + info.allocated_bytes;
    ASAN_UNPOISON_MEMORY_REGION(result, size);
    DCHECK_LE(info.allocated_bytes + size, info.size);
    info.allocated_bytes += size;
    total_allocated_bytes_ += size;
    DCHECK_LE(current_chunk_idx_, chunks_.size() - 1);
    return result;
  }
};

// Stamp out templated implementations here so they're included in IR module
template uint8_t* MemPool::Allocate<false>(int64_t size, int alignment) noexcept;
template uint8_t* MemPool::Allocate<true>(int64_t size, int alignment) noexcept;

}

#endif
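
// Illustrative sketch, not part of the original header: the chunk-growth policy
// described in the class comment, written out as a hypothetical helper.
// NextChunkSize() does not exist in this file; the real logic lives in FindChunk()
// in mem-pool.cc. Successive chunk sizes double from INITIAL_CHUNK_SIZE (4KB) up to
// MAX_CHUNK_SIZE (512KB): 4K, 8K, 16K, ..., 512K, 512K, ...
//
//   int64_t NextChunkSize(int64_t next_chunk_size, int64_t required) {
//     const int64_t kMaxChunkSize = 512 * 1024;  // MAX_CHUNK_SIZE
//     if (required > next_chunk_size) {
//       // An oversized request gets a chunk of exactly the required size, and the
//       // next chunk size becomes min(2 * required size, max chunk size).
//       return std::min(2 * required, kMaxChunkSize);
//     }
//     // Normal case: double until the cap is reached.
//     return std::min(2 * next_chunk_size, kMaxChunkSize);
//   }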