diff --git a/.gitignore b/.gitignore index e2d24c97b..f2424a220 100644 --- a/.gitignore +++ b/.gitignore @@ -103,3 +103,4 @@ __pycache__/ include/csv2/ debug.* +*.log \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 95751f6fd..b7e2db5e0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -592,6 +592,8 @@ set(SOURCES db/art/global_memtable.cc db/art/heat_buckets.cc db/art/clf_model.cc + db/art/filter_cache_heap.cc + db/art/greedy_algo.cc db/art/heat_group.cc db/art/lock.cc db/art/logger.cc diff --git a/TARGETS b/TARGETS index 6aa3b118b..eb13d129b 100644 --- a/TARGETS +++ b/TARGETS @@ -449,6 +449,8 @@ cpp_library( "db/art/global_memtable.cc", "db/art/heat_buckets.cc", "db/art/clf_model.cc", + "db/art/filter_cache_heap.cc", + "db/art/greedy_algo.cc", "db/art/heat_group.cc", "db/art/lock.cc", "db/art/logger.cc", diff --git a/YCSB/Makefile b/YCSB/Makefile index 9f53be021..10ba2d280 100644 --- a/YCSB/Makefile +++ b/YCSB/Makefile @@ -18,9 +18,10 @@ BIND_LEVELDB ?= 0 BIND_LMDB ?= 0 EXTRA_LDFLAGS += -lstdc++ -EXTRA_LDFLAGS += -lpython3.12 -EXTRA_CXXFLAGS += -I$(PYTHON_INCLUDE_PATH) -EXTRA_CXXFLAGS += -L$(PYTHON_LIBRARY_PATH) +EXTRA_LDFLAGS += -lsocket++ +# EXTRA_LDFLAGS += -lpython3.12 +# EXTRA_CXXFLAGS += -I$(PYTHON_INCLUDE_PATH) +# EXTRA_CXXFLAGS += -L$(PYTHON_LIBRARY_PATH) #---------------------------------------------------------- diff --git a/YCSB/rocksdb/rocksdb.properties b/YCSB/rocksdb/rocksdb.properties index f12117bf3..4fdbcf04f 100644 --- a/YCSB/rocksdb/rocksdb.properties +++ b/YCSB/rocksdb/rocksdb.properties @@ -1,4 +1,4 @@ -rocksdb.dbname=/pg_wal/ycc +rocksdb.dbname=/tmp/ycc rocksdb.nvm_path=/pg_wal/ycc/memory_art rocksdb.format=single rocksdb.destroy=false diff --git a/YCSB/workloads/workloadt b/YCSB/workloads/workloadt index e41752dd5..f5357c20f 100644 --- a/YCSB/workloads/workloadt +++ b/YCSB/workloads/workloadt @@ -3,7 +3,7 @@ recordcount=5000000 -operationcount=100000000 +operationcount=600000 workload=com.yahoo.ycsb.workloads.CoreWorkload readallfields=true diff --git a/build_tools/build_detect_platform b/build_tools/build_detect_platform index aa78bfb40..a5a8e6d5a 100755 --- a/build_tools/build_detect_platform +++ b/build_tools/build_detect_platform @@ -280,9 +280,10 @@ JAVA_STATIC_LDFLAGS="$PLATFORM_LDFLAGS" JAVAC_ARGS="-source 7" COMMON_FLAGS="$COMMON_FLAGS -DUSE_PMEM -DART -DART_PLUS" -COMMON_FLAGS="$COMMON_FLAGS -lstdc++ -lpython3.12" -COMMON_FLAGS="$COMMON_FLAGS -I$PYTHON_INCLUDE_PATH" -COMMON_FLAGS="$COMMON_FLAGS -L$PYTHON_LIBRARY_PATH" +COMMON_FLAGS="$COMMON_FLAGS -lstdc++ -lsocket++" +# COMMON_FLAGS="$COMMON_FLAGS -lstdc++ -lpython3.12" +# COMMON_FLAGS="$COMMON_FLAGS -I$PYTHON_INCLUDE_PATH" +# COMMON_FLAGS="$COMMON_FLAGS -L$PYTHON_LIBRARY_PATH" PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -lpmem" JAVA_LDFLAGS="$JAVA_LDFLAGS -lpmem" diff --git a/db/art/clf_model.cc b/db/art/clf_model.cc index d641440d7..9ae2491d3 100644 --- a/db/art/clf_model.cc +++ b/db/art/clf_model.cc @@ -1,6 +1,8 @@ #include "clf_model.h" #include #include +#include +#include #include #include #include @@ -8,14 +10,16 @@ namespace ROCKSDB_NAMESPACE { -std::string ClfModel::base_dir_; -uint16_t ClfModel::model_cnt_; -uint16_t ClfModel::dataset_cnt_; uint16_t ClfModel::feature_num_; +std::string ClfModel::dataset_name_; +std::string ClfModel::dataset_path_; +std::string ClfModel::host_, ClfModel::port_; +size_t ClfModel::buffer_size_; void ClfModel::write_debug_dataset() { + assert(feature_num_ > 0); // ready for writer - std::ofstream stream(next_dataset_path()); + std::ofstream stream(dataset_path_); csv2::Writer> writer(stream); // init hotness values @@ -59,10 +63,10 @@ void ClfModel::write_debug_dataset() { values.emplace_back(std::to_string(level)); for (int j = 0; j < 20; j ++) { values.emplace_back(std::to_string(ids[j])); - values.emplace_back(std::to_string(uint32_t(SIGNIFICANT_DIGITS * hotness_map[ids[j]]))); + values.emplace_back(std::to_string(uint32_t(SIGNIFICANT_DIGITS_FACTOR * hotness_map[ids[j]]))); } values.emplace_back(std::to_string(target)); - + assert(values.size() == feature_num_ + 1); rows.emplace_back(values); } @@ -70,9 +74,13 @@ void ClfModel::write_debug_dataset() { stream.close(); } -void ClfModel::write_real_dataset(std::vector>& datas) { +void ClfModel::write_real_dataset(std::vector>& datas, std::vector& tags) { + assert(feature_num_ > 0); + // tags is real class of all segments, + // we also need to write these tags to dataset besides features + assert(datas.size()==tags.size()); // ready for writer - std::ofstream stream(next_dataset_path()); + std::ofstream stream(dataset_path_); csv2::Writer> writer(stream); // init csv header vector @@ -90,12 +98,17 @@ void ClfModel::write_real_dataset(std::vector>& datas) { rows.emplace_back(header); std::vector values; + size_t idx = 0; for (std::vector& data : datas) { + // resize features vector to size feature_num_ prepare_data(data); values.clear(); for (uint32_t& value : data) { values.emplace_back(std::to_string(value)); } + // remember to write real tag to dataset + values.emplace_back(std::to_string(tags[idx++])); + assert(values.size() == feature_num_ + 1); rows.emplace_back(values); } @@ -103,7 +116,8 @@ void ClfModel::write_real_dataset(std::vector>& datas) { stream.close(); } -void ClfModel::write_dataset(std::vector>& datas) { +void ClfModel::write_dataset(std::vector>& datas, std::vector& tags) { + assert(feature_num_ > 0); if (datas.empty()) { write_debug_dataset(); // dataset_cnt_ += 1; @@ -112,114 +126,93 @@ void ClfModel::write_dataset(std::vector>& datas) { assert(feature_num_ % 2 != 0); // features num: 2r + 1 - write_real_dataset(datas); + write_real_dataset(datas, tags); // dataset_cnt_ += 1; return; } -uint16_t ClfModel::make_train(std::vector>& datas) { - write_dataset(datas); - - PyObject* pModule = PyImport_ImportModule("lgb"); - assert(pModule != nullptr); - - PyObject* pFunc = PyObject_GetAttrString(pModule, "train"); - assert(pFunc != nullptr && PyCallable_Check(pFunc)); - - - PyObject* pArg = PyTuple_New(2); - PyTuple_SetItem(pArg, 0, Py_BuildValue("s", next_dataset_path().c_str())); - PyTuple_SetItem(pArg, 1, Py_BuildValue("s", next_model_path().c_str())); - - PyObject_CallObject(pFunc, pArg); - - dataset_cnt_ += 1; - model_cnt_ += 1; - - Py_DECREF(pModule); - Py_DECREF(pFunc); - Py_DECREF(pArg); - - return model_cnt_; +void ClfModel::make_train(std::vector>& datas, std::vector& tags) { + assert(feature_num_ > 0); + write_dataset(datas, tags); + + // already write dataset + // send msg to LightGBM server, let server read dataset and train new model + libsocket::inet_stream sock(host_, port_, LIBSOCKET_IPv4); + std::string message = TRAIN_PREFIX + dataset_name_; + // already write dataset, send dataset path to server + // should not receive any message from server + sock << message; + // will destroy sock when leaving this func scope } void ClfModel::make_predict_samples(std::vector>& datas) { + assert(feature_num_ > 0); csv2::Reader, csv2::quote_character<'"'>, csv2::first_row_is_header, csv2::trim_policy::trim_whitespace> csv; - - if (csv.mmap(latest_dataset_path())) { + std::vector data; + if (csv.mmap(dataset_path_)) { // const auto header = csv.header(); int cnt = 0; for (auto row : csv) { - if ((cnt++) > 10) { + // only choose first 10 samples + if ((++cnt) > 10) { break; } - - std::vector data; + data.clear(); for (auto cell : row) { std::string value; cell.read_value(value); data.emplace_back(stoul(value)); } + // remind that csv reader will read a empty row in the end, that is why !data.empty() + // csv file last column is real tag + // we need to pop out last column + if (!data.empty()) { + data.pop_back(); + } + assert(data.size() == feature_num_); datas.emplace_back(data); } } } void ClfModel::make_real_predict(std::vector>& datas, std::vector& preds) { - PyObject* pModule = PyImport_ImportModule("lgb"); - assert(pModule == nullptr); - - PyObject* pFunc = PyObject_GetAttrString(pModule, "predict"); - assert(pFunc != nullptr && PyCallable_Check(pFunc)); - - PyObject* pArg = PyTuple_New(2); - PyTuple_SetItem(pArg, 0, Py_BuildValue("s", latest_model_path())); - - PyObject* pDatas = PyList_New(0); - PyObject* pData = nullptr; + assert(preds.empty()); + libsocket::inet_stream sock(host_, port_, LIBSOCKET_IPv4); + std::string message, recv_buffer; for (std::vector& data : datas) { - pData = PyList_New(0); - prepare_data(data); - for (uint32_t& feature : data) { - PyList_Append(pData, Py_BuildValue("i", feature)); + if (!data.empty()) { + message.clear(); + recv_buffer.clear(); + recv_buffer.resize(buffer_size_); + message = std::to_string(data[0]); + for (size_t i = 1; i < data.size(); i ++) { + message = message + " " + std::to_string(data[i]); + } + message = PREDICT_PREFIX + message; + assert(message.size() <= buffer_size_); + sock << message; + // only receive pred tag integer + sock >> recv_buffer; + uint16_t pred = std::stoul(recv_buffer); + assert(pred >= MIN_UNITS_NUM && pred <= MAX_UNITS_NUM); + preds.emplace_back(pred); } - PyList_Append(pDatas, pData); - } - - PyTuple_SetItem(pArg, 1, pDatas); - - PyObject* pReturn = PyObject_CallObject(pFunc, pArg); // should return list - assert(pReturn != nullptr); - - for (size_t i = 0; i < datas.size(); i ++) { - int nResult = 0; - PyArg_Parse(PyList_GetItem(pReturn, i), "i", &nResult); - preds.emplace_back(nResult); } - assert(preds.size() != 0); - - Py_DECREF(pModule); - Py_DECREF(pFunc); - Py_DECREF(pArg); - Py_DECREF(pDatas); - Py_DECREF(pReturn); + // only write pred result to vector preds, and return nothing + assert(datas.size() == preds.size()); } void ClfModel::make_predict(std::vector>& datas, std::vector& preds) { preds.clear(); - if (model_cnt_ == 0) { - preds.resize(datas.size(), DEFAULT_UNITS); - return; - } - + // datas empty means we are debuging class ClfModel if (datas.empty()) { make_predict_samples(datas); } - + // only write pred result to vector preds, and return nothing make_real_predict(datas, preds); return; } diff --git a/db/art/clf_model.h b/db/art/clf_model.h index b01bd38a6..b468f3dee 100644 --- a/db/art/clf_model.h +++ b/db/art/clf_model.h @@ -4,7 +4,6 @@ #include #include #include -#include #include #include #include "macros.h" @@ -15,7 +14,8 @@ // every key range have id and hotness ( see heat_buckets ) // so data point features format : // LSM-Tree level, Key Range 1 id, Key Range 1 hotness, Key Range 2 id, Key Range 2 hotness, ..., Key Range r id, Key Range r hotness -// remind that heat_buckets recorded hotness is double type, we use uint32_t(uint32_t(SIGNIFICANT_DIGITS * hotness)) +// remind that heat_buckets recorded hotness is double type, +// we use uint32_t(uint32_t(SIGNIFICANT_DIGITS * hotness)) to closely estimate its hotness value namespace ROCKSDB_NAMESPACE { @@ -23,76 +23,59 @@ class ClfModel; class ClfModel { private: - static std::string base_dir_; // dir path for dataset and model - static uint16_t model_cnt_; // trained model file cnt - static uint16_t dataset_cnt_; // written dataset file cnt static uint16_t feature_num_; // model input features num + static std::string dataset_name_; // dataset csv file name + static std::string dataset_path_; // path to save dataset csv file + static std::string host_, port_; // lightgbm server connection + static size_t buffer_size_; // socket receive buffer max size public: // init member vars ClfModel() { - base_dir_ = MODEL_PATH; - model_cnt_ = 0; - dataset_cnt_ = 0; feature_num_ = 0; - } - - // next model file path - std::string next_model_path() { return base_dir_ + MODEL_PREFIX + std::to_string(model_cnt_) + MODEL_SUFFIX; } - - // next dataset file path - std::string next_dataset_path() { return base_dir_ + DATASET_PREFIX + std::to_string(dataset_cnt_) + DATASET_SUFFIX; } - - // latest model file path - std::string latest_model_path() { - assert(model_cnt_ > 0); - return base_dir_ + MODEL_PREFIX + std::to_string(model_cnt_ - 1) + MODEL_SUFFIX; - } - - // latest dataset file path - std::string latest_dataset_path() { - assert(dataset_cnt_ > 0); - return base_dir_ + DATASET_PREFIX + std::to_string(dataset_cnt_ - 1) + DATASET_SUFFIX; + dataset_name_ = DATASET_NAME; + // MODEL_PATH must end with '/' + dataset_path_ = MODEL_PATH; + dataset_path_ = dataset_path_ + DATASET_NAME; + host_ = HOST; port_ = PORT; + buffer_size_ = BUFFER_SIZE; } // check whether ready, only need check feature_nums_ now bool is_ready() { return feature_num_ > 0; } // make ready for training, only need init feature_nums_ now + // when first call ClfModel, we need to use current segments information to init features_num_ + // we can calcuate feature nums for every segment, + // feature num = level feature num (1) + 2 * num of key ranges segment covers + // we set features_num_ to largest feature num void make_ready(std::vector& features_nums) { - Py_Initialize(); - assert(Py_IsInitialized()); - - PyRun_SimpleString("import sys"); - PyRun_SimpleString("sys.path.append('../models')"); - if (features_nums.empty()) { - feature_num_ = 41; // debug feature num + feature_num_ = 41; // debug feature num, see ../lgb_server files } else { feature_num_ = *max_element(features_nums.begin(), features_nums.end()); } - std::cout << "[DEBUG] ClfModel ready, feature_num_: " << feature_num_ << std::endl; + // std::cout << "[DEBUG] ClfModel ready, feature_num_: " << feature_num_ << std::endl; } ~ClfModel() { - Py_Finalize(); + return; // do nothing } // resize data point features void prepare_data(std::vector& data) { + // at least level feature and one key range, so data size always >= 3 assert(data.size() >= 3); data.resize(feature_num_, 0); } // resize every data point and write to csv file for training void write_debug_dataset(); - void write_real_dataset(std::vector>& datas); - void write_dataset(std::vector>& datas); + void write_real_dataset(std::vector>& datas, std::vector& tags); + void write_dataset(std::vector>& datas, std::vector& tags); - // write dataset and train, return model cnt - // filter cache caller will check this model cnt and cnt it records, - // if model cnt not equal to caller cnt, it will do update job in filter cache - uint16_t make_train(std::vector>& datas); + // write dataset then send msg to train new model in LightGBM server side + void make_train(std::vector>& datas, std::vector& tags); // predict void make_predict_samples(std::vector>& datas); diff --git a/db/art/filter_cache_heap.cc b/db/art/filter_cache_heap.cc new file mode 100644 index 000000000..28f74b6d6 --- /dev/null +++ b/db/art/filter_cache_heap.cc @@ -0,0 +1,1036 @@ +#include "filter_cache_heap.h" +#include +#include + +namespace ROCKSDB_NAMESPACE { + +FilterCacheHeap FilterCacheHeapManager::benefit_heap_; +FilterCacheHeap FilterCacheHeapManager::cost_heap_; +std::map FilterCacheHeapManager::heap_visit_cnt_recorder_; +std::map FilterCacheHeapManager::units_num_limit_recorder_; +std::mutex FilterCacheHeapManager::manager_mutex_; + +FilterCacheHeapNode FilterCacheHeap::heap_top() { + // need lock heap, or we may retrive outdated node + // heap_mutex_.lock(); + + if (!heap_.empty()) { + return heap_[0]; + } else { + return nullptr; + } + + // heap_mutex_.unlock(); +} + +/* +void FilterCacheHeap::pop() { + // heap_mutex_.lock(); + + FilterCacheHeapNode node; + const size_t size = heap_.size(); + + assert(heap_type_ != UNKNOWN_HEAP); + if (heap_type_ == BENEFIT_HEAP) { + std::pop_heap(heap_.begin(), heap_.end(), FilterCacheHeapNodeLessComparor); + } else if (heap_type_ == COST_HEAP) { + std::pop_heap(heap_.begin(), heap_.end(), FilterCacheHeapNodeGreaterComparor); + } + node = heap_[size - 1]; + heap_.pop_back(); + + // remove node from heap_index_ + if (node == nullptr) { + return; + } + const uint32_t segment_id = node->segment_id; + auto it = heap_index_.find(segment_id); + if (it != heap_index_.end()) { + heap_index_.erase(node->segment_id); + } + if (node != nullptr) { + delete node; // remember to release node !!! + } + assert(heap_.size() == heap_index_.size()); + + // heap_mutex_.unlock(); +} +*/ + +/* +void FilterCacheHeap::push(FilterCacheHeapNode& node) { + if (node == nullptr) { + return; + } + + // heap_mutex_.lock(); + + heap_.emplace_back(node); + assert(heap_type_ != UNKNOWN_HEAP); + if (heap_type_ == BENEFIT_HEAP) { + std::push_heap(heap_.begin(), heap_.end(), FilterCacheHeapNodeLessComparor); + } else if (heap_type_ == COST_HEAP) { + std::push_heap(heap_.begin(), heap_.end(), FilterCacheHeapNodeGreaterComparor); + } + + // remember to upsert node into heap_index_ + // upsert(node); + if (node == nullptr) { + return; + } + const uint32_t segment_id = node->segment_id; + auto it = heap_index_.find(segment_id); + if (it != heap_index_.end()) { + it->second = node; // already exist in heap_index_, only update + } else { + heap_index_.insert(std::make_pair(segment_id, node)); // insert into heap_index_ + } + assert(heap_.size() == heap_index_.size()); + + // heap_mutex_.unlock(); +} +*/ + +void FilterCacheHeap::batch_query(std::vector& segment_ids, std::vector& return_nodes) { + // heap_mutex_.lock(); + + return_nodes.clear(); + for (uint32_t& segment_id : segment_ids) { + auto it = heap_index_.find(segment_id); + FilterCacheHeapNode return_node = nullptr; + // if node->is_alive is false, the segment already merged and never exists in storage + // so we should return null when query a merged segment id + if (it != heap_index_.end() && (it->second)->is_alive == true) { + return_node = it->second; // node exists in heap_index_ and segment alive + } + return_nodes.emplace_back(return_node); + } + + // heap_mutex_.unlock(); +} + +void FilterCacheHeap::batch_upsert(std::vector& nodes) { + // heap_mutex_.lock(); + + // we guarantee that if one node already exists in heap_index_, it must exist in heap + for (FilterCacheHeapNode& node : nodes) { + const uint32_t segment_id = node->segment_id; + auto it = heap_index_.find(segment_id); + if (it != heap_index_.end()) { + // exist in heap_index_ and heap_ + // we may query nodes from this heap, and update var in nodes, then upsert original nodes + // check it->second != node to make sure that we won't free a refered sapce + if (it->second != node) { + *(it->second) = *(node); // only copy content, this will update content of node in heap_index_ and heap_ + delete node; // remember to free unnecessary space! + } + } else { + // not exist in heap_index_ and heap_ + heap_index_.insert(std::make_pair(segment_id, node)); // insert into heap_index_ + heap_.emplace_back(node); // push into heap_ + } + } + + // update or insert done, need to rebuild heap_ + rebuild_heap(); + + // heap_mutex_.unlock(); +} + +void FilterCacheHeap::batch_delete(std::vector& segment_ids) { + // heap_mutex_.lock(); + + // we guarantee that if one node not exist in heap_index_, it must not exist in heap + for (uint32_t& segment_id : segment_ids) { + auto it = heap_index_.find(segment_id); + if (it == heap_index_.end()) { + // not exist in heap_index_ and heap_ + // do nothing + } else { + // exist in heap_index_ and heap_ + // set is_alive to false and delete after that + it->second->is_alive = false; + } + } + + // delete nodes that is_alive == false + auto it = heap_.begin(); + FilterCacheHeapNode node = nullptr; + while(it != heap_.end()) { + node = (*it); + if (node->is_alive == false) { + // need delete + const uint32_t segment_id = node->segment_id; + // already delete node in heap_ + it = heap_.erase(it); // it will point to next available node + // already delete node in heap_index_ + heap_index_.erase(segment_id); + // remember to free node after that + delete node; + } else { + it ++; + } + } + + // delete done, need to rebuild heap_ + rebuild_heap(); + + // heap_mutex_.unlock(); +} + +void FilterCacheHeapManager::batch_delete(std::vector& segment_ids) { + manager_mutex_.lock(); + + for (uint32_t& segment_id : segment_ids) { + auto cnt_it = heap_visit_cnt_recorder_.find(segment_id); + auto limit_it = units_num_limit_recorder_.find(segment_id); + if (cnt_it != heap_visit_cnt_recorder_.end()) { + heap_visit_cnt_recorder_.erase(segment_id); + } + if (limit_it != units_num_limit_recorder_.end()) { + units_num_limit_recorder_.erase(segment_id); + } + } + + benefit_heap_.batch_delete(segment_ids); + cost_heap_.batch_delete(segment_ids); + + manager_mutex_.unlock(); +} + +void FilterCacheHeapManager::batch_upsert(std::vector& items) { + manager_mutex_.lock(); + + std::vector benefit_nodes, cost_nodes; + for (FilterCacheHeapItem& item : items) { + assert(item.current_units_num >= MIN_UNITS_NUM); + assert(item.current_units_num <= item.units_num_limit); + double benefit = StandardBenefitWithMaxBound(item.approx_visit_cnt, item.current_units_num, item.units_num_limit); + double cost = StandardCostWithMinBound(item.approx_visit_cnt, item.current_units_num, MIN_UNITS_NUM); + // item meets at least one conditions + // so that item always upsert into heap + // if item.approx_visit_cnt = 0, still push into heap + // we may modify its visit cnt in heap later + /* + if (item.current_units_num > MIN_UNITS_NUM) { + // make ready to upsert cost nodes + // FilterCacheHeapItem(const uint32_t& id, const uint32_t& cnt, const uint16_t& units, const double& heap_value, const uint16_t& limit) + cost_nodes.emplace_back(new FilterCacheHeapItem(item.segment_id, + item.approx_visit_cnt, + item.current_units_num, + cost, + item.units_num_limit) + ); + } + if (item.current_units_num < item.units_num_limit) { + // make ready to upsert benefit nodes + // FilterCacheHeapItem(const uint32_t& id, const uint32_t& cnt, const uint16_t& units, const double& heap_value, const uint16_t& limit) + benefit_nodes.emplace_back(new FilterCacheHeapItem(item.segment_id, + item.approx_visit_cnt, + item.current_units_num, + benefit, + item.units_num_limit) + ); + } + */ + + if (item.current_units_num <= item.units_num_limit) { + cost_nodes.emplace_back(new FilterCacheHeapItem(item.segment_id, + item.approx_visit_cnt, + item.current_units_num, + cost, + item.units_num_limit) + ); + benefit_nodes.emplace_back(new FilterCacheHeapItem(item.segment_id, + item.approx_visit_cnt, + item.current_units_num, + benefit, + item.units_num_limit) + ); + } + + // update visit cnt, we need to keep recorder visit cnt and heap visit cnt the same + const uint32_t segment_id = item.segment_id; + const uint32_t visit_cnt = item.approx_visit_cnt; + const uint16_t units_limit = item.units_num_limit; + auto cnt_it = heap_visit_cnt_recorder_.find(segment_id); + auto limit_it = units_num_limit_recorder_.find(segment_id); + if (cnt_it != heap_visit_cnt_recorder_.end()) { + cnt_it->second = visit_cnt; + } else { + heap_visit_cnt_recorder_.insert(std::make_pair(segment_id, visit_cnt)); + } + if (limit_it != units_num_limit_recorder_.end()) { + limit_it->second = units_limit; + } else { + units_num_limit_recorder_.insert(std::make_pair(segment_id, units_limit)); + } + } + + // upsert nodes into heaps + benefit_heap_.batch_upsert(benefit_nodes); + cost_heap_.batch_upsert(cost_nodes); + + manager_mutex_.unlock(); +} + +bool FilterCacheHeapManager::try_modify(FilterCacheModifyResult& result) { + manager_mutex_.lock(); + + FilterCacheHeapNode benefit_node = benefit_heap_.heap_top(); + FilterCacheHeapNode cost_node = cost_heap_.heap_top(); + // if benefit heap or cost heap empty, no need to modify + if (benefit_node == nullptr || cost_node == nullptr) { + manager_mutex_.unlock(); // remember to unlock, or we will cause deadlock + return false; + } + + if (benefit_node->is_alive == false || cost_node->is_alive == false) { + manager_mutex_.unlock(); // remember to unlock, or we will cause deadlock + return false; + } + + const double benefit = benefit_node->benefit_or_cost; + const double cost = cost_node->benefit_or_cost; + // if benefit of enable one unit <= cost of disable one unit, no need to modify + if (benefit <= cost) { + manager_mutex_.unlock(); // remember to unlock, or we will cause deadlock + return false; + } + + const uint32_t benefit_segment_id = benefit_node->segment_id; + const uint32_t cost_segment_id = cost_node->segment_id; + // if we will enable and disable one unit of the same segment, ignore it + if (benefit_segment_id == cost_segment_id) { + manager_mutex_.unlock(); // remember to unlock, or we will cause deadlock + return false; + } + + // FilterCacheHeapItem(const uint32_t& id, const uint32_t& cnt, const uint16_t& units, const double& heap_value) + // we can try filter unit modification, reminded that this modification will modify units num of two segments + // so we need to upsert new nodes of these two segments into benefit heap and cost heap + std::vector new_benefit_nodes, new_cost_nodes; + + /* + if (benefit_node->current_units_num + 1 < benefit_node->units_num_limit) { + new_benefit_nodes.emplace_back(new FilterCacheHeapItem(benefit_node->segment_id, + benefit_node->approx_visit_cnt, + benefit_node->current_units_num + 1, + StandardBenefit(benefit_node->approx_visit_cnt, + benefit_node->current_units_num + 1 + ), + benefit_node->units_num_limit + ) + ); + } + // benefit node will enable one unit, so its units num will always > MIN_UNITS_NUM + new_cost_nodes.emplace_back(new FilterCacheHeapItem(benefit_node->segment_id, + benefit_node->approx_visit_cnt, + benefit_node->current_units_num + 1, + StandardCost(benefit_node->approx_visit_cnt, + benefit_node->current_units_num + 1 + ), + benefit_node->units_num_limit + ) + ); + + if (cost_node->current_units_num - 1 > MIN_UNITS_NUM) { + new_cost_nodes.emplace_back(new FilterCacheHeapItem(cost_node->segment_id, + cost_node->approx_visit_cnt, + cost_node->current_units_num - 1, + StandardCost(cost_node->approx_visit_cnt, + cost_node->current_units_num - 1 + ), + cost_node->units_num_limit + ) + ); + } + // cost node will disable one unit, so its units num will always < MAX_UNITS_NUM + new_benefit_nodes.emplace_back(new FilterCacheHeapItem(cost_node->segment_id, + cost_node->approx_visit_cnt, + cost_node->current_units_num - 1, + StandardBenefit(cost_node->approx_visit_cnt, + cost_node->current_units_num - 1 + ), + cost_node->units_num_limit + ) + ); + */ + // we set benefit of nodes (units num == units num limit) to 0.0 + // and cost of nodes (units num == 0) to Infinite + // these prevent modifying these nodes' units num + // so we dont need to check units num + new_benefit_nodes.emplace_back(new FilterCacheHeapItem(benefit_node->segment_id, + benefit_node->approx_visit_cnt, + benefit_node->current_units_num + 1, + StandardBenefitWithMaxBound(benefit_node->approx_visit_cnt, + benefit_node->current_units_num + 1, + benefit_node->units_num_limit + ), + benefit_node->units_num_limit + ) + ); + new_cost_nodes.emplace_back(new FilterCacheHeapItem(benefit_node->segment_id, + benefit_node->approx_visit_cnt, + benefit_node->current_units_num + 1, + StandardCostWithMinBound(benefit_node->approx_visit_cnt, + benefit_node->current_units_num + 1, + MIN_UNITS_NUM + ), + benefit_node->units_num_limit + ) + ); + new_cost_nodes.emplace_back(new FilterCacheHeapItem(cost_node->segment_id, + cost_node->approx_visit_cnt, + cost_node->current_units_num - 1, + StandardCostWithMinBound(cost_node->approx_visit_cnt, + cost_node->current_units_num - 1, + MIN_UNITS_NUM), + cost_node->units_num_limit + ) + ); + new_benefit_nodes.emplace_back(new FilterCacheHeapItem(cost_node->segment_id, + cost_node->approx_visit_cnt, + cost_node->current_units_num - 1, + StandardBenefitWithMaxBound(cost_node->approx_visit_cnt, + cost_node->current_units_num - 1, + cost_node->units_num_limit + ), + cost_node->units_num_limit + ) + ); + // already make ready for upsert + benefit_heap_.batch_upsert(new_benefit_nodes); + cost_heap_.batch_upsert(new_cost_nodes); + + // write result + result.enable_segment_id = benefit_node->segment_id; + result.disable_segment_id = cost_node->segment_id; + result.enable_segment_units_num = benefit_node->current_units_num; + result.disable_segment_units_num = cost_node->current_units_num; + result.enable_segment_next_units_num = benefit_node->current_units_num + 1; + result.disable_segment_next_units_num = cost_node->current_units_num - 1; + result.enable_benefit = benefit; + result.disable_cost = cost; + + // return nothing, result already written into var result + + manager_mutex_.unlock(); + + return true; +} + +void FilterCacheHeapManager::sync_visit_cnt(std::map& current_visit_cnt_recorder) { + manager_mutex_.lock(); + + std::vector sync_nodes; + std::vector sync_segment_ids; + + auto heap_it = heap_visit_cnt_recorder_.begin(); + auto current_it = current_visit_cnt_recorder.begin(); + while (heap_it != heap_visit_cnt_recorder_.end() && + current_it != current_visit_cnt_recorder.end()) { + if (heap_it->first < current_it->first) { + heap_it ++; + } else if (heap_it->first > current_it->first) { + current_it ++; + } else { + // heap_it->first == current_it->first + assert(heap_it->first == current_it->first); + int64_t old_visit_cnt = heap_it->second; + int64_t cur_visit_cnt = current_it->second; + if (std::abs(cur_visit_cnt-old_visit_cnt) > VISIT_CNT_UPDATE_BOUND) { + heap_it->second = current_it->second; // remember to update heap visit cnt recorder + sync_segment_ids.emplace_back(current_it->first); + } + // heap_it ++; + current_it ++; + } + } + + // query nodes in heap + std::vector sync_benefit_nodes, sync_cost_nodes; + benefit_heap_.batch_query(sync_segment_ids, sync_benefit_nodes); + cost_heap_.batch_query(sync_segment_ids, sync_cost_nodes); + + // update visit cnt and benefit/cost in these nodes + for (FilterCacheHeapNode& sync_benefit_node : sync_benefit_nodes) { + if (sync_benefit_node != nullptr) { + sync_benefit_node->approx_visit_cnt = current_visit_cnt_recorder[sync_benefit_node->segment_id]; + sync_benefit_node->benefit_or_cost = StandardBenefitWithMaxBound(sync_benefit_node->approx_visit_cnt, + sync_benefit_node->current_units_num, + sync_benefit_node->units_num_limit); + } + } + for (FilterCacheHeapNode& sync_cost_node : sync_cost_nodes) { + if (sync_cost_node != nullptr) { + sync_cost_node->approx_visit_cnt = current_visit_cnt_recorder[sync_cost_node->segment_id]; + sync_cost_node->benefit_or_cost = StandardCostWithMinBound(sync_cost_node->approx_visit_cnt, + sync_cost_node->current_units_num, + MIN_UNITS_NUM); + } + } + + // upsert nodes into benefit heap and cost heap + // benefit_heap_.batch_upsert(sync_benefit_nodes); + // cost_heap_.batch_upsert(sync_cost_nodes); + + + // notice that we already updated these nodes in heap, we only need to rebuild heap + // but heap.upsert include the step of checking whether these segments already in heap + // this will waste some time, can we rebuild heap directly? + benefit_heap_.rebuild_heap(); + cost_heap_.rebuild_heap(); + + manager_mutex_.unlock(); +} + +void FilterCacheHeapManager::sync_units_num_limit(std::map& current_units_num_limit_recorder) { + manager_mutex_.lock(); + + std::vector sync_nodes; + std::vector sync_segment_ids; + + auto origin_it = units_num_limit_recorder_.begin(); + auto current_it = current_units_num_limit_recorder.begin(); + while (origin_it != units_num_limit_recorder_.end() && + current_it != current_units_num_limit_recorder.end()) { + if (origin_it->first < current_it->first) { + origin_it ++; + } else if (origin_it->first > current_it->first) { + current_it ++; + } else { + // origin_it->first == current_it->first + assert(origin_it->first == current_it->first); + assert(current_it->second <= MAX_UNITS_NUM); + if (origin_it->second != current_it->second) { + origin_it->second = current_it->second; + sync_segment_ids.emplace_back(current_it->first); + } + current_it ++; + } + } + + // query nodes in heap + std::vector sync_benefit_nodes, sync_cost_nodes; + benefit_heap_.batch_query(sync_segment_ids, sync_benefit_nodes); + cost_heap_.batch_query(sync_segment_ids, sync_cost_nodes); + + // update units num limit, units num and benefit/cost in these nodes + for (FilterCacheHeapNode& sync_benefit_node : sync_benefit_nodes) { + if (sync_benefit_node != nullptr) { + sync_benefit_node->units_num_limit = current_units_num_limit_recorder[sync_benefit_node->segment_id]; + sync_benefit_node->current_units_num = std::min(sync_benefit_node->units_num_limit, + sync_benefit_node->current_units_num); + sync_benefit_node->benefit_or_cost = StandardBenefitWithMaxBound(sync_benefit_node->approx_visit_cnt, + sync_benefit_node->current_units_num, + sync_benefit_node->units_num_limit); + } + } + for (FilterCacheHeapNode& sync_cost_node : sync_cost_nodes) { + if (sync_cost_node != nullptr) { + sync_cost_node->units_num_limit = current_units_num_limit_recorder[sync_cost_node->segment_id]; + sync_cost_node->current_units_num = std::min(sync_cost_node->units_num_limit, + sync_cost_node->current_units_num); + sync_cost_node->benefit_or_cost = StandardCostWithMinBound(sync_cost_node->approx_visit_cnt, + sync_cost_node->current_units_num, + MIN_UNITS_NUM); + } + } + + // upsert nodes into benefit heap and cost heap + // benefit_heap_.batch_upsert(sync_benefit_nodes); + // cost_heap_.batch_upsert(sync_cost_nodes); + + + // notice that we already updated these nodes in heap, we only need to rebuild heap + // but heap.upsert include the step of checking whether these segments already in heap + // this will waste some time, can we rebuild heap directly? + benefit_heap_.rebuild_heap(); + cost_heap_.rebuild_heap(); + + manager_mutex_.unlock(); +} + +void FilterCacheHeapManager::debug() { + std::vector items; + std::vector segment_ids; + std::map current_visit_cnt_recorder; + std::map current_units_num_limit_recorder; + std::map b_heap_index; + std::vector b_heap; + std::map c_heap_index; + std::vector c_heap; + std::fstream f_heap; + f_heap.open("/pg_wal/ycc/heap.log", std::ios::out | std::ios::app); + // FilterCacheHeapItem(const uint32_t& id, const uint32_t& cnt, const uint16_t& units, + // const double& heap_value, const uint16_t& limit) + // 1. try to insert some new data + f_heap << "[DEBUG] debug step 1 : batch insert" << std::endl << std::endl; + for (uint32_t id = 0; id < 70; id++) { + items.emplace_back(id % 70, (id % 70) * 10, (id % 70) / 10, 0, MAX_UNITS_NUM); + } + batch_upsert(items); + benefit_heap_.heap_index(b_heap_index); + benefit_heap_.heap(b_heap); + cost_heap_.heap_index(c_heap_index); + cost_heap_.heap(c_heap); + f_heap << "[DEBUG] step1 b_heap_index : " << std::endl; + for (auto it = b_heap_index.begin(); it != b_heap_index.end(); it++) { + FilterCacheHeapNode node = it->second; + f_heap << it->first << " -> "; + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step1 b_heap : " << std::endl; + for (FilterCacheHeapNode& node : b_heap) { + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step1 c_heap_index : " << std::endl; + for (auto it = c_heap_index.begin(); it != c_heap_index.end(); it++) { + FilterCacheHeapNode node = it->second; + f_heap << it->first << " -> "; + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step1 c_heap : " << std::endl; + for (FilterCacheHeapNode& node : c_heap) { + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step1 visit_cnt_recorder : " << std::endl; + for (auto it = heap_visit_cnt_recorder_.begin(); + it != heap_visit_cnt_recorder_.end(); it++) { + f_heap << it->first << " -> " << it->second << std::endl; + } + f_heap << "[DEBUG] step1 units_limit_recorder : " << std::endl; + for (auto it = units_num_limit_recorder_.begin(); + it != units_num_limit_recorder_.end(); it++) { + f_heap << it->first << " -> " << it->second << std::endl; + } + + // 2. try to update old data + f_heap << std::endl << std::endl<< "[DEBUG] debug step 2 : batch update (using upsert)" << std::endl << std::endl; + items.clear(); + for (uint32_t id = 0; id < 70; id++) { + items.emplace_back(id % 70, (id % 70) * std::pow(10, (id % 70) / 10), (id % 70) / 10, 0, MAX_UNITS_NUM); + } + batch_upsert(items); + benefit_heap_.heap_index(b_heap_index); + benefit_heap_.heap(b_heap); + cost_heap_.heap_index(c_heap_index); + cost_heap_.heap(c_heap); + f_heap << "[DEBUG] step2 b_heap_index : " << std::endl; + for (auto it = b_heap_index.begin(); it != b_heap_index.end(); it++) { + FilterCacheHeapNode node = it->second; + f_heap << it->first << " -> "; + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step2 b_heap : " << std::endl; + for (FilterCacheHeapNode& node : b_heap) { + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step2 c_heap_index : " << std::endl; + for (auto it = c_heap_index.begin(); it != c_heap_index.end(); it++) { + FilterCacheHeapNode node = it->second; + f_heap << it->first << " -> "; + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step2 c_heap : " << std::endl; + for (FilterCacheHeapNode& node : c_heap) { + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step2 visit_cnt_recorder : " << std::endl; + for (auto it = heap_visit_cnt_recorder_.begin(); + it != heap_visit_cnt_recorder_.end(); it++) { + f_heap << it->first << " -> " << it->second << std::endl; + } + f_heap << "[DEBUG] step2 units_limit_recorder : " << std::endl; + for (auto it = units_num_limit_recorder_.begin(); + it != units_num_limit_recorder_.end(); it++) { + f_heap << it->first << " -> " << it->second << std::endl; + } + + // 3. try to delete some data + f_heap << std::endl << std::endl<< "[DEBUG] debug step 3 : batch delete" << std::endl << std::endl; + items.clear(); + segment_ids.clear(); + for (uint32_t i = 0; i < 10; i++) { + segment_ids.emplace_back(i); + } + for (uint32_t i = 60; i < 100; i++) { + segment_ids.emplace_back(i); + } + batch_delete(segment_ids); + benefit_heap_.heap_index(b_heap_index); + benefit_heap_.heap(b_heap); + cost_heap_.heap_index(c_heap_index); + cost_heap_.heap(c_heap); + f_heap << "[DEBUG] step3 b_heap_index : " << std::endl; + for (auto it = b_heap_index.begin(); it != b_heap_index.end(); it++) { + FilterCacheHeapNode node = it->second; + f_heap << it->first << " -> "; + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step3 b_heap : " << std::endl; + for (FilterCacheHeapNode& node : b_heap) { + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step3 c_heap_index : " << std::endl; + for (auto it = c_heap_index.begin(); it != c_heap_index.end(); it++) { + FilterCacheHeapNode node = it->second; + f_heap << it->first << " -> "; + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step3 c_heap : " << std::endl; + for (FilterCacheHeapNode& node : c_heap) { + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step3 visit_cnt_recorder : " << std::endl; + for (auto it = heap_visit_cnt_recorder_.begin(); + it != heap_visit_cnt_recorder_.end(); it++) { + f_heap << it->first << " -> " << it->second << std::endl; + } + f_heap << "[DEBUG] step3 units_limit_recorder : " << std::endl; + for (auto it = units_num_limit_recorder_.begin(); + it != units_num_limit_recorder_.end(); it++) { + f_heap << it->first << " -> " << it->second << std::endl; + } + + // 4. try to sync visit cnt + f_heap << std::endl << std::endl<< "[DEBUG] debug step 4 : sync visit cnt " << std::endl << std::endl; + for (uint32_t id = 0; id < 40; id++) { + if (id % 2 == 0) { + current_visit_cnt_recorder.insert(std::make_pair(id, (id % 70) * std::pow(10, (id % 70) / 10) + 101010)); + } + } + for (uint32_t id = 40; id < 60; id++) { + current_visit_cnt_recorder.insert(std::make_pair(id, (id % 70) * std::pow(10, (id % 70) / 10) + 101010)); + } + sync_visit_cnt(current_visit_cnt_recorder); + benefit_heap_.heap_index(b_heap_index); + benefit_heap_.heap(b_heap); + cost_heap_.heap_index(c_heap_index); + cost_heap_.heap(c_heap); + f_heap << "[DEBUG] step4 b_heap_index : " << std::endl; + for (auto it = b_heap_index.begin(); it != b_heap_index.end(); it++) { + FilterCacheHeapNode node = it->second; + f_heap << it->first << " -> "; + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step4 b_heap : " << std::endl; + for (FilterCacheHeapNode& node : b_heap) { + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step4 c_heap_index : " << std::endl; + for (auto it = c_heap_index.begin(); it != c_heap_index.end(); it++) { + FilterCacheHeapNode node = it->second; + f_heap << it->first << " -> "; + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step4 c_heap : " << std::endl; + for (FilterCacheHeapNode& node : c_heap) { + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step4 visit_cnt_recorder : " << std::endl; + for (auto it = heap_visit_cnt_recorder_.begin(); + it != heap_visit_cnt_recorder_.end(); it++) { + f_heap << it->first << " -> " << it->second << std::endl; + } + f_heap << "[DEBUG] step4 units_limit_recorder : " << std::endl; + for (auto it = units_num_limit_recorder_.begin(); + it != units_num_limit_recorder_.end(); it++) { + f_heap << it->first << " -> " << it->second << std::endl; + } + + // 5. try to decrease units limit + f_heap << std::endl << std::endl<< "[DEBUG] debug step 5 : decrease units limit " << std::endl << std::endl; + for (uint32_t id = 0; id < 40; id++) { + if (id % 2 == 0) { + current_units_num_limit_recorder.insert(std::make_pair(id, 0)); + } else { + current_units_num_limit_recorder.insert(std::make_pair(id, 1)); + } + } + for (uint32_t id = 40; id < 50; id++) { + current_units_num_limit_recorder.insert(std::make_pair(id, 3)); + } + for (uint32_t id = 50; id < 70; id++) { + current_units_num_limit_recorder.insert(std::make_pair(id, 5)); + } + sync_units_num_limit(current_units_num_limit_recorder); + benefit_heap_.heap_index(b_heap_index); + benefit_heap_.heap(b_heap); + cost_heap_.heap_index(c_heap_index); + cost_heap_.heap(c_heap); + f_heap << "[DEBUG] step5 b_heap_index : " << std::endl; + for (auto it = b_heap_index.begin(); it != b_heap_index.end(); it++) { + FilterCacheHeapNode node = it->second; + f_heap << it->first << " -> "; + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step5 b_heap : " << std::endl; + for (FilterCacheHeapNode& node : b_heap) { + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step5 c_heap_index : " << std::endl; + for (auto it = c_heap_index.begin(); it != c_heap_index.end(); it++) { + FilterCacheHeapNode node = it->second; + f_heap << it->first << " -> "; + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step5 c_heap : " << std::endl; + for (FilterCacheHeapNode& node : c_heap) { + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step5 visit_cnt_recorder : " << std::endl; + for (auto it = heap_visit_cnt_recorder_.begin(); + it != heap_visit_cnt_recorder_.end(); it++) { + f_heap << it->first << " -> " << it->second << std::endl; + } + f_heap << "[DEBUG] step5 units_limit_recorder : " << std::endl; + for (auto it = units_num_limit_recorder_.begin(); + it != units_num_limit_recorder_.end(); it++) { + f_heap << it->first << " -> " << it->second << std::endl; + } + + // 6. try to increase units limit + f_heap << std::endl << std::endl<< "[DEBUG] debug step 6 : increase units limit " << std::endl << std::endl; + for (uint32_t id = 0; id < 40; id++) { + if (id % 2 == 0) { + current_units_num_limit_recorder[id] = 3; + } else { + current_units_num_limit_recorder[id] = 4; + } + } + for (uint32_t id = 40; id < 50; id++) { + current_units_num_limit_recorder[id] = 5; + } + for (uint32_t id = 50; id < 70; id++) { + current_units_num_limit_recorder[id] = 6; + } + sync_units_num_limit(current_units_num_limit_recorder); + benefit_heap_.heap_index(b_heap_index); + benefit_heap_.heap(b_heap); + cost_heap_.heap_index(c_heap_index); + cost_heap_.heap(c_heap); + f_heap << "[DEBUG] step6 b_heap_index : " << std::endl; + for (auto it = b_heap_index.begin(); it != b_heap_index.end(); it++) { + FilterCacheHeapNode node = it->second; + f_heap << it->first << " -> "; + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step6 b_heap : " << std::endl; + for (FilterCacheHeapNode& node : b_heap) { + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step6 c_heap_index : " << std::endl; + for (auto it = c_heap_index.begin(); it != c_heap_index.end(); it++) { + FilterCacheHeapNode node = it->second; + f_heap << it->first << " -> "; + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step6 c_heap : " << std::endl; + for (FilterCacheHeapNode& node : c_heap) { + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step6 visit_cnt_recorder : " << std::endl; + for (auto it = heap_visit_cnt_recorder_.begin(); + it != heap_visit_cnt_recorder_.end(); it++) { + f_heap << it->first << " -> " << it->second << std::endl; + } + f_heap << "[DEBUG] step6 units_limit_recorder : " << std::endl; + for (auto it = units_num_limit_recorder_.begin(); + it != units_num_limit_recorder_.end(); it++) { + f_heap << it->first << " -> " << it->second << std::endl; + } + + // 7. try to loop modification + f_heap << std::endl << std::endl<< "[DEBUG] debug step 7 : loop try_modify " << std::endl << std::endl; + f_heap << "[DEBUG] step7 loop start : " << std::endl; + FilterCacheModifyResult result; + while (try_modify(result)) { + f_heap << "enable segment -> " << "id : " << result.enable_segment_id; + f_heap << " , prev units num : " << result.enable_segment_units_num; + f_heap << " , benefit : " << result.enable_benefit << std::endl; + f_heap << "disable segment -> " << "id : " << result.disable_segment_id; + f_heap << " , prev units num : " << result.disable_segment_units_num; + f_heap << " , cost : " << result.disable_cost << std::endl; + } + // write final indexs and heaps + benefit_heap_.heap_index(b_heap_index); + benefit_heap_.heap(b_heap); + cost_heap_.heap_index(c_heap_index); + cost_heap_.heap(c_heap); + f_heap << "[DEBUG] step7 b_heap_index : " << std::endl; + for (auto it = b_heap_index.begin(); it != b_heap_index.end(); it++) { + FilterCacheHeapNode node = it->second; + f_heap << it->first << " -> "; + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step7 b_heap : " << std::endl; + for (FilterCacheHeapNode& node : b_heap) { + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step7 c_heap_index : " << std::endl; + for (auto it = c_heap_index.begin(); it != c_heap_index.end(); it++) { + FilterCacheHeapNode node = it->second; + f_heap << it->first << " -> "; + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step7 c_heap : " << std::endl; + for (FilterCacheHeapNode& node : c_heap) { + f_heap << " id : " << node->segment_id; + f_heap << " , cnt : " << node->approx_visit_cnt; + f_heap << " , units : " << node->current_units_num; + f_heap << " , value : " << node->benefit_or_cost; + f_heap << " , limit : " << node->units_num_limit; + f_heap << " , alive : " << node->is_alive << std::endl; + } + f_heap << "[DEBUG] step7 visit_cnt_recorder : " << std::endl; + for (auto it = heap_visit_cnt_recorder_.begin(); + it != heap_visit_cnt_recorder_.end(); it++) { + f_heap << it->first << " -> " << it->second << std::endl; + } + f_heap << "[DEBUG] step7 units_limit_recorder : " << std::endl; + for (auto it = units_num_limit_recorder_.begin(); + it != units_num_limit_recorder_.end(); it++) { + f_heap << it->first << " -> " << it->second << std::endl; + } + + f_heap.close(); +} + +} \ No newline at end of file diff --git a/db/art/filter_cache_heap.h b/db/art/filter_cache_heap.h new file mode 100644 index 000000000..14bca50e2 --- /dev/null +++ b/db/art/filter_cache_heap.h @@ -0,0 +1,269 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include "macros.h" + +namespace ROCKSDB_NAMESPACE { +// when filter cache is full , +// we need to use heap manager to +// clear some space and insert new filter units +// for these coming new segments +// or we may need to use heap manager to adjust filter cache +// to reduce extra I/O caused by false positive + +// reminded that we use this module only when filter cache is full !!! + +struct FilterCacheHeapItem; +typedef FilterCacheHeapItem* FilterCacheHeapNode; +struct FilterCacheModifyResult; +class FilterCacheHeap; +class FilterCacheHeapManager; +inline bool FilterCacheHeapNodeLessComparor(const FilterCacheHeapNode& node_1, const FilterCacheHeapNode& node_2); +inline bool FilterCacheHeapNodeGreaterComparor(const FilterCacheHeapNode& node_1, const FilterCacheHeapNode& node_2); +inline double StandardBenefitWithMaxBound(const uint32_t& visit_cnt, const uint16_t& units_num, const uint16_t& max_bound); +inline double StandardCostWithMinBound(const uint32_t& visit_cnt, const uint16_t& units_num, const uint16_t& min_bound); + +struct FilterCacheHeapItem { + uint32_t segment_id; + uint32_t approx_visit_cnt; // estimated visit cnt + uint16_t current_units_num; // enabled units num for this segment + double benefit_or_cost; // can represent enable benefit or disable cost + uint16_t units_num_limit; // units num prediction model predict maximum units num for every segment + bool is_alive; // sign whether this item still used, if false, that means this segment already merged and freed + // default set heap_value = 0, we will compuate benefit or cost in batch upsert func + FilterCacheHeapItem(const uint32_t& id, const uint32_t& cnt, const uint16_t& units, const double& heap_value, const uint16_t& limit) { + segment_id = id; + approx_visit_cnt = cnt; + current_units_num = units; + benefit_or_cost = heap_value; + units_num_limit = limit; + is_alive = true; + assert(current_units_num >= MIN_UNITS_NUM); + assert(current_units_num <= units_num_limit); + } + /* + FilterCacheHeapItem(const FilterCacheHeapItem& item) { + segment_id = item.segment_id; + approx_visit_cnt = item.approx_visit_cnt; + current_units_num = item.current_units_num; + benefit_or_cost = item.benefit_or_cost; + units_num_limit = item.units_num_limit; + is_alive = item.is_alive; + } + */ +}; + +struct FilterCacheModifyResult { + uint32_t enable_segment_id; + uint32_t disable_segment_id; + uint16_t enable_segment_units_num; + uint16_t disable_segment_units_num; + uint16_t enable_segment_next_units_num; + uint16_t disable_segment_next_units_num; + double enable_benefit; + double disable_cost; +}; + +inline bool FilterCacheHeapNodeLessComparor(const FilterCacheHeapNode& node_1, const FilterCacheHeapNode& node_2) { + return node_1->benefit_or_cost < node_2->benefit_or_cost; +} + +inline bool FilterCacheHeapNodeGreaterComparor(const FilterCacheHeapNode& node_1, const FilterCacheHeapNode& node_2) { + return node_1->benefit_or_cost > node_2->benefit_or_cost; +} + +inline double StandardBenefitWithMaxBound(const uint32_t& visit_cnt, const uint16_t& units_num, const uint16_t& max_bound) { + int bits_per_key = BITS_PER_KEY_PER_UNIT; + // We intentionally round down to reduce probing cost a little bit + int num_probes = static_cast(bits_per_key * 0.69); // 0.69 =~ ln(2) + if (num_probes < 1) num_probes = 1; + if (num_probes > 30) num_probes = 30; + + // compute false positive rate of one filter unit + double rate_per_unit = std::pow(1.0 - std::exp(-double(num_probes) / double(bits_per_key)), num_probes); + + assert(max_bound >= MIN_UNITS_NUM); + assert(max_bound <= MAX_UNITS_NUM); + if (units_num >= max_bound) { + return 0.0; // 0.0 is the lowest value of benefit (benefit >= 0.0) + } + + uint16_t next_units_num = units_num + 1; + double rate = std::pow(rate_per_unit, units_num); + double next_rate = std::pow(rate_per_unit, next_units_num); + + double benefit = double(visit_cnt) * (rate - next_rate); + /* + std::cout << "visit_cnt : " << visit_cnt + << " , rate : " << rate + << " , next_rate : " << next_rate + << " . rate_per_unit : " << rate_per_unit + << std::endl; + */ + assert(benefit >= 0); + return benefit; +} + +inline double StandardCostWithMinBound(const uint32_t& visit_cnt, const uint16_t& units_num, const uint16_t& min_bound) { + int bits_per_key = BITS_PER_KEY_PER_UNIT; + // We intentionally round down to reduce probing cost a little bit + int num_probes = static_cast(bits_per_key * 0.69); // 0.69 =~ ln(2) + if (num_probes < 1) num_probes = 1; + if (num_probes > 30) num_probes = 30; + + // compute false positive rate of one filter unit + double rate_per_unit = std::pow(1.0 - std::exp(-double(num_probes) / double(bits_per_key)), num_probes); + + assert(min_bound >= MIN_UNITS_NUM); + assert(min_bound <= MAX_UNITS_NUM); + if (units_num <= min_bound) { + return __DBL_MAX__; + } + + uint16_t next_units_num = units_num - 1; + double rate = std::pow(rate_per_unit, units_num); + double next_rate = std::pow(rate_per_unit, next_units_num); + + double cost = double(visit_cnt) * (next_rate - rate); + /* + std::cout << "visit_cnt : " << visit_cnt + << " , rate : " << rate + << " , next_rate : " << next_rate + << " . rate_per_unit : " << rate_per_unit + << std::endl; + */ + assert(cost >= 0); + return cost; +} + +class FilterCacheHeap { +private: + int heap_type_; + // map, use this map to fastly locate node in heap + std::map heap_index_; + // use make_heap, push_heap, pop_heap to manage heap + std::vector heap_; + // std::mutex heap_mutex_; + +public: + FilterCacheHeap() { + heap_type_ = UNKNOWN_HEAP; + heap_index_.clear(); + heap_.clear(); + } + + void set_type(const int type) { + heap_type_ = type; + } + + // only rebuild heap_, do nothing to heap_index_ + void rebuild_heap() { + // heap_mutex_.lock(); + + assert(heap_type_ != UNKNOWN_HEAP); + assert(heap_.size() == heap_index_.size()); + if (heap_type_ == BENEFIT_HEAP) { + std::make_heap(heap_.begin(), heap_.end(), FilterCacheHeapNodeLessComparor); + } else if (heap_type_ == COST_HEAP) { + std::make_heap(heap_.begin(), heap_.end(), FilterCacheHeapNodeGreaterComparor); + } + + // heap_mutex_.unlock(); + } + + // return heap top + FilterCacheHeapNode heap_top(); + + // pop one node with deleting node from heap_index_ + // void pop(); + + // push one node with upsert node into heap_index_ + // void push(FilterCacheHeapNode& node); + + // given a batch of segment id, return needed nodes. + // only support batch query and reminded that one return node may be null + // if segment not available or segment not exists in heap_index_ + // result will write into return_nodes + void batch_query(std::vector& segment_ids, std::vector& return_nodes); + + // upsert batch nodes into heap_index_ and heap_ + // only support batch upsert, if one node already exists in heap_index_, it must in heap + // so we only need to update the content of that existing node + void batch_upsert(std::vector& nodes); + + // delete batch nodes from heap_index_ and heap_ + // only support batch delete, if one node not exist in heap_index_, it must not exist in heap + // so we only need to delete these existing nodes + void batch_delete(std::vector& segment_ids); + + // only used in debug !!! + void heap_index(std::map& heap_index) { + heap_index.clear(); + heap_index.insert(heap_index_.begin(), heap_index_.end()); + } + + // only used in debug !!! + void heap(std::vector& heap) { + heap.clear(); + heap.assign(heap_.begin(), heap_.end()); + } +}; + +class FilterCacheHeapManager { +private: + static FilterCacheHeap benefit_heap_; + static FilterCacheHeap cost_heap_; + // set heap node visit cnt = c_1, real estimated visit cnt = c_2 + // we only update c_1 when | c_1 - c_2 | >= VISIT_CNT_UPDATE_BOUND + // update c_1 means we need to update this recorder and heap + // heap_visit_cnt_recorder: map + // when filter cache call delete, this recorder will automately delete these merged segment ids + // when filter cache call upsert, this recorder will automately upsert these segment ids + static std::map heap_visit_cnt_recorder_; + static std::map units_num_limit_recorder_; + static std::mutex manager_mutex_; + +public: + FilterCacheHeapManager() { + benefit_heap_.set_type(BENEFIT_HEAP); + cost_heap_.set_type(COST_HEAP); + heap_visit_cnt_recorder_.clear(); + units_num_limit_recorder_.clear(); + } + + // sync units_num_limit in heap and recorder + // reminded that we will not insert or delete nodes in this method + // we only update these nodes that already exist in two heaps + void sync_units_num_limit(std::map& current_units_num_limit_recorder); + + // sync visit cnt in heap and real estimated visit cnt + // reminded that we will not insert or delete nodes in this method + // we only update these nodes that already exist in two heaps + void sync_visit_cnt(std::map& current_visit_cnt_recorder); + + // try to read benefit_heap top and cost_heap top, then judge whether we need to modify units num in filter cache + // return true when we can modify units num of several segments, return false when we cannot + // reminded that this func only modify heap, we still need to update filter units in filter cache + bool try_modify(FilterCacheModifyResult& result); + + // delete batch segment nodes in benefit_heap and cost_heap, also need to update heap_visit_cnt_recorder_ + void batch_delete(std::vector& segment_ids); + + // upsert batch segment nodes in benefit_heap and cost_heap, also need to update heap_visit_cnt_recorder_ + // only input items, we will allocate space for nodes later + // reminded that we will also update heap_visit_cnt_recorder_ if we update a existed node + // because we need to keep heap visit cnt and recorder visit cnt the same + void batch_upsert(std::vector& items); + + // 1. try debug batch insert + // 2. try debug batch update(use batch_upsert) + void debug(); +}; + +} diff --git a/db/art/greedy_algo.cc b/db/art/greedy_algo.cc new file mode 100644 index 000000000..d81fc00fc --- /dev/null +++ b/db/art/greedy_algo.cc @@ -0,0 +1,91 @@ +#include "greedy_algo.h" +#include +#include +#include +#include + +namespace ROCKSDB_NAMESPACE { + +// this func is not thread-secured, so make only one thread perform this algo!!! +void GreedyAlgo::solve(std::map& segment_algo_infos, + std::map& algo_solution, const uint32_t& cache_size) { + assert(!segment_algo_infos.empty()); + // ready to perform algo + algo_solution.clear(); + std::vector segment_algo_helper_heap; + for (auto it = segment_algo_infos.begin(); it != segment_algo_infos.end(); it++) { + uint32_t segment_id = it->first; + SegmentAlgoInfo segment_algo_info = it->second; + algo_solution[segment_id] = 0; // init algo_solution + + SegmentAlgoHelper segment_algo_helper(segment_id, segment_algo_info); + segment_algo_helper_heap.emplace_back(segment_algo_helper); // init algo heap + } + assert(segment_algo_infos.size() == algo_solution.size()); + assert(segment_algo_infos.size() == segment_algo_helper_heap.size()); + std::make_heap(segment_algo_helper_heap.begin(), + segment_algo_helper_heap.end(), + CompareSegmentAlgoHelper); + + std::fstream f_algo; + f_algo.open("/pg_wal/ycc/algo.log", std::ios::out | std::ios::app); + f_algo << "[DEBUG] start to record algo : " << std::endl; + + // current used space size (bits) of filter cache + uint32_t current_cache_size = 0; + while(!segment_algo_helper_heap.empty()) { + // std::cout << "segment id : " << segment_algo_helper_heap[0].segment_id << std::endl; + + const size_t size = segment_algo_helper_heap.size(); + // heap top item moved to segment_algo_helper_heap[segment_algo_helper_heap.size()-1]; + std::pop_heap(segment_algo_helper_heap.begin(), + segment_algo_helper_heap.end(), + CompareSegmentAlgoHelper); + SegmentAlgoHelper segment_algo_helper_top = segment_algo_helper_heap[size-1]; + // check whether free space (in filter cache) is enough + uint32_t size_needed = segment_algo_helper_top.size_per_unit; + // if not enough, remove this segment helper from heap + // that means we will not consider this segment any longer + if (current_cache_size + size_needed > cache_size) { + segment_algo_helper_heap.pop_back(); + continue; + } + // SegmentAlgoHelper(const uint32_t& id, const uint32_t& cnt, const uint32_t& size, const uint16_t& units) + SegmentAlgoHelper segment_algo_helper_needed(segment_algo_helper_top.segment_id, + segment_algo_helper_top.visit_cnt, + segment_algo_helper_top.size_per_unit, + segment_algo_helper_top.units_num + 1); + // update enabled units + // noticed that if one segment visit cnt == 0, it still enable one unit + // so check visit num before update algo_solution + if (segment_algo_helper_needed.visit_cnt > 0) { + algo_solution[segment_algo_helper_needed.segment_id] = segment_algo_helper_needed.units_num; + current_cache_size += size_needed; + f_algo << "[DEBUG] segment " << segment_algo_helper_needed.segment_id + << " : " << segment_algo_helper_needed.units_num - 1 << " -> " + << segment_algo_helper_needed.units_num << " , cache space left : " + << cache_size - current_cache_size << " , recv benefit : " + << segment_algo_helper_top.enable_benifit << " , next benefit : " + << segment_algo_helper_needed.enable_benifit << std::endl; + } + assert(algo_solution[segment_algo_helper_needed.segment_id] <= MAX_UNITS_NUM); + // enable benefit == 0 means units_num == MAX_UNITS_NUM + // that means we cannot enable one unit for this segment, already enable all units + if (segment_algo_helper_needed.enable_benifit == 0) { + // assert(segment_algo_helper_needed.units_num >= MAX_UNITS_NUM); + segment_algo_helper_heap.pop_back(); + continue; + } + // we can push this new segment helper into heap + segment_algo_helper_heap[size-1] = segment_algo_helper_needed; + std::push_heap(segment_algo_helper_heap.begin(), + segment_algo_helper_heap.end(), + CompareSegmentAlgoHelper); + } + + f_algo << std::endl; + f_algo.close(); + // return nothing, all results should be written into algo_solution +} + +} \ No newline at end of file diff --git a/db/art/greedy_algo.h b/db/art/greedy_algo.h new file mode 100644 index 000000000..a1d03acc6 --- /dev/null +++ b/db/art/greedy_algo.h @@ -0,0 +1,157 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include "macros.h" + +namespace ROCKSDB_NAMESPACE { + +struct SegmentAlgoInfo; +struct SegmentAlgoHelper; +class GreedyAlgo; + +inline double StandardBenefit(const uint32_t& visit_cnt, const uint16_t& units_num); +inline double StandardCost(const uint32_t& visit_cnt, const uint16_t& units_num); +inline bool CompareSegmentAlgoHelper(const SegmentAlgoHelper& helper_1, const SegmentAlgoHelper& helper_2); + +// contain visit counter of every segment in last long period +// also contain size of every segment's filter unit +// size of units belonging to one segment should be the same +// size equals to bits that one unit occupies +struct SegmentAlgoInfo { + uint32_t visit_cnt; + uint32_t size_per_unit; + SegmentAlgoInfo(const uint32_t& cnt, const uint32_t& size) { + assert(size > 0); + visit_cnt = cnt; size_per_unit = size; + } +}; + +// helper structure when performing this algo +// exactly, this structure will be the item of algo heap +struct SegmentAlgoHelper { + uint32_t visit_cnt; + uint16_t units_num; + uint32_t size_per_unit; + uint32_t segment_id; + double enable_benifit; + SegmentAlgoHelper(const uint32_t& id, const uint32_t& cnt, const uint32_t& size, const uint16_t& units) { + segment_id = id; visit_cnt = cnt; size_per_unit = size; units_num = units; + enable_benifit = StandardBenefit(visit_cnt, units_num); + // assert(units_num <= MAX_UNITS_NUM); + } + SegmentAlgoHelper(const uint32_t& id, SegmentAlgoInfo& segment_algo_info) { + segment_id = id; visit_cnt = segment_algo_info.visit_cnt; + size_per_unit = segment_algo_info.size_per_unit; units_num = 0; + enable_benifit = StandardBenefit(visit_cnt, units_num); + } +}; + +inline double StandardBenefit(const uint32_t& visit_cnt, const uint16_t& units_num) { + int bits_per_key = BITS_PER_KEY_PER_UNIT; + // We intentionally round down to reduce probing cost a little bit + int num_probes = static_cast(bits_per_key * 0.69); // 0.69 =~ ln(2) + if (num_probes < 1) num_probes = 1; + if (num_probes > 30) num_probes = 30; + + // compute false positive rate of one filter unit + double rate_per_unit = std::pow(1.0 - std::exp(-double(num_probes) / double(bits_per_key)), num_probes); + + if (units_num >= MAX_UNITS_NUM) { + return 0.0; + } + + uint16_t next_units_num = units_num + 1; + double rate = std::pow(rate_per_unit, units_num); + double next_rate = std::pow(rate_per_unit, next_units_num); + + double benefit = double(visit_cnt) * (rate - next_rate); + /* + std::cout << "visit_cnt : " << visit_cnt + << " , rate : " << rate + << " , next_rate : " << next_rate + << " . rate_per_unit : " << rate_per_unit + << std::endl; + */ + assert(benefit >= 0); + return benefit; +} + +inline double StandardCost(const uint32_t& visit_cnt, const uint16_t& units_num) { + int bits_per_key = BITS_PER_KEY_PER_UNIT; + // We intentionally round down to reduce probing cost a little bit + int num_probes = static_cast(bits_per_key * 0.69); // 0.69 =~ ln(2) + if (num_probes < 1) num_probes = 1; + if (num_probes > 30) num_probes = 30; + + // compute false positive rate of one filter unit + double rate_per_unit = std::pow(1.0 - std::exp(-double(num_probes) / double(bits_per_key)), num_probes); + + if (units_num <= MIN_UNITS_NUM) { + return __DBL_MAX__; + } + + uint16_t next_units_num = units_num - 1; + double rate = std::pow(rate_per_unit, units_num); + double next_rate = std::pow(rate_per_unit, next_units_num); + + double cost = double(visit_cnt) * (next_rate - rate); + /* + std::cout << "visit_cnt : " << visit_cnt + << " , rate : " << rate + << " , next_rate : " << next_rate + << " . rate_per_unit : " << rate_per_unit + << std::endl; + */ + assert(cost >= 0); + return cost; +} + +inline bool CompareSegmentAlgoHelper(const SegmentAlgoHelper& helper_1, const SegmentAlgoHelper& helper_2) { + return helper_1.enable_benifit < helper_2.enable_benifit; +} + +class GreedyAlgo { +public: + // segment_algo_infos: map + // algo_solution: map + // cache_size: total size of filter cache, we can left some space for emergency needs + // we can input 95% of real total size as arg cache_size, then left 5% of space for further needs + // full debug process of GreedyAlgo, not thread-secured + // so make sure that only called by one thread + void solve(std::map& segment_algo_infos, + std::map& algo_solution, const uint32_t& cache_size); + // full debug process of GreedyAlgo, not thread-secured + // so make sure that only called by one thread + void debug(std::map& algo_solution, const uint32_t& cache_size) { + // generate debug data + std::map segment_algo_infos; + segment_algo_infos.clear(); + uint32_t min_segment_id = 0, max_segment_id = 9999; + for (uint32_t segment_id = min_segment_id; segment_id <= max_segment_id; segment_id++) { + // SegmentAlgoInfo segment_algo_info(segment_id * 1000, 8 * 1024 * 8); // one unit is 8kb + // segment_algo_infos[segment_id] = SegmentAlgoInfo(segment_id * 1000, 8 * 1024 * 8); + // directly use '=' will cause bug, try use std::map.insert + segment_algo_infos.insert(std::make_pair(segment_id, + SegmentAlgoInfo(segment_id * std::pow(10, (segment_id / 3000) + 1), 8 * 1024 * 4))); // one unit 2 kb + } + assert(segment_algo_infos.size() == max_segment_id + 1); + + // already generate debug data, try perform algo + solve(segment_algo_infos, algo_solution, cache_size); + + // simple check results + // noticed that if segment a visit_cnt >= segment b visit_cnt + // then segment a units_num >= segment b units_num + for (uint32_t segment_id = min_segment_id; segment_id < max_segment_id; segment_id++) { + assert(algo_solution[segment_id] <= algo_solution[segment_id + 1]); + } + } +}; + + +} \ No newline at end of file diff --git a/db/art/macros.h b/db/art/macros.h index 3375c2474..637851e3f 100644 --- a/db/art/macros.h +++ b/db/art/macros.h @@ -138,26 +138,63 @@ namespace ROCKSDB_NAMESPACE { */ // micros for HeatBuckets -#define BUCKETS_ALPHA 0.2 -#define SAMPLES_LIMIT 10000 -#define SAMPLES_MAXCNT 5000000 -#define PERIOD_COUNT 500000 -#define DEFAULT_BUCKETS 500 -#define MAGIC_FACTOR 500 + +// hotness update formula +#define BUCKETS_ALPHA 0.2 +// samples pool max size, using reservoir sampling +#define SAMPLES_LIMIT 10000 +// if recv samples exceed SAMPLES_MAXCNT, end reservoir sampling and init Heat Buckets +#define SAMPLES_MAXCNT 5000000 +// short period get count, if get count equal to or exceed PERIOD_COUNT, +// end this short period and start next short period +#define PERIOD_COUNT 50000 +// number of heat buckets (number of key ranges, see hotness estimating in the paper) +#define DEFAULT_BUCKETS 500 +// magic number in class HeatBuckets +#define MAGIC_FACTOR 500 // micros for Model Train -#define TRAIN_PERIODS 20 -#define MODEL_PATH "/pg_wal/ycc/" -#define MODEL_SUFFIX ".txt" -#define MODEL_PREFIX "model_" -#define DATASET_SUFFIX ".csv" -#define DATASET_PREFIX "dataset_" -#define SIGNIFICANT_DIGITS 6 + +// long period = TRAIN_PERIODS * short period. if one long period end, evaluate model and retrain model if necessary +#define TRAIN_PERIODS 10 +// dataset csv file name +#define DATASET_NAME "dataset.csv" +// the path to save model txt file and train dataset csv file +#define MODEL_PATH "/pg_wal/ycc/" +// we cannot send hotness value (double) to model side, +// so we try multiple hotness value by SIGNIFICANT_DIGITS_FACTOR, then send its integer part to model +#define SIGNIFICANT_DIGITS_FACTOR 1e6 + +// config micro connecting to LightGBM server + +// we use Inet socket to connect server +#define HOST "127.0.0.1" +#define PORT "9090" +// max size of socket receive buffer size +#define BUFFER_SIZE 8 +// socket message prefix +#define TRAIN_PREFIX "t " +#define PREDICT_PREFIX "p " // micros for filter cache -#define DEFAULT_UNITS 5 -#define BITS_PER_KEY 2 -#define MAX_UNITS 10 -#define MIN_UNITS 0 + +// before model work, we enable DEFAULT_UNITS_NUM units for every segments +#define DEFAULT_UNITS_NUM 5 +// bits-per-key for every filter unit of every segment, +// found default bits-per-key = DEFAULT_UNITS_NUM * BITS_PER_KEY_PER_UNIT = 10 +// equal to primary value of paper benchmark config value +#define BITS_PER_KEY_PER_UNIT 4 +// max unit nums for every segment, we only generate MAX_UNITS_NUM units for every segment +#define MAX_UNITS_NUM 6 +// we enable 0 unit for coldest segments +#define MIN_UNITS_NUM 0 +// default max size of cache space : 8 * 1024 * 1024 * 128 = 1073741824 bit = 128 MB +#define CACHE_SPACE_SIZE 1073741824 +// fitler cache helper heap type +#define BENEFIT_HEAP 0 +#define COST_HEAP 1 +#define UNKNOWN_HEAP 2 +// visit cnt update bound +#define VISIT_CNT_UPDATE_BOUND 100 } // namespace ROCKSDB_NAMESPACE \ No newline at end of file diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index e4a39bd7a..549b12394 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -25,6 +25,8 @@ #include #include #include +#include +#include #include "db/art/timestamp.h" #include "db/art/logger.h" @@ -245,7 +247,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, #ifdef ART_PLUS get_cnt_ = 0; period_cnt_ = 0; - train_period_ = 0; + last_train_period_ = 0; #endif // !batch_per_trx_ implies seq_per_batch_ because it is only unset for @@ -1755,7 +1757,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, #ifdef ART std::string art_key(key.data(), key.size()); #ifdef ART_PLUS - // ready to estimate hotness + // ready to estimate hotness, update heat buckets if (heat_buckets_.is_ready()) { get_cnt_ += 1; if (get_cnt_ >= PERIOD_COUNT) { @@ -1768,45 +1770,59 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, } if (heat_buckets_.is_ready() && period_cnt_ > 0 && - period_cnt_ - train_period_ >= TRAIN_PERIODS) { + period_cnt_ - last_train_period_ >= TRAIN_PERIODS) { + bool need_train = false; train_mutex_.lock(); - if (period_cnt_ > 0 && - period_cnt_ - train_period_ >= TRAIN_PERIODS) { - std::cout << "[DEBUG] try to train models" << std::endl; - train_period_ = period_cnt_; - pid_t pid = fork(); - assert(pid >= 0); - if (pid == 0) { - if (!clf_model_.is_ready()) { - std::vector feature_nums; - clf_model_.make_ready(feature_nums); - } - - std::vector> datas; - std::vector preds; - std::uint16_t model_cnt; + if (period_cnt_ - last_train_period_ >= TRAIN_PERIODS) { + need_train = true; + last_train_period_ = period_cnt_; + } + train_mutex_.unlock(); + // only one thread can train model. + if (need_train) { + std::fstream f_model; + f_model.open("/pg_wal/ycc/model.log", std::ios::out | std::ios::app); + f_model << "[DEBUG] try to train models" << std::endl; + f_model << "[DEBUG] period_cnt_ : " << period_cnt_ << std::endl; + f_model << "[DEBUG] PERIOD_COUNT : " << PERIOD_COUNT << std::endl; + f_model << "[DEBUG] TRAIN_PERIODS : " << TRAIN_PERIODS << std::endl; + + if (!clf_model_.is_ready()) { + std::vector feature_nums; + clf_model_.make_ready(feature_nums); + } - model_cnt = clf_model_.make_train(datas); + std::vector> datas; + std::vector tags; + std::vector preds; - clf_model_.make_predict(datas, preds); + // std::thread train_thread(make_train, clf_model_, datas, tags); + // train_thread.detach(); + clf_model_.make_train(datas, tags); - std::cout << "[DEBUG] already train " << model_cnt << "models in period " << train_period_ << std::endl; - std::cout << "[DEBUG] predict result: " << std::endl; - for (uint16_t pred : preds) { - std::cout << pred << " "; - } - std::cout << std::endl; + clf_model_.make_predict(datas, preds); - exit(EXIT_SUCCESS); + f_model << "[DEBUG] debug predict result: " << std::endl; + for (uint16_t& pred : preds) { + f_model << pred << " "; } - - if (pid == 0) { - // if child process dont exit, force to exit. - // normally, it will not reach here - exit(EXIT_SUCCESS); + f_model << std::endl << std::endl << std::endl; + f_model.close(); + + std::fstream f_algo; + f_algo.open("/pg_wal/ycc/algo.log", std::ios::out | std::ios::app); + f_algo << "[DEBUG] greedy algo debug results : " << std::endl; + std::map algo_solution; + uint32_t cache_size = CACHE_SPACE_SIZE; + GreedyAlgo greedy_algo; + greedy_algo.debug(algo_solution, cache_size); + for (auto it = algo_solution.begin(); it != algo_solution.end(); it++) { + f_algo << "[DEBUG] " << it->first << " -> " << it->second << std::endl; } - } - train_mutex_.unlock(); + f_algo.close(); + + filter_cache_heap_manager_.debug(); + } } #endif diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index da4c66a59..806b2857b 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -26,6 +26,8 @@ #include "db/art/vlog_manager.h" #include "db/art/heat_buckets.h" #include "db/art/clf_model.h" +#include "db/art/filter_cache_heap.h" +#include "db/art/greedy_algo.h" #include "db/column_family.h" #include "db/compaction/compaction_job.h" #include "db/dbformat.h" @@ -1905,16 +1907,22 @@ class DBImpl : public DB { ClfModel clf_model_; + FilterCacheHeapManager filter_cache_heap_manager_; + // monitor low-level segments min key and max key std::vector> segments_info_; // record get cnt in current period, when equal to PERIOD_COUNT, start next period uint32_t get_cnt_; - // record period cnt, if period_cnt_ % TRAIN_PERIODS = 0, start to evaluate or retrain model - std::mutex train_mutex_; + // record period cnt, if period_cnt_ - last_train_period_ >= TRAIN_PERIODS, start to evaluate or retrain model- uint32_t period_cnt_; - uint32_t train_period_; + + // record in which period last model trained. + uint32_t last_train_period_; + + // train mutex, preventing model trained more than one time + std::mutex train_mutex_; #endif // Offset of last record written by leader writer. uint64_t last_record_offset_; diff --git a/lgb_server/README.md b/lgb_server/README.md new file mode 100644 index 000000000..d74eb4c83 --- /dev/null +++ b/lgb_server/README.md @@ -0,0 +1,5 @@ +## lgb_server + +Because the trouble of LightGBM C++ API, we seperate LightGBM train and predict job to a single socket server (python3.12) + +once WaLSM need to train new model or predict for new segments, it need to init socket client, send this socket server message and wait for response if necessary (especially for predict job) \ No newline at end of file diff --git a/lgb_server/__init__.py b/lgb_server/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lgb_server/model.py b/lgb_server/model.py new file mode 100644 index 000000000..e120ca803 --- /dev/null +++ b/lgb_server/model.py @@ -0,0 +1,77 @@ +import pandas as pd +import lightgbm +import numpy + +model_path = '/pg_wal/ycc/' +# model_path = '' + +class LGBModel(): + def __init__(self) -> None: + self.__model = None + # one unit is 2 bits-per-key, class = 5 mean bits-per-key = 5 * 2 = 10 + # the default bits-per-key value of previous benchmark is 10 + self.__default_class = 5 + self.__model_name = 'model.txt' + # self.__host = '127.0.0.1' + # self.__port = '6666' + # self.__sock = None + # self.__server = None + # normally, one data row will not exceed 1024 Bytes + # we will check this out in WaLSM c++ client + # self.__bufsize = 1024 + # self.__conn = 8 + + def train(self, dataset: str) -> None: + df = pd.read_csv(dataset) + y = df['Target'] + X = df.drop(columns=['Target']) + # clf = lightgbm.LGBMClassifier(min_child_samples=1, n_estimators=1, objective="multiclass") + clf = lightgbm.LGBMClassifier() + clf.fit(X, y) + # if we directly set self.__model = clf, then self.__model always predict class 0 + # we need save clf to txt file, then read this model to init self.__model + clf.booster_.save_model(model_path + self.__model_name) + self.__model = lightgbm.Booster(model_file=model_path+self.__model_name) + # print('load a new model') + + def predict(self, datas: pd.DataFrame) -> str: + # currently, only support one data row + assert len(datas) == 1 + if self.__model is not None: + result = self.__model.predict(datas) + return str(numpy.argmax(result[0])) + else: + return str(self.__default_class) + + ''' + def __close(self) -> None: + if self.__sock is not None: + self.__sock.close() + ''' + + ''' + def start(self) -> None: + self.__sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM) + self.__sock.bind((self.__host, self.__port)) + self.__sock.listen(self.__conn) + ''' + + ''' + def serve(self) -> None: + while True: + client, _ = self.__sock.accept() + msg = self.__sock.recv(self.__bufsize) + decoded = lgb_util.parse_msg(msg) + + if type(decoded) is str: + # send client nothing + self.__train(decoded) + elif type(decoded) is list: + decoded = lgb_util.prepare_data(decoded) + # self.__predict(decoded) + # send client target class str (like '0' or '1' or ... ) + client.send(self.__predict(decoded)) + else: + print('msg type unknown, LGBServer exit') + self.__close() + ''' \ No newline at end of file diff --git a/lgb_server/server.py b/lgb_server/server.py new file mode 100644 index 000000000..8eac2cea4 --- /dev/null +++ b/lgb_server/server.py @@ -0,0 +1,53 @@ +import utils +import model +import socketserver +import time + +clf = model.LGBModel() +host = '127.0.0.1' +port = 9090 +bufsize = 1024 + +class LGBhandler(socketserver.BaseRequestHandler): + def handle(self): + try: + while True: + # for example + # if one compaction happen, the new sstable is consisted of 10000 segments + # we can divide these segments into some groups, + # and prefetch filter units for these segments using multi-thread + # that means in rocksdb, one client thread may need to + # predict class for a group of segments + # that means we need keep this connection until all segments of this group done + # use 'while True' and TCP protocol to keep connection + msg = self.request.recv(bufsize).decode('UTF-8', 'ignore').strip() + if not msg: + break + + decoded = utils.parse_msg(msg) + if type(decoded) is str: # mean it is train msg + # send client nothing + clf.train(decoded) + elif type(decoded) is list: # mean it is pred msg, need to send client predict result + # print(decoded) + decoded = utils.prepare_data(decoded) + # send client target class str (like '0' or '1' or ... ) + result = clf.predict(decoded).encode('UTF-8') + # print(clf.predict(decoded)) + self.request.send(result) + except ConnectionResetError: + print('one connection close: ' + + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + +# server = socketserver.ThreadingTCPServer((host,port), LGBhandler) +if __name__ == '__main__': + print('LGBServer start') + assert utils.dataset_path == model.model_path + server = socketserver.ThreadingTCPServer((host,port), LGBhandler) + try: + server.serve_forever() + except KeyboardInterrupt: + print('\nLGBServer end') + + # should not end during benchmark + # print('LGBServer end') \ No newline at end of file diff --git a/lgb_server/test.sh b/lgb_server/test.sh new file mode 100644 index 000000000..c54ead816 --- /dev/null +++ b/lgb_server/test.sh @@ -0,0 +1,3 @@ +rm dataset.csv +gcc test_client.cc -o test -std=c++11 -lstdc++ -lsocket++ +./test \ No newline at end of file diff --git a/lgb_server/test_client.cc b/lgb_server/test_client.cc new file mode 100644 index 000000000..c3031b993 --- /dev/null +++ b/lgb_server/test_client.cc @@ -0,0 +1,166 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +void write_debug_dataset(std::string& path) { + // ready for writer + std::ofstream stream(path); + csv2::Writer> writer(stream); + + // init hotness values + std::map hotness_map; + double base_hotness = 0.01; + for (int i = 0; i < 200; i ++) { + float r = static_cast (rand()) / static_cast (RAND_MAX) + base_hotness; + hotness_map[i] = r; + } + + // init header vector + std::vector> rows; + std::vector header; + header.emplace_back("Level"); + for (int i = 0; i < 20; i ++) { + header.emplace_back("Range_" + std::to_string(i)); + header.emplace_back("Hotness_" + std::to_string(i)); + } + header.emplace_back("Target"); + rows.emplace_back(header); + + // ready for shuffling + std::vector ids; + for(int i = 0; i < 200; i ++) { + ids.emplace_back(i); + } + + // generate values + for (int i = 0; i < 1000; i ++) { + // std::vector value; + std::vector values; + uint32_t level = i / 200; + uint32_t target = 5 - level; + float r = static_cast (rand()) / static_cast (RAND_MAX); + if (r > 0.10 * level) { + target -= 1; + } + + auto seed = std::chrono::system_clock::now().time_since_epoch().count(); + std::shuffle(ids.begin(), ids.end(), std::default_random_engine(seed)); + values.emplace_back(std::to_string(level)); + for (int j = 0; j < 20; j ++) { + values.emplace_back(std::to_string(ids[j])); + values.emplace_back(std::to_string(uint32_t(1e6 * hotness_map[ids[j]]))); + } + values.emplace_back(std::to_string(target)); + + rows.emplace_back(values); + } + + writer.write_rows(rows); + stream.close(); +} + +void make_predict_samples(std::string& path, std::vector>& datas) { + datas.clear(); + csv2::Reader, + csv2::quote_character<'"'>, + csv2::first_row_is_header, + csv2::trim_policy::trim_whitespace> csv; + + if (csv.mmap(path)) { + const auto header = csv.header(); + // int cnt = 0; + for (auto row : csv) { + + /* + if ((++cnt) > 10) { + break; + } + */ + + // cnt ++; + std::vector data; + for (auto cell : row) { + std::string value; + cell.read_value(value); + data.emplace_back(stoul(value)); + } + if (!data.empty()) { + data.pop_back(); + } + datas.emplace_back(data); + } + } +} + +void build_message(std::vector& data, std::string& message) { + message.clear(); + message = std::to_string(data[0]); + for (size_t i = 1; i < data.size(); i ++) { + message = message + " " + std::to_string(data[i]); + } +} + +int main() { + std::string host = "127.0.0.1"; + std::string port = "9090"; + std::string recv; + size_t recv_size = 1024; + std::string file = "dataset.csv"; + + recv.resize(recv_size); + + write_debug_dataset(file); + + libsocket::inet_stream t1_sock(host, port, LIBSOCKET_IPv4); + std::string msg = "t " + file; + // already write dataset, send dataset path to server + // should not receive any message from server + t1_sock << msg; + + // t1_sock.shutdown(); + + std::vector> datas; + + make_predict_samples(file, datas); + libsocket::inet_stream p1_sock(host, port, LIBSOCKET_IPv4); + for (std::vector& data : datas) { + if (!data.empty()) { + build_message(data, msg); + msg = "p " + msg; + p1_sock << msg; + p1_sock >> recv; + // train model need enough time, so should always receive 5 (default class) + std::cout << "receive " << recv.size() << " bytes : " << recv << std::endl; + } + } + + constexpr int sleep_time = 10000; + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time)); + libsocket::inet_stream p2_sock(host, port, LIBSOCKET_IPv4); + for (std::vector& data : datas) { + if (!data.empty()) { + build_message(data, msg); + msg = "p " + msg; + p2_sock << msg; + p2_sock >> recv; + // already train model, receive data class predicted by the model + std::cout << "receive " << recv.size() << " bytes : " << recv << std::endl; + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time)); + write_debug_dataset(file); + libsocket::inet_stream t2_sock(host, port, LIBSOCKET_IPv4); + msg = "t " + file; + // already write dataset, send dataset path to server + // should not re + t2_sock << msg; + + return 0; +} \ No newline at end of file diff --git a/lgb_server/utils.py b/lgb_server/utils.py new file mode 100644 index 000000000..3d61825cb --- /dev/null +++ b/lgb_server/utils.py @@ -0,0 +1,42 @@ +from typing import Union +import pandas as pd +import sys + +dataset_path = '/pg_wal/ycc/' +# dataset_path = '' + +# msg should be like 'dataset1.csv' +def parse_train_msg(msg: str) -> str: + assert type(msg) is str + msg_list = msg.split(' ', -1) + assert len(msg_list) == 1 + + return dataset_path + msg_list[0] + +# msg should be like '0 4 12345678 2 2345678', consisted of integer and ' ' +# reminded that that msg shouldn't end with ' ' or start with ' ' +# and every integer should be seperated with single ' ' +def parse_pred_msg(msg: str) -> list[int]: + assert type(msg) is str + assert msg[-1] != ' ' and msg[0] != ' ' + msg_list = msg.split(' ', -1) + return [ int(item) for item in msg_list] + +# build predict data row from list[int] +def prepare_data(data: list[int]) -> pd.DataFrame: + assert type(data) is list and type(data[0]) is int + datas = pd.DataFrame([data]) + return datas + +# socket input should be like 't dataset1.csv' or 'p 0 4 12345678 2 2345678' +def parse_msg(msg: str) -> Union[str, list[int]]: + assert msg[0] == 't' or msg[0] == 'p' + assert msg[1] == ' ' + assert msg[2] != ' ' + # print('new message : ' + msg[2:]) + if msg[0] == 't': + return parse_train_msg(msg[2:]) + elif msg[0] == 'p': + return parse_pred_msg(msg[2:]) + else: + return None \ No newline at end of file diff --git a/models/README.md b/models/README.md new file mode 100644 index 000000000..dbbf845d8 --- /dev/null +++ b/models/README.md @@ -0,0 +1,15 @@ +## models + +used to be called by rocksdb through c++ Python interface(Python.h) + +**deprecated** in latest implement of WaLSM. + +in latest version, We decide to use client-server architecture to train and predict + +## Files + + - lgb.cc: simple c++ demo of calling c++ Python interface + + - lgb.py: simple python func for training or predicting + + - lgb.sh: simple shell for running this c++ demo \ No newline at end of file diff --git a/models/lgb.cc b/models/lgb.cc index 2def252e3..afa662f20 100644 --- a/models/lgb.cc +++ b/models/lgb.cc @@ -238,7 +238,6 @@ int main() { PyRun_SimpleString("sys.path.append('.')"); generate_samples(); - /* train(); std::vector results; @@ -247,8 +246,7 @@ int main() { std::cout << result << " " << std::endl; } std::cout << std::endl; - */ - read_samples(); + //read_samples(); Py_Finalize(); diff --git a/models/lgb.py b/models/lgb.py index 397a3132f..17d3c9756 100644 --- a/models/lgb.py +++ b/models/lgb.py @@ -1,14 +1,17 @@ import pandas as pd import lightgbm import numpy +import sys +from io import StringIO # dataset: train dataset path # output: saved model path def train(dataset: str, output: str): + # sys.stdout = StringIO() df = pd.read_csv(dataset) y = df['Target'] X = df.drop(columns=['Target']) - clf = lightgbm.LGBMClassifier() + clf = lightgbm.LGBMClassifier(n_estimators=2, num_leaves=16, max_depth=8) clf.fit(X, y) # val_pred = clf.predict(X_test) clf.booster_.save_model(output) diff --git a/src.mk b/src.mk index 4eb53d777..d8224f9e5 100644 --- a/src.mk +++ b/src.mk @@ -32,6 +32,8 @@ LIB_SOURCES = \ db/art/global_memtable.cc \ db/art/heat_buckets.cc \ db/art/clf_model.cc \ + db/art/filter_cache_heap.cc \ + db/art/greedy_algo.cc \ db/art/heat_group.cc \ db/art/lock.cc \ db/art/logger.cc \