src/traffic_cache_tool/CacheTool.cc (1,211 lines of code) (raw):

/** @file Main program file for Cache Tool. @section license License Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ #include <iostream> #include <list> #include <memory> #include <vector> #include <map> #include <system_error> #include <fcntl.h> #include <cctype> #include <cstring> #include <vector> #include <unordered_set> #include <ctime> #include <bitset> #include <cinttypes> #include "tscore/ink_memory.h" #include "tscore/ink_file.h" #include "tscore/CryptoHash.h" #include "tscore/ArgParser.h" #include <thread> #include "CacheDefs.h" #include "CacheScan.h" using swoc::Errata; using swoc::MemSpan; using ts::make_errno_code; using ts::Bytes; using ts::CacheDirEntry; using ts::CacheStoreBlocks; using ts::CacheStripeBlocks; using ts::CacheStripeDescriptor; using ts::Doc; using ts::Megabytes; using ts::StripeMeta; enum { SILENT = 0, NORMAL, VERBOSE } Verbosity = NORMAL; extern int cache_config_min_average_object_size; extern CacheStoreBlocks Vol_hash_alloc_size; extern int OPEN_RW_FLAG; const Bytes ts::CacheSpan::OFFSET{CacheStoreBlocks{1}}; swoc::file::path SpanFile; swoc::file::path VolumeFile; ts::ArgParser parser; Errata err; namespace ct { /* --------------------------------------------------------------------------------------- */ /// A live volume. /// Volume data based on data from loaded spans. struct Volume { int _idx; ///< Volume index. CacheStoreBlocks _size; ///< Amount of storage allocated. std::vector<StripeSM *> _stripes; /// Remove all data related to @a span. // void clearSpan(Span *span); /// Remove all allocated space and stripes. void clear(); }; #if 0 void Volume::clearSpan(Span* span) { auto spot = std::remove_if(_stripes.begin(), _stripes.end(), [span,this](StripeSM* stripe) { return stripe->_span == span ? ( this->_size -= stripe->_len , true ) : false; }); _stripes.erase(spot, _stripes.end()); } #endif void Volume::clear() { _size.assign(0); _stripes.clear(); } /* --------------------------------------------------------------------------------------- */ /// Data parsed from the volume config file. struct VolumeConfig { Errata load(swoc::file::path const &path); /// Data direct from the config file. struct Data { int _idx = 0; ///< Volume index. int _percent = 0; ///< Size if specified as a percent. Megabytes _size{0}; ///< Size if specified as an absolute. CacheStripeBlocks _alloc; ///< Allocation size. // Methods handy for parsing bool hasSize() const { return _percent > 0 || _size > 0; } bool hasIndex() const { return _idx > 0; } }; std::vector<Data> _volumes; using iterator = std::vector<Data>::iterator; using const_iterator = std::vector<Data>::const_iterator; iterator begin() { return _volumes.begin(); } iterator end() { return _volumes.end(); } #if 0 const_iterator begin() const { return _volumes.begin(); } const_iterator end() const { return _volumes.end(); } #endif // Errata validatePercentAllocation(); void convertToAbsolute(const ts::CacheStripeBlocks &total_span_size); }; #if 0 Errata VolumeConfig::validatePercentAllocation() { Errata zret; int n = 0; for (auto &vol : _volumes) n += vol._percent; if (n > 100) zret.push(0, 10, "Volume percent allocation ", n, " is more than 100%"); return zret; } #endif void VolumeConfig::convertToAbsolute(const ts::CacheStripeBlocks &n) { for (auto &vol : _volumes) { if (vol._percent) { vol._alloc.assign((n.count() * vol._percent + 99) / 100); } else { vol._alloc = round_up(vol._size); } } } /* --------------------------------------------------------------------------------------- */ struct Cache { ~Cache(); Errata loadSpan(swoc::file::path const &path); Errata loadSpanConfig(swoc::file::path const &path); Errata loadSpanDirect(swoc::file::path const &path, int vol_idx = -1, const Bytes &size = Bytes(-1)); Errata loadURLs(swoc::file::path const &path); Errata allocStripe(Span *span, int vol_idx, const CacheStripeBlocks &len); /// Change the @a span to have a single, unused stripe occupying the entire @a span. // void clearSpan(Span *span); /// Clear all allocated space. void clearAllocation(); enum class SpanDumpDepth { SPAN, STRIPE, DIRECTORY }; void dumpSpans(SpanDumpDepth depth); void dumpVolumes(); void build_stripe_hash_table(); StripeSM *key_to_stripe(CryptoHash *key, const char *hostname, int host_len); // ts::CacheStripeBlocks calcTotalSpanPhysicalSize(); ts::CacheStripeBlocks calcTotalSpanConfiguredSize(); std::list<Span *> _spans; std::map<int, Volume> _volumes; std::vector<StripeSM *> globalVec_stripe; std::unordered_set<ts::CacheURL *> URLset; unsigned short *stripes_hash_table; }; Errata Cache::allocStripe(Span *span, int vol_idx, const CacheStripeBlocks &len) { auto rv = span->allocStripe(vol_idx, len); std::cout << span->_path.string() << ":" << vol_idx << std::endl; if (rv.is_ok()) { _volumes[vol_idx]._stripes.push_back(rv); } return std::move(rv.errata()); } #if 0 void Cache::clearSpan(Span* span) { for ( auto& item : _volumes ) item.second.clearSpan(span); span->clear(); } #endif void Cache::clearAllocation() { for (auto span : _spans) { span->clear(); } for (auto &item : _volumes) { item.second.clear(); } } /* --------------------------------------------------------------------------------------- */ /// Temporary structure used for doing allocation computations. class VolumeAllocator { /// Working struct that tracks allocation information. struct V { VolumeConfig::Data const &_config; ///< Configuration instance. CacheStripeBlocks _size; ///< Current actual size. int64_t _deficit; ///< fractional deficit int64_t _shares; ///< relative amount of free space to allocate V(VolumeConfig::Data const &config, const CacheStripeBlocks &size, int64_t deficit = 0, int64_t shares = 0) : _config(config), _size(size), _deficit(deficit), _shares(shares) { } V(const V &v) = default; V & operator=(V const &that) { new (this) V(that._config, that._size, that._deficit, that._shares); return *this; } }; using AV = std::vector<V>; AV _av; ///< Working vector of volume data. Cache _cache; ///< Current state. VolumeConfig _vols; ///< Configuration state. public: VolumeAllocator(); Errata load(swoc::file::path const &spanFile, swoc::file::path const &volumeFile); Errata fillEmptySpans(); Errata fillAllSpans(); Errata allocateSpan(swoc::file::path const &spanFile); void dumpVolumes(); protected: /// Update the allocation for a span. Errata allocateFor(Span &span); }; VolumeAllocator::VolumeAllocator() = default; Errata VolumeAllocator::load(swoc::file::path const &spanFile, swoc::file::path const &volumeFile) { Errata zret; if (volumeFile.empty()) { zret.note("Volume config file not set"); } if (spanFile.empty()) { zret.note("Span file not set"); } if (zret) { zret = _vols.load(volumeFile); if (zret) { zret = _cache.loadSpan(spanFile); if (zret) { CacheStripeBlocks total = _cache.calcTotalSpanConfiguredSize(); _vols.convertToAbsolute(total); for (auto &vol : _vols) { CacheStripeBlocks size(0); auto spot = _cache._volumes.find(vol._idx); if (spot != _cache._volumes.end()) { size = round_down(spot->second._size); } _av.push_back({vol, size, 0, 0}); } } } } return zret; } void VolumeAllocator::dumpVolumes() { _cache.dumpVolumes(); } Errata VolumeAllocator::fillEmptySpans() { Errata zret; // Walk the spans, skipping ones that are not empty. for (auto span : _cache._spans) { if (span->isEmpty()) { this->allocateFor(*span); } } return zret; } Errata VolumeAllocator::allocateSpan(swoc::file::path const &input_file_path) { Errata zret; for (auto span : _cache._spans) { if (span->_path.view() == input_file_path.view()) { std::cout << "===============================" << std::endl; if (span->_header) { zret.note("Disk already initialized with valid header"); } else { this->allocateFor(*span); span->updateHeader(); for (auto &strp : span->_stripes) { strp->updateHeaderFooter(); } } } } for (auto &_v : _av) { std::cout << _v._size << std::endl; } return zret; } Errata VolumeAllocator::fillAllSpans() { Errata zret; // clear all current volume allocations. for (auto &v : _av) { v._size.assign(0); } // Allocate for each span, clearing as it goes. _cache.clearAllocation(); for (auto span : _cache._spans) { this->allocateFor(*span); } return zret; } Errata VolumeAllocator::allocateFor(Span &span) { Errata zret; /// Scaling factor for shares, effectively the accuracy. static const int64_t SCALE = 1000; int64_t total_shares = 0; if (Verbosity >= NORMAL) { std::cout << "Allocating " << CacheStripeBlocks(round_down(span._len)).count() << " stripe blocks from span " << span._path.string() << std::endl; } // Walk the volumes and get the relative allocations. for (auto &v : _av) { auto delta = v._config._alloc - v._size; if (delta > 0) { v._deficit = (delta.count() * SCALE) / v._config._alloc.count(); v._shares = delta.count() * v._deficit; total_shares += v._shares; } else { v._shares = 0; } } assert(total_shares != 0); // Now allocate blocks. CacheStripeBlocks span_blocks(round_down(span._free_space)); CacheStripeBlocks span_used{0}; // sort by deficit so least relatively full volumes go first. std::sort(_av.begin(), _av.end(), [](V const &lhs, V const &rhs) { return lhs._deficit > rhs._deficit; }); for (auto &v : _av) { if (v._shares) { CacheStripeBlocks n{(((span_blocks - span_used).count() * v._shares) + total_shares - 1) / total_shares}; CacheStripeBlocks delta{v._config._alloc - v._size}; // Not sure why this is needed. But a large and empty volume can dominate the shares // enough to get more than it actually needs if the other volume are relative small or full. // I need to do more math to see if the weighting can be adjusted to not have this happen. n = std::min(n, delta); v._size += n; span_used += n; total_shares -= v._shares; Errata z = _cache.allocStripe(&span, v._config._idx, round_up(n)); if (Verbosity >= NORMAL) { std::cout << " " << n << " to volume " << v._config._idx << std::endl; } if (!z) { std::cout << z; } } } if (Verbosity >= NORMAL) { std::cout << " Total " << span_used << std::endl; } if (OPEN_RW_FLAG) { if (Verbosity >= NORMAL) { std::cout << " Updating Header ... "; } zret = span.updateHeader(); } _cache.dumpVolumes(); // debug if (Verbosity >= NORMAL) { if (zret) { std::cout << " Done" << std::endl; } else { std::cout << " Error" << std::endl << zret; } } return zret; } /* --------------------------------------------------------------------------------------- */ Errata Cache::loadSpan(swoc::file::path const &path) { Errata zret; std::error_code ec; auto fs = swoc::file::status(path, ec); if (path.empty()) { zret = Errata(make_errno_code(EINVAL), "A span file specified by --spans is required"); } else if (!swoc::file::is_readable(path)) { zret = Errata(make_errno_code(EPERM), R"("{}" is not readable.)", path); } else if (swoc::file::is_regular_file(fs)) { zret = this->loadSpanConfig(path); } else { zret = this->loadSpanDirect(path); } return zret; } Errata Cache::loadSpanDirect(swoc::file::path const &path, int vol_idx, [[maybe_unused]] const Bytes &size) { Errata zret; std::unique_ptr<Span> span(new Span(path)); zret = span->load(); if (zret) { if (span->_header) { int nspb = span->_header->num_diskvol_blks; for (auto i = 0; i < nspb; ++i) { ts::CacheStripeDescriptor &raw = span->_header->stripes[i]; StripeSM *stripe = new StripeSM(span.get(), raw.offset, raw.len); stripe->_idx = i; if (raw.free == 0) { stripe->_vol_idx = raw.vol_idx; stripe->_type = raw.type; _volumes[stripe->_vol_idx]._stripes.push_back(stripe); _volumes[stripe->_vol_idx]._size += stripe->_len; stripe->vol_init_data(); } else { span->_free_space += stripe->_len; } span->_stripes.push_back(stripe); globalVec_stripe.push_back(stripe); } span->_vol_idx = vol_idx; } else { span->clear(); } _spans.push_back(span.release()); } return zret; } Errata Cache::loadSpanConfig(swoc::file::path const &path) { static const swoc::TextView TAG_ID("id"); static const swoc::TextView TAG_VOL("volume"); Errata zret; std::error_code ec; std::string load_content = swoc::file::load(path, ec); if (ec.value() == 0) { swoc::TextView content(load_content); while (content) { swoc::TextView line = content.take_prefix_at('\n'); line.ltrim_if(&isspace); if (line.empty() || '#' == *line) { continue; } swoc::TextView localpath = line.take_prefix_if(&isspace); if (localpath) { // After this the line is [size] [id=string] [volume=#] while (line) { swoc::TextView value(line.take_prefix_if(&isspace)); if (value) { swoc::TextView tag(value.take_prefix_at('=')); if (!tag) { // must be the size } else if (0 == strcasecmp(tag, TAG_ID)) { } else if (0 == strcasecmp(tag, TAG_VOL)) { swoc::TextView text; auto n = swoc::svtoi(value, &text); if (text == value && 0 < n && n < 256) { } else { zret.note("Invalid volume index '{}'", value); } } } } zret = this->loadSpan(swoc::file::path(localpath)); } } } else { zret = Errata(make_errno_code(EBADF), "Unable to load {}", path); } return zret; } Errata Cache::loadURLs(swoc::file::path const &path) { static const swoc::TextView TAG_VOL("url"); ts::URLparser loadURLparser; Errata zret; std::error_code ec; std::string load_content = swoc::file::load(path, ec); if (ec.value() == 0) { swoc::TextView content(load_content); while (!content.empty()) { swoc::TextView blob = content.take_prefix_at('\n'); swoc::TextView tag(blob.take_prefix_at('=')); if (tag.empty()) { } else if (0 == strcasecmp(tag, TAG_VOL)) { std::string url; url.assign(blob.data(), blob.size()); int port_ptr = -1, port_len = -1; int port = loadURLparser.getPort(url, port_ptr, port_len); if (port_ptr >= 0 && port_len > 0) { url.erase(port_ptr, port_len + 1); // get rid of :PORT } std::cout << "port # " << port << ":" << port_ptr << ":" << port_len << ":" << url << std::endl; ts::CacheURL *curl = new ts::CacheURL(url, port); URLset.emplace(curl); } } } else { zret = Errata(make_errno_code(EBADF), "Unable to load ", path.string()); } return zret; } void Cache::dumpSpans(SpanDumpDepth depth) { if (depth >= SpanDumpDepth::SPAN) { for (auto span : _spans) { if (nullptr == span->_header) { std::cout << "Span: " << span->_path.string() << " is uninitialized" << std::endl; } else { std::cout << "\n----------------------------------\n" << "Span: " << span->_path.string() << "\n----------------------------------\n" << "#Magic: " << span->_header->magic << " #Volumes: " << span->_header->num_volumes << " #in use: " << span->_header->num_used << " #free: " << span->_header->num_free << " #stripes: " << span->_header->num_diskvol_blks << " Len(bytes): " << span->_header->num_blocks.value() << std::endl; for (auto stripe : span->_stripes) { std::cout << "\n>>>>>>>>> Stripe " << static_cast<int>(stripe->_idx) << " @ " << stripe->_start << " len=" << stripe->_len.count() << " blocks " << " vol=" << static_cast<int>(stripe->_vol_idx) << " type=" << static_cast<int>(stripe->_type) << " " << (stripe->isFree() ? "free" : "in-use") << std::endl; std::cout << " " << stripe->_segments << " segments with " << stripe->_buckets << " buckets per segment for " << stripe->_buckets * stripe->_segments * ts::ENTRIES_PER_BUCKET << " total directory entries taking " << stripe->_buckets * stripe->_segments * sizeof(CacheDirEntry) * ts::ENTRIES_PER_BUCKET // << " out of " << (delta-header_len).units() << " bytes." << std::endl; if (depth >= SpanDumpDepth::STRIPE) { Errata r = stripe->loadMeta(); if (r) { // print Meta[A][HEAD] std::string MetaCopy[2] = {"A", "B"}; std::string MetaType[2] = {"HEAD", "FOOT"}; for (int i = 0; i < 2; i++) { for (int j = 0; j < 2; j++) { std::cout << "\n" << MetaCopy[i] << ":" << MetaType[j] << "\n" << std::endl; std::cout << " Magic:" << stripe->_meta[i][j].magic << "\n version: major: " << stripe->_meta[i][j].version._major << "\n version: minor: " << stripe->_meta[i][j].version._minor << "\n create_time: " << stripe->_meta[i][j].create_time << "\n write_pos: " << stripe->_meta[i][j].write_pos << "\n last_write_pos: " << stripe->_meta[i][j].last_write_pos << "\n agg_pos: " << stripe->_meta[i][j].agg_pos << "\n generation: " << stripe->_meta[i][j].generation << "\n phase: " << stripe->_meta[i][j].phase << "\n cycle: " << stripe->_meta[i][j].cycle << "\n sync_serial: " << stripe->_meta[i][j].sync_serial << "\n write_serial: " << stripe->_meta[i][j].write_serial << "\n dirty: " << stripe->_meta[i][j].dirty << "\n sector_size: " << stripe->_meta[i][j].sector_size << std::endl; } } if (!stripe->validate_sync_serial()) { std::cout << "WARNING:::::Validity check failed for sync_serials" << std::endl; } stripe->_directory.clear(); } else { std::cout << r; } } } } } } } void Cache::dumpVolumes() { for (auto const &elt : _volumes) { size_t size = 0; for (auto const &r : elt.second._stripes) { size += r->_len; } std::cout << "Volume " << elt.first << " has " << elt.second._stripes.size() << " stripes and " << size << " bytes" << std::endl; } } ts::CacheStripeBlocks Cache::calcTotalSpanConfiguredSize() { ts::CacheStripeBlocks zret(0); for (auto span : _spans) { zret += round_down(span->_len); } return zret; } #if 0 ts::CacheStripeBlocks Cache::calcTotalSpanPhysicalSize() { ts::CacheStripeBlocks zret(0); for (auto span : _spans) { // This is broken, physical_size doesn't work for devices, need to fix that. zret += round_down(span->_path.physical_size()); } return zret; } #endif Cache::~Cache() { for (auto *span : _spans) { delete span; } } Errata Span::load() { Errata zret; std::error_code ec; auto fs = swoc::file::status(_path, ec); if (!swoc::file::is_readable(_path)) { return Errata(make_errno_code(EPERM), R"("{}" is not readable.)", _path); } if (swoc::file::is_char_device(fs) || swoc::file::is_block_device(fs)) { return this->loadDevice(); } if (swoc::file::is_dir(fs)) { return Errata("Directory support not yet available"); } return Errata(make_errno_code(EBADF), R"("{}" is not a valid file type)", _path); } Errata Span::loadDevice() { Errata zret; int flags; flags = OPEN_RW_FLAG #if defined(O_DIRECT) | O_DIRECT #endif #if defined(O_DSYNC) | O_DSYNC #endif ; ats_scoped_fd fd(!_path.empty() ? ::open(_path.c_str(), flags) : ats_scoped_fd()); if (fd.isValid()) { if (ink_file_get_geometry(fd, _geometry)) { off_t offset = ts::CacheSpan::OFFSET; CacheStoreBlocks span_hdr_size(1); // default. static const ssize_t BUFF_SIZE = CacheStoreBlocks::SCALE; // match default span_hdr_size alignas(512) char buff[BUFF_SIZE]; ssize_t n = pread(fd, buff, BUFF_SIZE, offset); if (n >= BUFF_SIZE) { ts::SpanHeader &span_hdr = reinterpret_cast<ts::SpanHeader &>(buff); _base = round_up(offset); // See if it looks valid if (span_hdr.magic == ts::SpanHeader::MAGIC && span_hdr.num_diskvol_blks == span_hdr.num_used + span_hdr.num_free) { int nspb = span_hdr.num_diskvol_blks; span_hdr_size = round_up(sizeof(ts::SpanHeader) + (nspb - 1) * sizeof(ts::CacheStripeDescriptor)); _header.reset(new (malloc(span_hdr_size)) ts::SpanHeader); if (span_hdr_size <= BUFF_SIZE) { memcpy(static_cast<void *>(_header.get()), buff, span_hdr_size); } else { // TODO - check the pread return ssize_t n = pread(fd, _header.get(), span_hdr_size, offset); if (n < span_hdr_size) { std::cout << "Failed to read the Span Header" << std::endl; } } _len = _header->num_blocks; } else { zret.note("{} header is uninitialized or invalid", _path); std::cout << "Span: " << _path.string() << " header is uninitialized or invalid" << std::endl; _len = round_down(_geometry.totalsz) - _base; } // valid FD means the device is accessible and has enough storage to be configured. _fd = fd.release(); _offset = _base + span_hdr_size; } else { zret = Errata(make_errno_code(), "Failed to read from {}", _path); } } else { zret = Errata("Unable to get device geometry for {}", _path); } } else { zret = Errata(make_errno_code(), "Unable to open {}", _path); } return zret; } swoc::Rv<StripeSM *> Span::allocStripe(int vol_idx, const CacheStripeBlocks &len) { for (auto spot = _stripes.begin(), limit = _stripes.end(); spot != limit; ++spot) { StripeSM *stripe = *spot; if (stripe->isFree()) { if (len < stripe->_len) { // If the remains would be less than a stripe block, just take it all. if (stripe->_len <= (len + CacheStripeBlocks(1))) { stripe->_vol_idx = vol_idx; stripe->_type = 1; return stripe; } else { StripeSM *ns = new StripeSM(this, stripe->_start, len); stripe->_start += len; stripe->_len -= len; ns->_vol_idx = vol_idx; ns->_type = 1; _stripes.insert(spot, ns); return ns; } } } } return Errata("Failed to allocate stripe of size {} - no free block large enough", len); } bool Span::isEmpty() const { return std::all_of(_stripes.begin(), _stripes.end(), [](StripeSM *s) { return s->_vol_idx == 0; }); } Errata Span::clear() { StripeSM *stripe; std::for_each(_stripes.begin(), _stripes.end(), [](StripeSM *s) { delete s; }); _stripes.clear(); // Gah, due to lack of anything better, TS depends on the number of usable blocks to be consistent // with internal calculations so have to match that here. Yay. CacheStoreBlocks eff = _len - _base; // starting # of usable blocks. // The maximum number of volumes that can store stored, accounting for the space used to store the descriptors. int n = (eff - sizeof(ts::SpanHeader)) / (CacheStripeBlocks::SCALE + sizeof(CacheStripeDescriptor)); _offset = _base + round_up(sizeof(ts::SpanHeader) + (n - 1) * sizeof(CacheStripeDescriptor)); stripe = new StripeSM(this, _offset, _len - _offset); stripe->vol_init_data(); stripe->InitializeMeta(); _stripes.push_back(stripe); _free_space = stripe->_len; return Errata(); } Errata Span::updateHeader() { Errata zret; int n = _stripes.size(); CacheStripeDescriptor *sd; CacheStoreBlocks hdr_size = round_up(sizeof(ts::SpanHeader) + (n - 1) * sizeof(ts::CacheStripeDescriptor)); void *raw = ats_memalign(512, hdr_size); ts::SpanHeader *hdr = static_cast<ts::SpanHeader *>(raw); std::bitset<ts::MAX_VOLUME_IDX + 1> volume_mask; hdr->magic = ts::SpanHeader::MAGIC; hdr->num_free = 0; hdr->num_used = 0; hdr->num_diskvol_blks = n; hdr->num_blocks = _len; sd = hdr->stripes; for (auto stripe : _stripes) { sd->offset = stripe->_start; sd->len = stripe->_len; sd->vol_idx = stripe->_vol_idx; sd->type = stripe->_type; volume_mask[sd->vol_idx] = true; if (sd->vol_idx == 0) { sd->free = true; ++(hdr->num_free); } else { sd->free = false; ++(hdr->num_used); } ++sd; } volume_mask[0] = false; // don't include free stripes in distinct volume count. hdr->num_volumes = volume_mask.count(); _header.reset(hdr); if (OPEN_RW_FLAG) { ssize_t r = pwrite(_fd, hdr, hdr_size, ts::CacheSpan::OFFSET); if (r < ts::CacheSpan::OFFSET) { zret = Errata(make_errno_code(errno), "Failed to update span."); } } else { std::cout << "Writing not enabled, no updates performed" << std::endl; } return zret; } void Span::clearPermanently() { if (OPEN_RW_FLAG) { alignas(512) static char zero[CacheStoreBlocks::SCALE]; // should be all zero, it's static. std::cout << "Clearing " << _path.string() << " permanently on disk "; ssize_t n = pwrite(_fd, zero, sizeof(zero), ts::CacheSpan::OFFSET); if (n == sizeof(zero)) { std::cout << "done"; } else { const char *text = strerror(errno); std::cout << "failed"; if (n >= 0) { std::cout << " - " << n << " of " << sizeof(zero) << " bytes written"; } std::cout << " - " << text; } std::cout << std::endl; // clear the stripes as well for (auto *strp : _stripes) { strp->loadMeta(); std::cout << "Clearing stripe @" << strp->_start << " of length: " << strp->_len << std::endl; strp->clear(); } } else { std::cout << "Clearing " << _path.string() << " not performed, write not enabled" << std::endl; } } // explicit pair for random table in build_vol_hash_table struct rtable_pair { unsigned int rval; ///< relative value, used to sort. unsigned int idx; ///< volume mapping table index. }; // comparison operator for random table in build_vol_hash_table // sorts based on the randomly assigned rval static int cmprtable(const void *aa, const void *bb) { rtable_pair *a = (rtable_pair *)aa; rtable_pair *b = (rtable_pair *)bb; if (a->rval < b->rval) { return -1; } if (a->rval > b->rval) { return 1; } return 0; } unsigned int next_rand(unsigned int *p) { unsigned int seed = *p; seed = 1103515145 * seed + 12345; *p = seed; return seed; } void Cache::build_stripe_hash_table() { int num_stripes = globalVec_stripe.size(); CacheStoreBlocks total; unsigned int *forvol = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_stripes)); unsigned int *gotvol = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_stripes)); unsigned int *rnd = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_stripes)); unsigned short *ttable = static_cast<unsigned short *>(ats_malloc(sizeof(unsigned short) * STRIPE_HASH_TABLE_SIZE)); unsigned int *rtable_entries = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_stripes)); unsigned int rtable_size = 0; int i = 0; uint64_t used = 0; // estimate allocation for (auto &elt : globalVec_stripe) { // printf("stripe length %" PRId64 "\n", elt->_len.count()); rtable_entries[i] = static_cast<int64_t>(elt->_len) / Vol_hash_alloc_size; rtable_size += rtable_entries[i]; uint64_t x = elt->hash_id.fold(); // seed random number generator rnd[i] = static_cast<unsigned int>(x); total += elt->_len; i++; } i = 0; for (auto &elt : globalVec_stripe) { forvol[i] = total ? static_cast<int64_t>(STRIPE_HASH_TABLE_SIZE * elt->_len) / total : 0; used += forvol[i]; gotvol[i] = 0; i++; } // spread around the excess int extra = STRIPE_HASH_TABLE_SIZE - used; for (int i = 0; i < extra; i++) { forvol[i % num_stripes]++; } // initialize table to "empty" for (int i = 0; i < STRIPE_HASH_TABLE_SIZE; i++) { ttable[i] = STRIPE_HASH_EMPTY; } // generate random numbers proportional to allocation rtable_pair *rtable = static_cast<rtable_pair *>(ats_malloc(sizeof(rtable_pair) * rtable_size)); int rindex = 0; for (int i = 0; i < num_stripes; i++) { for (int j = 0; j < static_cast<int>(rtable_entries[i]); j++) { rtable[rindex].rval = next_rand(&rnd[i]); rtable[rindex].idx = i; rindex++; } } assert(rindex == (int)rtable_size); // sort (rand #, vol $ pairs) qsort(rtable, rtable_size, sizeof(rtable_pair), cmprtable); unsigned int width = (1LL << 32) / STRIPE_HASH_TABLE_SIZE; unsigned int pos; // target position to allocate // select vol with closest random number for each bucket i = 0; // index moving through the random numbers for (int j = 0; j < STRIPE_HASH_TABLE_SIZE; j++) { pos = width / 2 + j * width; // position to select closest to while (pos > rtable[i].rval && i < static_cast<int>(rtable_size) - 1) { i++; } ttable[j] = rtable[i].idx; gotvol[rtable[i].idx]++; } for (int i = 0; i < num_stripes; i++) { printf("build_vol_hash_table index %d mapped to %d requested %d got %d\n", i, i, forvol[i], gotvol[i]); } stripes_hash_table = ttable; ats_free(forvol); ats_free(gotvol); ats_free(rnd); ats_free(rtable_entries); ats_free(rtable); } StripeSM * Cache::key_to_stripe(CryptoHash *key, [[maybe_unused]] const char *hostname, [[maybe_unused]] int host_len) { uint32_t h = (key->slice32(2) >> DIR_TAG_WIDTH) % STRIPE_HASH_TABLE_SIZE; return globalVec_stripe[stripes_hash_table[h]]; } /* --------------------------------------------------------------------------------------- */ Errata VolumeConfig::load(swoc::file::path const &path) { static const swoc::TextView TAG_SIZE("size"); static const swoc::TextView TAG_VOL("volume"); Errata zret; int ln = 0; std::error_code ec; std::string load_content = swoc::file::load(path, ec); if (ec.value() == 0) { swoc::TextView content(load_content); while (content) { Data v; ++ln; swoc::TextView line = content.take_prefix_at('\n'); line.ltrim_if(&isspace); if (line.empty() || '#' == *line) { continue; } while (line) { swoc::TextView value(line.take_prefix_if(&isspace)); swoc::TextView tag(value.take_prefix_at('=')); if (tag.empty()) { zret.note("Line {} is invalid", ln); } else if (0 == strcasecmp(tag, TAG_SIZE)) { if (v.hasSize()) { zret.note("Line {} has field {} more than once", ln, TAG_SIZE); } else { swoc::TextView text; auto n = swoc::svtoi(value, &text); if (text) { swoc::TextView percent(text.data_end(), value.data_end()); // clip parsed number. if (percent.empty()) { v._size = CacheStripeBlocks(round_up(Megabytes(n))); if (v._size.count() != n) { zret.note("Line {} size {} was rounded up to {}", ln, n, v._size); } } else if ('%' == *percent && percent.size() == 1) { v._percent = n; } else { zret.note("Line {} has invalid value '{}' for {} field", ln, value, TAG_SIZE); } } else { zret.note("Line {} has invalid value '{}' for {} field", ln, value, TAG_SIZE); } } } else if (0 == strcasecmp(tag, TAG_VOL)) { if (v.hasIndex()) { zret.note("Line {} has field {} more than once", ln, TAG_VOL); } else { swoc::TextView text; auto n = swoc::svtoi(value, &text); if (text == value) { v._idx = n; } else { zret.note("Line {} has invalid value '{}' for {} field", ln, value, TAG_VOL); } } } } if (v.hasSize() && v.hasIndex()) { _volumes.push_back(std::move(v)); } else { if (!v.hasSize()) { zret.note("Line {} does not have the required field {}", ln, TAG_SIZE); } if (!v.hasIndex()) { zret.note("Line {} does not have the required field {}", ln, TAG_VOL); } } } } else { zret = Errata(make_errno_code(EBADF), "Unable to load ", path); } return zret; } } // namespace ct using namespace ct; void List_Stripes(Cache::SpanDumpDepth depth) { Cache cache; if ((err = cache.loadSpan(SpanFile))) { cache.dumpSpans(depth); cache.dumpVolumes(); } } void Cmd_Allocate_Empty_Spans() { VolumeAllocator va; // OPEN_RW_FLAG = O_RDWR; if ((err = va.load(SpanFile, VolumeFile))) { va.fillEmptySpans(); } } void Simulate_Span_Allocation() { VolumeAllocator va; if (VolumeFile.empty()) { err.note("Volume config file not set"); } if (SpanFile.empty()) { err.note("Span file not set"); } if (err) { if ((err = va.load(SpanFile, VolumeFile)).is_ok()) { err = va.fillAllSpans(); va.dumpVolumes(); } } } void Clear_Spans() { Cache cache; if (!OPEN_RW_FLAG) { err.note("Writing Not Enabled.. Please use --write to enable writing to disk"); return; } if ((err = cache.loadSpan(SpanFile))) { for (auto *span : cache._spans) { span->clearPermanently(); } } } void Find_Stripe(swoc::file::path const &input_file_path) { // scheme=http user=u password=p host=172.28.56.109 path=somepath query=somequery port=1234 // input file format: scheme://hostname:port/somepath;params?somequery user=USER password=PASS // user, password, are optional; scheme and host are required // char* h= http://user:pass@IPADDRESS/path_to_file;?port <== this is the format we need // url_matcher matcher; // if (matcher.match(h)) // std::cout << h << " : is valid" << std::endl; // else // std::cout << h << " : is NOT valid" << std::endl; Cache cache; if (!input_file_path.empty()) { printf("passed argv %s\n", input_file_path.c_str()); } cache.loadURLs(input_file_path); if ((err = cache.loadSpan(SpanFile))) { cache.dumpSpans(Cache::SpanDumpDepth::SPAN); cache.build_stripe_hash_table(); for (auto host : cache.URLset) { CryptoContext ctx; CryptoHash hashT; swoc::LocalBufferWriter<33> w; ctx.update(host->url.data(), host->url.size()); ctx.update(&host->port, sizeof(host->port)); ctx.finalize(hashT); StripeSM *stripe_ = cache.key_to_stripe(&hashT, host->url.data(), host->url.size()); w.print("{}", hashT); printf("hash of %.*s is %.*s: Stripe %s \n", static_cast<int>(host->url.size()), host->url.data(), static_cast<int>(w.size()), w.data(), stripe_->hashText.data()); } } } void dir_check() { Cache cache; if ((err = cache.loadSpan(SpanFile))) { cache.dumpSpans(Cache::SpanDumpDepth::SPAN); for (auto &stripe : cache.globalVec_stripe) { stripe->dir_check(); } } printf("\nCHECK succeeded\n"); } void walk_bucket_chain(const std::string &devicePath) { Cache cache; if ((err = cache.loadSpan(SpanFile))) { cache.dumpSpans(Cache::SpanDumpDepth::SPAN); for (auto sp : cache._spans) { if (devicePath.size() > 0 && sp->_path.view() == devicePath) { for (auto strp : sp->_stripes) { strp->loadMeta(); strp->loadDir(); strp->walk_all_buckets(); } } } } } void Clear_Span(const std::string &devicePath) { Cache cache; if ((err = cache.loadSpan(SpanFile))) { cache.dumpSpans(Cache::SpanDumpDepth::SPAN); for (auto sp : cache._spans) { if (devicePath.size() > 0 && sp->_path.view() == devicePath) { printf("clearing %s\n", devicePath.data()); sp->clearPermanently(); } } } return; } void Check_Freelist(const std::string &devicePath) { Cache cache; if ((err = cache.loadSpan(SpanFile))) { cache.dumpSpans(Cache::SpanDumpDepth::SPAN); for (auto sp : cache._spans) { if (devicePath.size() > 0 && sp->_path.view() == devicePath) { printf("Scanning %s\n", devicePath.data()); for (auto strp : sp->_stripes) { strp->loadMeta(); strp->loadDir(); for (int s = 0; s < strp->_segments; s++) { strp->check_loop(s); } } break; } } } } void Init_disk(swoc::file::path const &input_file_path) { Cache cache; VolumeAllocator va; if (!OPEN_RW_FLAG) { err.note("Writing Not Enabled.. Please use --write to enable writing to disk"); return; } err = va.load(SpanFile, VolumeFile); va.allocateSpan(input_file_path); } void Get_Response(swoc::file::path const &input_file_path) { // scheme=http user=u password=p host=172.28.56.109 path=somepath query=somequery port=1234 // input file format: scheme://hostname:port/somepath;params?somequery user=USER password=PASS // user, password, are optional; scheme and host are required // char* h= http://user:pass@IPADDRESS/path_to_file;?port <== this is the format we need Cache cache; if (!input_file_path.empty()) { printf("passed argv %s\n", input_file_path.c_str()); } cache.loadURLs(input_file_path); if ((err = cache.loadSpan(SpanFile))) { cache.dumpSpans(Cache::SpanDumpDepth::SPAN); cache.build_stripe_hash_table(); for (auto host : cache.URLset) { CryptoContext ctx; CryptoHash hashT; swoc::LocalBufferWriter<33> w; ctx.update(host->url.data(), host->url.size()); ctx.update(&host->port, sizeof(host->port)); ctx.finalize(hashT); StripeSM *stripe_ = cache.key_to_stripe(&hashT, host->url.data(), host->url.size()); w.print("{}", hashT); printf("hash of %.*s is %.*s: Stripe %s \n", static_cast<int>(host->url.size()), host->url.data(), static_cast<int>(w.size()), w.data(), stripe_->hashText.data()); CacheDirEntry *dir_result = nullptr; stripe_->loadMeta(); stripe_->loadDir(); stripe_->dir_probe(&hashT, dir_result, nullptr); } } } void static scan_span(Span *span, swoc::file::path const &regex_path) { for (auto strp : span->_stripes) { strp->loadMeta(); strp->loadDir(); if (!regex_path.empty()) { CacheScan cs(strp, regex_path); cs.Scan(true); } else { CacheScan cs(strp); cs.Scan(false); } } } void Scan_Cache(swoc::file::path const &regex_path) { Cache cache; std::vector<std::thread> threadPool; if ((err = cache.loadSpan(SpanFile))) { if (err.length()) { return; } cache.dumpSpans(Cache::SpanDumpDepth::SPAN); for (auto sp : cache._spans) { threadPool.emplace_back(scan_span, sp, regex_path); } for (auto &th : threadPool) { th.join(); } } } int main([[maybe_unused]] int argc, const char *argv[]) { swoc::file::path input_url_file; std::string inputFile; parser.add_global_usage(std::string(argv[0]) + " --spans <SPAN> --volume <FILE> <COMMAND> [<SUBCOMMAND> ...]\n"); parser.require_commands() .add_option("--help", "-h", "") .add_option("--spans", "-s", "", "", 1) .add_option("--volumes", "-v", "", "", 1) .add_option("--write", "-w", "") .add_option("--input", "-i", "", "", 1) .add_option("--device", "-d", "", "", 1) .add_option("--aos", "-o", "", "", 1); parser.add_command("list", "List elements of the cache", []() { List_Stripes(Cache::SpanDumpDepth::SPAN); }) .add_command("stripes", "List the stripes", []() { List_Stripes(Cache::SpanDumpDepth::STRIPE); }); parser.add_command("clear", "Clear spans", []() { Clear_Spans(); }).add_command("span", "clear an specific span", [&]() { Clear_Span(inputFile); }); auto &c = parser.add_command("dir_check", "cache check").require_commands(); c.add_command("full", "Full report of the cache storage", &dir_check); c.add_command("freelist", "check the freelist for loop", [&]() { Check_Freelist(inputFile); }); c.add_command("bucket_chain", "walk bucket chains for loops", [&]() { walk_bucket_chain(inputFile); }); parser.add_command("volumes", "Volumes", &Simulate_Span_Allocation); parser.add_command("alloc", "Storage allocation") .require_commands() .add_command("free", "Allocate storage on free (empty) spans", &Cmd_Allocate_Empty_Spans); parser.add_command("find", "Find Stripe Assignment", [&]() { Find_Stripe(input_url_file); }); parser.add_command("clearspan", "clear specific span").add_command("span", "device path", [&]() { Clear_Span(inputFile); }); parser.add_command("retrieve", " retrieve the response of the given list of URLs", [&]() { Get_Response(input_url_file); }); parser.add_command("init", " Initializes uninitialized span", [&]() { Init_disk(input_url_file); }); parser.add_command("scan", " Scans the whole cache and lists the urls of the cached contents", [&]() { Scan_Cache(input_url_file); }); // parse the arguments auto arguments = parser.parse(argv); if (auto data = arguments.get("spans")) { SpanFile = data.value(); } if (auto data = arguments.get("volumes")) { VolumeFile = data.value(); } if (auto data = arguments.get("input")) { input_url_file = data.value(); } if (auto data = arguments.get("aos")) { cache_config_min_average_object_size = std::stoi(data.value()); } if (auto data = arguments.get("device")) { inputFile = data.value(); } if (auto data = arguments.get("write")) { OPEN_RW_FLAG = O_RDWR; std::cout << "NOTE: Writing to physical devices enabled" << std::endl; } if (arguments.has_action()) { arguments.invoke(); } if (err.length()) { std::cerr << err; exit(1); } return 0; }