diff --git a/.gitignore b/.gitignore
index 20b9be1f4..397b420a9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -95,4 +95,8 @@ cmake-build-release
 
 *_example
 inode_vptrs
-.cache/
\ No newline at end of file
+.cache/
+.conda/
+*.txt
+__pycache__/
+*.csv
\ No newline at end of file
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 559f18835..c28fe204d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -590,6 +590,7 @@ set(SOURCES
   db/art/compactor.cc
   db/art/global_memtable.cc
   db/art/heat_buckets.cc
+  db/art/clf_model.cc
   db/art/heat_group.cc
   db/art/lock.cc
   db/art/logger.cc
diff --git a/TARGETS b/TARGETS
index 98b3a3520..6aa3b118b 100644
--- a/TARGETS
+++ b/TARGETS
@@ -448,6 +448,7 @@ cpp_library(
     "db/art/compactor.cc",
     "db/art/global_memtable.cc",
     "db/art/heat_buckets.cc",
+    "db/art/clf_model.cc",
     "db/art/heat_group.cc",
     "db/art/lock.cc",
     "db/art/logger.cc",
diff --git a/db/art/clf_model.cc b/db/art/clf_model.cc
new file mode 100644
index 000000000..1e1811946
--- /dev/null
+++ b/db/art/clf_model.cc
@@ -0,0 +1,182 @@
+#include "clf_model.h"
+#include <algorithm>
+#include <chrono>
+#include <cstdlib>
+#include <fstream>
+#include <map>
+#include <random>
+
+namespace ROCKSDB_NAMESPACE {
+
+std::string ClfModel::base_dir_;
+uint16_t ClfModel::model_cnt_;
+uint16_t ClfModel::dataset_cnt_;
+uint16_t ClfModel::feature_num_;
+
+void ClfModel::write_debug_dataset() {
+  // ready for writer
+  std::ofstream stream(next_dataset_path());
+  csv2::Writer<csv2::delimiter<','>> writer(stream);
+
+  // init hotness values
+  std::map<int, double> hotness_map;
+  double base_hotness = 0.01;
+  for (int i = 0; i < 200; i++) {
+    float r = static_cast<float>(rand()) / static_cast<float>(RAND_MAX) + base_hotness;
+    hotness_map[i] = r;
+  }
+
+  // init header vector
+  std::vector<std::vector<std::string>> rows;
+  std::vector<std::string> header;
+  header.emplace_back("Level");
+  for (int i = 0; i < 20; i++) {
+    header.emplace_back("Range_" + std::to_string(i));
+    header.emplace_back("Hotness_" + std::to_string(i));
+  }
+  header.emplace_back("Target");
+  rows.emplace_back(header);
+
+  // ready for shuffling
+  std::vector<int> ids;
+  for (int i = 0; i < 200; i++) {
+    ids.emplace_back(i);
+  }
+
+  // generate values
+  for (int i = 0; i < 1000; i++) {
+    std::vector<std::string> values;
+    uint32_t level = i / 200;
+    uint32_t target = 5 - level;
+    float r = static_cast<float>(rand()) / static_cast<float>(RAND_MAX);
+    if (r > 0.10 * level) {
+      target -= 1;
+    }
+
+    auto seed = std::chrono::system_clock::now().time_since_epoch().count();
+    std::shuffle(ids.begin(), ids.end(), std::default_random_engine(seed));
+    values.emplace_back(std::to_string(level));
+    for (int j = 0; j < 20; j++) {
+      values.emplace_back(std::to_string(ids[j]));
+      values.emplace_back(std::to_string(uint32_t(SIGNIFICANT_DIGITS * hotness_map[ids[j]])));
+    }
+    values.emplace_back(std::to_string(target));
+
+    rows.emplace_back(values);
+  }
+
+  writer.write_rows(rows);
+  stream.close();
+}
+
+void ClfModel::write_true_dataset(std::vector<std::vector<uint32_t>>& datas) {
+  // ready for writer
+  std::ofstream stream(next_dataset_path());
+  csv2::Writer<csv2::delimiter<','>> writer(stream);
+
+  // init csv header vector
+  std::vector<std::vector<std::string>> rows;
+  std::vector<std::string> header;
+  uint16_t ranges_num = (feature_num_ - 1) / 2;
+  header.emplace_back("Level");
+  for (int i = 0; i < ranges_num; i++) {
+    header.emplace_back("Range_" + std::to_string(i));
+    header.emplace_back("Hotness_" + std::to_string(i));
+  }
+  // note that the target class is written to the csv Target column,
+  // matching what lgb.py in the ../models dir expects
+  header.emplace_back("Target");
+  rows.emplace_back(header);
+
+  for (std::vector<uint32_t>& data : datas) {
+    prepare_data(data);
+    // convert the numeric features to strings so they can be written as a csv row
+    std::vector<std::string> row;
+    for (uint32_t& feature : data) {
+      row.emplace_back(std::to_string(feature));
+    }
+    rows.emplace_back(row);
+  }
+
+  writer.write_rows(rows);
+  stream.close();
+}
+
+void ClfModel::write_dataset(std::vector<std::vector<uint32_t>>& datas) {
+  if (datas.empty()) {
+    write_debug_dataset();
+    return;
+  }
+
+  assert(feature_num_ % 2 != 0);  // features num: 2r + 1
+
+  write_true_dataset(datas);
+  return;
+}
+
+uint16_t ClfModel::make_train(std::vector<std::vector<uint32_t>>& datas) {
+  write_dataset(datas);
+
+  PyObject* pModule = PyImport_ImportModule("lgb");
+  assert(pModule != nullptr);
+
+  PyObject* pFunc = PyObject_GetAttrString(pModule, "train");
+  assert(pFunc != nullptr && PyCallable_Check(pFunc));
+
+  PyObject* pArg = PyTuple_New(2);
+  PyTuple_SetItem(pArg, 0, Py_BuildValue("s", next_dataset_path().c_str()));
+  PyTuple_SetItem(pArg, 1, Py_BuildValue("s", next_model_path().c_str()));
+
+  PyObject_CallObject(pFunc, pArg);
+
+  dataset_cnt_ += 1;
+  model_cnt_ += 1;
+
+  Py_DECREF(pModule);
+  Py_DECREF(pFunc);
+  Py_DECREF(pArg);
+
+  return model_cnt_;
+}
+
+void ClfModel::make_predict(std::vector<std::vector<uint32_t>>& datas, std::vector<uint16_t>& preds) {
+  preds.clear();
+  for (size_t i = 0; i < datas.size(); i++) {
+    prepare_data(datas[i]);
+  }
+
+  PyObject* pModule = PyImport_ImportModule("lgb");
+  assert(pModule != nullptr);
+
+  PyObject* pFunc = PyObject_GetAttrString(pModule, "predict");
+  assert(pFunc != nullptr && PyCallable_Check(pFunc));
+
+  PyObject* pArg = PyTuple_New(2);
+  PyTuple_SetItem(pArg, 0, Py_BuildValue("s", latest_model_path().c_str()));
+
+  PyObject* pDatas = PyList_New(0);
+  PyObject* pData = nullptr;
+  for (std::vector<uint32_t>& data : datas) {
+    pData = PyList_New(0);
+    for (uint32_t& feature : data) {
+      PyList_Append(pData, Py_BuildValue("i", feature));
+    }
+    PyList_Append(pDatas, pData);
+  }
+
+  PyTuple_SetItem(pArg, 1, pDatas);
+
+  PyObject* pReturn = PyObject_CallObject(pFunc, pArg);  // should return a list
+
+  for (size_t i = 0; i < datas.size(); i++) {
+    int nResult = 0;
+    PyArg_Parse(PyList_GetItem(pReturn, i), "i", &nResult);
+    preds.emplace_back(nResult);
+  }
+  assert(preds.size() != 0);
+
+  Py_DECREF(pModule);
+  Py_DECREF(pFunc);
+  Py_DECREF(pArg);
+  Py_DECREF(pDatas);
+  Py_DECREF(pReturn);
+}
+
+}
\ No newline at end of file
diff --git a/db/art/clf_model.h b/db/art/clf_model.h
new file mode 100644
index 000000000..aca470d20
--- /dev/null
+++ b/db/art/clf_model.h
@@ -0,0 +1,94 @@
+#pragma once
+
+#include <Python.h>
+#include <algorithm>
+#include <cassert>
+#include <cstdint>
+#include <string>
+#include <vector>
+#include <csv2/writer.hpp>
+#include "macros.h"
+
+// dataset data point format:
+// every data point accounts for one segment.
+// supposing we consider r key ranges,
+// every key range has an id and a hotness value (see heat_buckets),
+// so the data point feature format is:
+// LSM-Tree level, Key Range 1 id, Key Range 1 hotness, Key Range 2 id, Key Range 2 hotness, ..., Key Range r id, Key Range r hotness
+// note that the hotness recorded by heat_buckets is a double; we store it as uint32_t(SIGNIFICANT_DIGITS * hotness)
+
+namespace ROCKSDB_NAMESPACE {
+
+class ClfModel;
+
+class ClfModel {
+private:
+  static std::string base_dir_;  // dir path for dataset and model files
+  static uint16_t model_cnt_;    // count of trained model files
+  static uint16_t dataset_cnt_;  // count of written dataset files
+  static uint16_t feature_num_;  // number of model input features
+public:
+  // init member vars
+  ClfModel() {
+    base_dir_ = MODEL_PATH;
+    model_cnt_ = 0;
+    dataset_cnt_ = 0;
+    feature_num_ = 0;
+  }
+
+  // next model file path
+  std::string next_model_path() { return base_dir_ + MODEL_PREFIX + std::to_string(model_cnt_) + MODEL_SUFFIX; }
+
+  // next dataset file path
+  std::string next_dataset_path() { return base_dir_ + DATASET_PREFIX + std::to_string(dataset_cnt_) + DATASET_SUFFIX; }
+
+  // latest model file path
+  std::string latest_model_path() {
+    assert(model_cnt_ > 0);
+    return base_dir_ + MODEL_PREFIX + std::to_string(model_cnt_ - 1) + MODEL_SUFFIX;
+  }
+
+  // latest dataset file path
+  std::string latest_dataset_path() {
+    assert(dataset_cnt_ > 0);
+    return base_dir_ + DATASET_PREFIX + std::to_string(dataset_cnt_ - 1) + DATASET_SUFFIX;
+  }
+
+  // check whether the model is ready; only feature_num_ needs checking now
+  bool is_ready() { return feature_num_ > 0; }
+
+  // make ready for training; only feature_num_ needs initializing now
+  void make_ready(std::vector<uint16_t>& features_nums) {
+    feature_num_ = *std::max_element(features_nums.begin(), features_nums.end());
+
+    Py_Initialize();
+    assert(Py_IsInitialized());
+
+    PyRun_SimpleString("import sys");
+    PyRun_SimpleString("sys.path.append('./models')");
+  }
+
+  ~ClfModel() {
+    Py_Finalize();
+  }
+
+  // resize data point features
+  void prepare_data(std::vector<uint32_t>& data) {
+    assert(data.size() >= 3);
+    data.resize(feature_num_, 0);
+  }
+
+  // resize every data point and write to a csv file for training
+  void write_debug_dataset();
+  void write_true_dataset(std::vector<std::vector<uint32_t>>& datas);
+  void write_dataset(std::vector<std::vector<uint32_t>>& datas);
+
+  // write dataset and train, return model cnt.
+  // the filter cache caller checks this model cnt against the cnt it records;
+  // if the two differ, it does the update job in the filter cache
+  uint16_t make_train(std::vector<std::vector<uint32_t>>& datas);
+
+  // predict
+  void make_predict(std::vector<std::vector<uint32_t>>& datas, std::vector<uint16_t>& preds);
+};
+
+}
\ No newline at end of file
diff --git a/db/art/heat_buckets.h b/db/art/heat_buckets.h
index 6d2d0d543..f754e3b7e 100644
--- a/db/art/heat_buckets.h
+++ b/db/art/heat_buckets.h
@@ -58,6 +58,8 @@ class HeatBuckets {
   uint32_t locate(const std::string& key); // helper func: locate which bucket hitted by this key
 
   const bool& is_ready() { return is_ready_; }
+  std::vector<std::string>& seperators() { return seperators_; }
+  std::vector<double>& buckets() { return buckets_; }
 
   void sample(const std::string& key, std::vector<std::pair<std::string, std::string>>& segments); // before init buckets, we need to sample keys;
   // input segment-related key range (segments), will use them when SamplesPool ready.
diff --git a/db/art/macros.h b/db/art/macros.h
index 0d59e9005..7cf360245 100644
--- a/db/art/macros.h
+++ b/db/art/macros.h
@@ -145,7 +145,19 @@ namespace ROCKSDB_NAMESPACE {
 #define DEFAULT_BUCKETS 500
 #define MAGIC_FACTOR 500
 
-// micors for Model Train
+// macros for Model Train
 #define TRAIN_PERIODS 10
+#define MODEL_PATH "/tmp/models/"
+#define MODEL_SUFFIX ".txt"
+#define MODEL_PREFIX "model_"
+#define DATASET_SUFFIX ".csv"
+#define DATASET_PREFIX "dataset_"
+#define SIGNIFICANT_DIGITS 6
+
+// macros for filter cache
+#define DEFAULT_UNITS 5
+#define BITS_PER_KEY 2
+#define MAX_UNITS 10
+#define MIN_UNITS 0
 
 } // namespace ROCKSDB_NAMESPACE
\ No newline at end of file
diff --git a/models/lgb.cc b/models/lgb.cc
new file mode 100644
index 000000000..a64c0c1e1
--- /dev/null
+++ b/models/lgb.cc
@@ -0,0 +1,232 @@
+#include <cstdint>
+#include <cstdlib>
+#include <iostream>
+
+#include <Python.h>
+
+#include <algorithm>
+#include <chrono>
+#include <fstream>
+#include <map>
+#include <random>
+#include <string>
+#include <vector>
+#include <csv2/writer.hpp>
+
+void generate_samples() {
+  // ready for writer
+  std::ofstream stream("lgb.csv");
+  csv2::Writer<csv2::delimiter<','>> writer(stream);
+
+  // init hotness values
+  std::map<int, double> hotness_map;
+  double base_hotness = 0.1;
+  for (int i = 0; i < 200; i++) {
+    float r = static_cast<float>(rand()) / static_cast<float>(RAND_MAX) + base_hotness;
+    hotness_map[i] = r;
+  }
+
+  // init header vector
+  std::vector<std::vector<std::string>> rows;
+  std::vector<std::string> header;
+  header.emplace_back("Level");
+  for (int i = 0; i < 20; i++) {
+    header.emplace_back("Range_" + std::to_string(i));
+    header.emplace_back("Hotness_" + std::to_string(i));
+  }
+  header.emplace_back("Target");
+  rows.emplace_back(header);
+
+  // ready for shuffling
+  std::vector<int> ids;
+  for (int i = 0; i < 200; i++) {
+    ids.emplace_back(i);
+  }
+
+  // generate values
+  for (int i = 0; i < 1000; i++) {
+    std::vector<std::string> values;
+    int level = i / 200;
+    int target = 5 - level;
+    float r = static_cast<float>(rand()) / static_cast<float>(RAND_MAX);
+    if (r > 0.10 * level) {
+      target -= 1;
+    }
+
+    auto seed = std::chrono::system_clock::now().time_since_epoch().count();
+    std::shuffle(ids.begin(), ids.end(), std::default_random_engine(seed));
+    values.emplace_back(std::to_string(level));
+    for (int j = 0; j < 20; j++) {
+      values.emplace_back(std::to_string(ids[j]));
+      values.emplace_back(std::to_string(int(1e8 * hotness_map[ids[j]])));
+    }
+    values.emplace_back(std::to_string(target));
+
+    rows.emplace_back(values);
+  }
+
+  writer.write_rows(rows);
+  stream.close();
+}
+
+void train() {
+  PyObject* pModule = PyImport_ImportModule("lgb");
+  if (pModule == nullptr) {
+    std::cout << "module not found" << std::endl;
+    exit(EXIT_FAILURE);
+  }
+
+  PyObject* pFunc = PyObject_GetAttrString(pModule, "train");
+  if (!pFunc || !PyCallable_Check(pFunc)) {
+    std::cout << "function not found" << std::endl;
+    exit(EXIT_FAILURE);
+  }
+
+  PyObject* pArg = PyTuple_New(2);
+  PyTuple_SetItem(pArg, 0, Py_BuildValue("s", "lgb.csv"));
+  PyTuple_SetItem(pArg, 1, Py_BuildValue("s", "lgb.txt"));
+
+  PyObject_CallObject(pFunc, pArg);
+
+  Py_DECREF(pModule);
+  Py_DECREF(pFunc);
+  Py_DECREF(pArg);
+}
+
+/*
+uint16_t predict_one() {
+  std::vector<uint32_t> sample = {
+    0,77,83853435,86,32896816,164,109999358,88,
+    45036017,191,97761380,192,84780931,40,62674498,
+    71,13928034,187,85729384,85,43033713,95,
+    102396976,93,95867633,185,19964006,154,62021011,21,
+    34288677,161,85558086,181,65248507,162,15193881,
+    136,22547489,99,101097202
+  };
+
+  PyObject* pModule = PyImport_ImportModule("lgb");
+  if (pModule == nullptr) {
+    std::cout << "module not found" << std::endl;
+    exit(EXIT_FAILURE);
+  }
+
+  PyObject* pFunc = PyObject_GetAttrString(pModule, "predict");
+  if (!pFunc || !PyCallable_Check(pFunc)) {
+    std::cout << "function not found" << std::endl;
+    exit(EXIT_FAILURE);
+  }
+
+  PyObject* pArg = PyTuple_New(2);
+  PyTuple_SetItem(pArg, 0, Py_BuildValue("s", "lgb.txt"));
+
+  PyObject* pData = PyList_New(0);
+  for (int feature : sample) {
+    PyList_Append(pData, Py_BuildValue("i", feature));
+  }
+  PyTuple_SetItem(pArg, 1, pData);
+
+  PyObject* pReturn = PyObject_CallObject(pFunc, pArg);
+
+  int nResult;
+  PyArg_Parse(pReturn, "i", &nResult);
+
+  // std::cout << "return result is " << nResult << std::endl;
+  return nResult;
+}
+*/
+
+void predict(std::vector<uint16_t>& results) {
+  results.clear();
+  std::vector<std::vector<uint32_t>> samples = {
+    {
+      0,77,83853435,86,32896816,164,109999358,88,
+      45036017,191,97761380,192,84780931,40,62674498,
+      71,13928034,187,85729384,85,43033713,95,
+      102396976,93,95867633,185,19964006,154,62021011,21,
+      34288677,161,85558086,181,65248507,162,15193881,
+      136,22547489,99,101097202
+    },
+    {
+      2,113,32610663,147,83265441,100,58249068,136,22547489,
+      166,98995566,141,105010402,99,101097202,146,89779806,
+      102,105025231,21,34288677,49,104932701,126,78444504,25,
+      50094437,48,16975528,1,49438291,191,97761380,31,
+      93911224,107,53195345,129,46866354,111,40745785
+    },
+    {
+      4,125,103500401,33,39603161,64,36666575,75,82095235,182,
+      67943000,42,50022864,96,49843665,148,75656366,18,24160255,
+      57,12002304,110,88600212,185,19964006,8,37777471,16,73571175,
+      26,22979043,153,23490241,104,24766001,100,58249068,
+      137,89347040,69,76772379
+    }
+  };
+
+  PyObject* pModule = PyImport_ImportModule("lgb");
+  if (pModule == nullptr) {
+    std::cout << "module not found" << std::endl;
+    exit(EXIT_FAILURE);
+  }
+
+  PyObject* pFunc = PyObject_GetAttrString(pModule, "predict");
+  if (!pFunc || !PyCallable_Check(pFunc)) {
+    std::cout << "function not found" << std::endl;
+    exit(EXIT_FAILURE);
+  }
+
+  PyObject* pArg = PyTuple_New(2);
+  PyTuple_SetItem(pArg, 0, Py_BuildValue("s", "lgb.txt"));
+
+  PyObject* pDatas = PyList_New(0);
+  PyObject* pData = nullptr;
+  size_t cnt = 0;
+  for (std::vector<uint32_t>& sample : samples) {
+    pData = PyList_New(0);
+    for (uint32_t& feature : sample) {
+      PyList_Append(pData, Py_BuildValue("i", feature));
+    }
+    PyList_Append(pDatas, pData);
+    cnt += 1;
+  }
+
+  PyTuple_SetItem(pArg, 1, pDatas);
+
+  PyObject* pReturn = PyObject_CallObject(pFunc, pArg);  // should return a list
+
+  for (size_t i = 0; i < cnt; i++) {
+    int nResult = 0;
+    PyArg_Parse(PyList_GetItem(pReturn, i), "i", &nResult);
+    results.emplace_back(nResult);
+  }
+
+  Py_DECREF(pModule);
+  Py_DECREF(pFunc);
+  Py_DECREF(pArg);
+  Py_DECREF(pDatas);
+  Py_DECREF(pReturn);
+}
+
+int main() {
+  Py_Initialize();
+  if (!Py_IsInitialized()) {
+    std::cout << "python init fail" << std::endl;
+    exit(EXIT_FAILURE);
+  }
+
+  PyRun_SimpleString("import sys");
+  PyRun_SimpleString("sys.path.append('.')");
+
+  generate_samples();
+  train();
+
+  std::vector<uint16_t> results;
+  predict(results);
+  for (uint16_t result : results) {
+    std::cout << result << " ";
+  }
+  std::cout << std::endl;
+
+  Py_Finalize();
+
+  return EXIT_SUCCESS;
+}
\ No newline at end of file
diff --git a/models/lgb.py b/models/lgb.py
new file mode 100644
index 000000000..397a3132f
--- /dev/null
+++ b/models/lgb.py
@@ -0,0 +1,50 @@
+import pandas as pd
+import lightgbm
+import numpy
+
+# dataset: train dataset path
+# output: saved model path
+def train(dataset: str, output: str):
+    df = pd.read_csv(dataset)
+    y = df['Target']
+    X = df.drop(columns=['Target'])
+    clf = lightgbm.LGBMClassifier()
+    clf.fit(X, y)
+    clf.booster_.save_model(output)
+
+# model_file: saved model path
+# data: predicted data point
+'''
+def predict_one(model_file: str, data: list[int]):
+    assert type(data) is list
+    data = pd.DataFrame(data).T
+    model = lightgbm.Booster(model_file=model_file)
+    result = model.predict(data)
+    return numpy.argmax(result[0])
+'''
+
+# model_file: saved model path
+# datas: predicted data batch
+def predict(model_file: str, datas: list[list[int]]):
+    assert type(datas) is list
+    datas = pd.DataFrame(datas)
+    model = lightgbm.Booster(model_file=model_file)
+    # model.predict returns one array of per-class probabilities per row,
+    # so return the argmax class index of every row as a plain int
+    results = model.predict(datas)
+    return [int(numpy.argmax(result)) for result in results]
+
+
+if __name__ == '__main__':
+    # simple smoke test: train on the debug dataset produced by lgb.cc,
+    # then predict on the same feature rows
+    train('lgb.csv', 'lgb.txt')
+    df = pd.read_csv('lgb.csv')
+    print(predict('lgb.txt', df.drop(columns=['Target']).values.tolist()))
diff --git a/models/lgb.sh b/models/lgb.sh
new file mode 100644
index 000000000..c9bbdf735
--- /dev/null
+++ b/models/lgb.sh
@@ -0,0 +1,3 @@
+rm -f lgb lgb.txt log.txt
+gcc lgb.cc -o lgb -l_lightgbm -std=c++11 -lstdc++ -lpython3.12 -I/home/ycc/miniconda3/include/python3.12 -L/home/ycc/miniconda3/lib
+./lgb
\ No newline at end of file
diff --git a/src.mk b/src.mk
index 7c4f3d85b..4eb53d777 100644
--- a/src.mk
+++ b/src.mk
@@ -31,6 +31,7 @@ LIB_SOURCES = \
   db/art/compactor.cc \
   db/art/global_memtable.cc \
   db/art/heat_buckets.cc \
+  db/art/clf_model.cc \
  db/art/heat_group.cc \
   db/art/lock.cc \
   db/art/logger.cc \