From d191a5ef0df6c366894065579feee17b688ae7aa Mon Sep 17 00:00:00 2001 From: Guo Teng Date: Sun, 14 Jul 2024 00:26:25 +0800 Subject: [PATCH] update hotness estimate --- .gitignore | 1 + YCSB/workloads/workloadt | 4 +- db/art/heat_buckets.cc | 277 ++++++++++++++++++++++++++---------- db/art/heat_buckets.h | 62 ++++++-- db/art/macros.h | 12 +- db/db_impl/db_impl.cc | 22 ++- db/db_impl/db_impl.h | 10 ++ db/db_impl/db_impl_write.cc | 8 ++ 8 files changed, 305 insertions(+), 91 deletions(-) diff --git a/.gitignore b/.gitignore index d953f8e01..20b9be1f4 100644 --- a/.gitignore +++ b/.gitignore @@ -95,3 +95,4 @@ cmake-build-release *_example inode_vptrs +.cache/ \ No newline at end of file diff --git a/YCSB/workloads/workloadt b/YCSB/workloads/workloadt index 02e97cf94..e703b491e 100644 --- a/YCSB/workloads/workloadt +++ b/YCSB/workloads/workloadt @@ -8,9 +8,9 @@ workload=com.yahoo.ycsb.workloads.CoreWorkload readallfields=true -readproportion=0 +readproportion=1 updateproportion=0 scanproportion=0 -insertproportion=1 +insertproportion=0 requestdistribution=zipfian \ No newline at end of file diff --git a/db/art/heat_buckets.cc b/db/art/heat_buckets.cc index df59bf9ac..2648459fe 100644 --- a/db/art/heat_buckets.cc +++ b/db/art/heat_buckets.cc @@ -1,16 +1,17 @@ #include "heat_buckets.h" #include #include -#include namespace ROCKSDB_NAMESPACE { std::vector HeatBuckets::seperators_; std::vector HeatBuckets::buckets_; -uint32_t HeatBuckets::period_cnt_; // the get count of one period, should be fixed uint32_t HeatBuckets::current_cnt_; // current get count in this period -double HeatBuckets::alpha_; std::vector> HeatBuckets::mutex_ptrs_; std::mutex HeatBuckets::cnt_mutex_; +std::mutex HeatBuckets::sample_mutex_; +bool HeatBuckets::is_ready_; // identify whether HeatBuckets ready for hit +SamplesPool HeatBuckets::samples_; +bool HeatBuckets::updated_; // prevent from updating hotness more than once in a short time Bucket::Bucket() { @@ -23,16 +24,10 @@ Bucket::~Bucket() { return; // destroy nothing } -/* -const size_t& Bucket::keys_cnt() { - return keys_.size(); -} -*/ - void Bucket::update(const double& alpha, const uint32_t& period_cnt) { // mutex_.lock(); - hotness_ = (1 - alpha) * hotness_ + - alpha * double(hit_cnt_) / double(period_cnt); + hotness_ = alpha * hotness_ + + (1 - alpha) * double(hit_cnt_) / double(period_cnt); hit_cnt_ = 0; // remember to reset counter // keys_.clear(); // mutex_.unlock(); // remember to unlock!!! @@ -46,53 +41,13 @@ void Bucket::hit() { } HeatBuckets::HeatBuckets() { - // TODO:support reading rocksdb options to get seperators path - HeatBuckets(SEPERATORS_PATH, BUCKETS_ALPHA, BUCKETS_PERIOD); -} - -HeatBuckets::HeatBuckets(const std::string& path, const double& alpha, const uint32_t& period_cnt) { - // 1. init seperators_ - std::ifstream input; - input.open(path, std::ios::in); - std::string seperator; - - if (!input.is_open()) { - // std::cout << "failed to open key range seperators file, skip building head buckets!" << std::endl; - throw "failed to open key range seperators file!"; - } - seperators_.resize(0); - while (std::getline(input, seperator)) { - seperators_.push_back(seperator); - } - // std::cout << "success load key range seperators" << std::endl; - // std::cout << "key range seperators count : " << seperators_.size() << std::endl; - input.close(); - - // 2. init buckets_ - const size_t buckets_num = seperators_.size() - 1; // bucketss number = seperators num - 1 buckets_.resize(0); - buckets_.resize(buckets_num); // auto call Bucket::Bucket() - // std::cout << "set heat buckets size to " << buckets_.size() << std::endl; - - // 3. init period_cnt_ and alpha_, two variables should be fixed after init - period_cnt_ = period_cnt; - alpha_ = alpha; - // std::cout << "set period_cnt_ to " << period_cnt_ << std::endl; - // std::cout << "set alpha_ to " << alpha_ << std::endl; - - // 4. init current_cnt_ current_cnt_ = 0; - // std::cout << "set current_cnt_ to " << current_cnt_ << std::endl; - - // 5. init mutex ptr container - const size_t mutex_ptrs_num = buckets_num; mutex_ptrs_.resize(0); - for (size_t i=0; i(new std::mutex())); - } - // std::cout << "set mutex ptrs size to " << mutex_ptrs_.size() << std::endl; - std::cout << "enable heat buckets estimates" << std::endl; + is_ready_ = false; + samples_.clear(); + updated_ = false; } HeatBuckets::~HeatBuckets() { @@ -100,7 +55,7 @@ HeatBuckets::~HeatBuckets() { } void HeatBuckets::debug() { - std::cout << "[Debug] current cnt in this period: " << current_cnt_ << std::endl; + std::cout << "[Debug] total cnt in this period: " << current_cnt_ << std::endl; for (auto& bucket : buckets_) { std::cout << "[Debug] "; std::cout << "bucket hotness : " << bucket.hotness_; @@ -111,10 +66,11 @@ void HeatBuckets::debug() { } void HeatBuckets::update() { - - // bug : when update, the sum of all buckets cnt may not more or less than period_cnt_. - // we decide to use bigger period_cnt_ and divide into more buckets. - + // mark already updated, after current_cnt_ more than PERIOD_COUNT / MAGIC_FACTOR, updated_ will be reset to false; + // we need guarantee that in one period (one constant time span), db gets are much larger than PERIOD_COUNT / MAGIC_FACTOR; + // usually in server, exec get requests PERIOD_COUNT / MAGIC_FACTOR times only account for a very very short time. + updated_ = true; + assert(mutex_ptrs_.size() == buckets_.size()); for (size_t i=0; ilock(); @@ -122,17 +78,21 @@ void HeatBuckets::update() { // TODO: use multiple threads to update hotness of all buckets for (size_t i=0; iunlock(); } // remember to reset current_cnt_ counter current_cnt_ = 0; } -int32_t HeatBuckets::locate(const std::string& key) { - int32_t left = 0, right = seperators_.size()-1; +uint32_t HeatBuckets::locate(const std::string& key) { + // we use locate method to locate the key range for one key + // reminded one key range -> [lower seperator, upper seperator) + // if we locate key k to idx i, then seperator i <= k < seperator i+1 + // equal to k in key range i + uint32_t left = 0, right = seperators_.size()-1; while (left < right - 1){ - int32_t mid = left + ((right-left) / 2); + uint32_t mid = left + ((right-left) / 2); if (seperators_[mid] > key) { right = mid; } else if (seperators_[mid] <= key) { @@ -142,13 +102,14 @@ int32_t HeatBuckets::locate(const std::string& key) { return left; } -void HeatBuckets::hit(const std::string& key) { - // use linear search to find index i, making seperators_[i] <= key and seperators_[i+1] > i +void HeatBuckets::hit(const std::string& key, const bool& signal) { + assert(is_ready_); + // use binary search to find index i, making seperators_[i] <= key and seperators_[i+1] > i // reminding we have set border guard, so dont worry about out of bounds error // after we find the index i, we call buckets_[i].hit(), then add 1 to current_cnt_ // if current_cnt_ >= period_cnt_, call update() to update hotness of all buckets and reset current cnt counter - int32_t index = 0; + uint32_t idx = 0; // last element is border guard // means last element always bigger than key // first element is border guard @@ -160,29 +121,197 @@ void HeatBuckets::hit(const std::string& key) { index += 1; } */ - index = locate(key); + idx = locate(key); // std::cout << "debug seperators_ size : " << seperators_.size() << std::endl; // std::cout << "debug buckets_ size : " << buckets_.size() << std::endl; // std::cout << "debug mutex_ptrs_ size : " << mutex_ptrs_.size() << std::endl; // std::cout << "debug period_cnt_ : " << period_cnt_ << std::endl; // std::cout << "debug alpha_ : " << alpha_ << std::endl; - assert(index >= 0 && index < buckets_.size()); - assert(seperators_[index] <= key && key < seperators_[index+1]); - assert(index >= 0 && index < mutex_ptrs_.size()); + assert(buckets_.size() == mutex_ptrs_.size()); + assert(idx >= 0 && idx < buckets_.size()); + assert(seperators_[idx] <= key && key < seperators_[idx+1]); - mutex_ptrs_[index]->lock(); - buckets_[index].hit(); // mutex only permits one write opr to one bucket - mutex_ptrs_[index]->unlock(); + mutex_ptrs_[idx]->lock(); + buckets_[idx].hit(); // mutex only permits one write opr to one bucket + mutex_ptrs_[idx]->unlock(); cnt_mutex_.lock(); current_cnt_ += 1; - if (current_cnt_ % period_cnt_ == 0) { + // use updated_ to prevent from updating hotness in a very short time span (due to multi-threads operation) + if (signal && !updated_) { // debug(); update(); } cnt_mutex_.unlock(); + + // remember to reset updated_ to false + if (updated_ && current_cnt_ >= PERIOD_COUNT / MAGIC_FACTOR) { + updated_ = false; + } +} + +SamplesPool::SamplesPool() { + samples_cnt_ = 0; + pool_.resize(0); + filter_.clear(); + + // because put opt will input duplicated keys, we need to guarantee SAMPLES_MAXCNT much larger than SAMPLES_LIMIT + // however std::set only remain deduplicated keys + // to collect good samples for previous put keys, we need a larger SAMPLES_MAXCNT + assert(SAMPLES_MAXCNT >= MAGIC_FACTOR * SAMPLES_LIMIT); +} + +void SamplesPool::clear() { + samples_cnt_ = 0; + pool_.resize(0); + filter_.clear(); +} + +void SamplesPool::sample(const std::string& key) { + assert(pool_.size() == filter_.size()); + // if already in pool, return + if (is_sampled(key)) { + return; + } + // pool not full + if (!is_full()) { + pool_.push_back(key); + filter_.insert(key); + } + // pool is full + else { + // need to generate random integer in [0, old samples_cnt_] (equal to [0, old samples_cnt_ + 1)) + // new samples_cnt_ = old samples_cnt_ + 1 + // if you want random integer in [a, b], use (rand() % (b-a+1))+a; + srand((unsigned)time(NULL)); + uint32_t idx = (rand() % (samples_cnt_ + 1)) + 0; + assert(idx <= samples_cnt_ && idx >= 0); + // idx in [0, samples_limit_) + // pool_ size may lightly more than samples_limit_; + if (idx < pool_.size()) { + // remove old key + std::string old_key = pool_[idx]; + filter_.erase(old_key); + + // update new key + pool_[idx] = key; + filter_.insert(key); + } + } + assert(pool_.size() == filter_.size()); + + // remember to update samples_cnt_ + samples_cnt_ += 1; +} + +void SamplesPool::prepare() { + std::string key_min = "user"; // defined min key for YCSB + std::string key_max = pool_[pool_.size()-1] + pool_[pool_.size()-1]; + if (!is_ready()) { + return; + } + sort(pool_.begin(), pool_.end()); + // add border guard + pool_.emplace(pool_.begin(), key_min); + pool_.emplace_back(key_max); +} + +void SamplesPool::divide(const uint32_t& k, std::vector& dst) { + // reminded we already add border guard to pool vector + std::string key_min = pool_[0]; // defined min key for YCSB + std::string key_max = pool_[pool_.size()-1]; + + dst.resize(0); + dst.emplace_back(key_min); + + // reminded we already add border guard to pool vector + // border guard in idx 0 and idx pool_.size()-1 + uint32_t idx = 1; + while (idx < pool_.size() - 1) { + dst.emplace_back(pool_[idx]); + idx += k; + } + + dst.emplace_back(key_max); +} + + +uint32_t SamplesPool::locate(const std::string& key) { + // pool must be sorted + // and we need to add border guard to pool + // after that, we can use locate(key) + uint32_t left = 0, right = pool_.size()-1; + while (left < right - 1){ + uint32_t mid = left + ((right-left) / 2); + if (pool_[mid] > key) { + right = mid; + } else if (pool_[mid] <= key) { + left = mid; + } + } + return left; } +uint32_t SamplesPool::determine_k(std::vector>& segments) { + // already add border guard to pool + uint32_t k = pool_.size() - 2; + // if segments is empty, use default k to debug + if (segments.empty()) { + k = (pool_.size() - 2) / DEFAULT_BUCKETS; + } + assert(k > 1); + for (auto& segment : segments) { + assert(segment.size() == 2); + assert(segment[0] < segment[1]); + uint32_t span = locate(segment[1]) - locate(segment[0]); + + assert(span > 1); + if (k > span) k = span; + } + // std::cout << "[Debug] samples divided with span k : " << k << std::endl; + return k; +} + +void HeatBuckets::sample(const std::string& key, std::vector>& segments) { + sample_mutex_.lock(); + samples_.sample(key); + if (samples_.is_ready()) { + init(segments); + } + sample_mutex_.unlock(); +} + +void HeatBuckets::init(std::vector>& segments) { + // compute proper k and determine key ranges + samples_.prepare(); + uint32_t k = samples_.determine_k(segments); + samples_.divide(k, seperators_); + + // std::cout << "[Debug] show key ranges below: " << std::endl; + for (size_t i=0; i(new std::mutex())); + } + assert(mutex_ptrs_.size() == buckets_.size()); + assert(seperators_.size() == buckets_.size()+1); + + is_ready_ = true; + + // debug + // std::cout << "[Debug] heat buckets size: " << buckets_.size() << std::endl; + std::cout << "[Debug] key ranges init" << std::endl; +} } \ No newline at end of file diff --git a/db/art/heat_buckets.h b/db/art/heat_buckets.h index 1ac0c204e..6d2d0d543 100644 --- a/db/art/heat_buckets.h +++ b/db/art/heat_buckets.h @@ -5,11 +5,15 @@ #include #include "macros.h" #include +#include +#include +#include namespace ROCKSDB_NAMESPACE { class Bucket; class HeatBuckets; +class SamplesPool; class Bucket { public: @@ -26,30 +30,72 @@ class Bucket { /* - load previous read records (read workload) to generate key range seperators, see rocksdb/workload/generator.cc - after we generate seperators file, we read this file to generate these heat buckets to estimate hotness of every key range - every bucket corresponding to one key range + first sample put keys using reservoir sampling. + If we collect enough keys, determine the common key num (k) for every key group + start with idx 0, add k continuously, get 0, k, 2k, ... + set KEY_MIN, samples[0], samples[k], samples[2k], ..., KEY_MAX as seperators + guarenteed that KEY_MIN < all keys and KEY_MAX > all keys + then we define key ranges (KEY_MIN, samples[0]), [samples[1], samples[2]), [samples[2], samples[3]), ..., [..., KEY_MAX) + one heat bucket corresponding to one key range + compute and update hotness of all heat buckets */ class HeatBuckets { private: static std::vector seperators_; static std::vector buckets_; - static uint32_t period_cnt_; // the get count of one period, should be fixed static uint32_t current_cnt_; // current get count in this period - static double alpha_; static std::vector> mutex_ptrs_; static std::mutex cnt_mutex_; + static std::mutex sample_mutex_; + static bool is_ready_; // identify whether HeatBuckets ready for hit + static SamplesPool samples_; + static bool updated_; public: HeatBuckets(); - HeatBuckets(const std::string& path, const double& alpha, const uint32_t& period_cnt); ~HeatBuckets(); - int32_t locate(const std::string& key); // locate which bucket hitted by this key + uint32_t locate(const std::string& key); // helper func: locate which bucket hitted by this key + + const bool& is_ready() { return is_ready_; } + void sample(const std::string& key, std::vector>& segments); // before init buckets, we need to sample keys; + // input segment-related key range (segments), will use them when SamplesPool ready. + + void init(std::vector>& segments); // if sample enough keys, ready to init heatbuckets void update(); // update hotness value of all buckets - void hit(const std::string& key); // one key only hit one bucket (also mean only hit one key range) + void hit(const std::string& key, const bool& signal); // one key only hit one bucket (also mean only hit one key range) + // if signal is true, update hotness void debug(); // output debug message in standard output }; +class SamplesPool { +private: + std::vector pool_; // using set to guarantee only store deduplicated samples + std::set filter_; // used to check whether new key already exist in pool + uint32_t samples_cnt_; // current sample tries num, need to update after every try +public: + SamplesPool(); + + ~SamplesPool() { return; } + + void clear(); + + bool is_ready() { return samples_cnt_ >= SAMPLES_MAXCNT; } + bool is_full() { return pool_.size() >= SAMPLES_LIMIT; } + bool is_sampled(const std::string& key) { return filter_.count(key) > 0; } + + void sample(const std::string& key); + + void prepare(); + + // need call prepare() before + // generate seperators + void divide(const uint32_t& k, std::vector& dst); + + // determine k based on low-level segments' key range + uint32_t determine_k(std::vector>& segments); + uint32_t locate(const std::string& key); // helper func when determine k +}; + } \ No newline at end of file diff --git a/db/art/macros.h b/db/art/macros.h index 1c2bfc6ce..0d59e9005 100644 --- a/db/art/macros.h +++ b/db/art/macros.h @@ -138,8 +138,14 @@ namespace ROCKSDB_NAMESPACE { */ // micros for HeatBuckets -#define SEPERATORS_PATH "/home/ycc/WaLSM/workloads/seperators" -#define BUCKETS_PERIOD 1000000 -#define BUCKETS_ALPHA 0.1 +#define BUCKETS_ALPHA 0.2 +#define SAMPLES_LIMIT 10000 +#define SAMPLES_MAXCNT 5000000 +#define PERIOD_COUNT 200000 +#define DEFAULT_BUCKETS 500 +#define MAGIC_FACTOR 500 + +// micors for Model Train +#define TRAIN_PERIODS 10 } // namespace ROCKSDB_NAMESPACE \ No newline at end of file diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index f636ac6db..532538040 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -241,6 +241,12 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, closed_(false), error_handler_(this, immutable_db_options_, &mutex_), atomic_flush_install_cv_(&mutex_) { +// WaLSM+ +#ifdef ART_PLUS + get_cnt_ = 0; + period_cnt_ = 0; +#endif + // !batch_per_trx_ implies seq_per_batch_ because it is only unset for // WriteUnprepared, which should use seq_per_batch_. assert(batch_per_txn_ || seq_per_batch_); @@ -1744,13 +1750,21 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, // Change // std::cout << "ready for get" << std::endl; +// WaLSM+: add hotness estimating #ifdef ART std::string art_key(key.data(), key.size()); #ifdef ART_PLUS - // std::cout << "Buckets_ address : "; - // std::cout << std::hex << heat_buckets_ << std::endl; - heat_buckets_.hit(art_key); - // std::cout << "hit : " << art_key << std::endl; + // ready to estimate hotness + if (heat_buckets_.is_ready()) { + get_cnt_ += 1; + if (get_cnt_ >= PERIOD_COUNT) { + heat_buckets_.hit(art_key, true); + get_cnt_ = 0; + period_cnt_ += 1; + } else { + heat_buckets_.hit(art_key, false); + } + } #endif done = global_memtable_->Get(art_key, *get_impl_options.value->GetSelf(), &s); #else diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 37cd1c2a9..92b6aca43 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -18,6 +18,7 @@ #include #include #include +#include #include "db/art/compactor.h" #include "db/art/heat_group_manager.h" @@ -1900,6 +1901,15 @@ class DBImpl : public DB { #ifdef ART_PLUS HeatBuckets heat_buckets_; + + // monitor low-level segments min key and max key + std::vector> segments_info_; + + // record get cnt in current period, when equal to PERIOD_COUNT, start next period + uint32_t get_cnt_; + + // record period cnt, if period_cnt_ % TRAIN_PERIODS = 0, start to evaluate or retrain model + uint32_t period_cnt_; #endif // Offset of last record written by leader writer. uint64_t last_record_offset_; diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 07db5edf2..db65892fc 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -20,6 +20,14 @@ namespace ROCKSDB_NAMESPACE { // Convenience methods Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family, const Slice& key, const Slice& val) { +// WaLSM+: first sample put keys into pool, then generate key ranges for computing hotness +#ifdef ART_PLUS + // heat_buckets not ready, still sample into pool + if (!heat_buckets_.is_ready()) { + std::string art_key(key.data(), key.size()); + heat_buckets_.sample(art_key, segments_info_); + } +#endif return DB::Put(o, column_family, key, val); }