Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,8 @@ cmake-build-release

*_example
inode_vptrs
.cache/
.cache/
.conda/
*.txt
__pycache__/
*.csv
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,7 @@ set(SOURCES
db/art/compactor.cc
db/art/global_memtable.cc
db/art/heat_buckets.cc
db/art/clf_model.cc
db/art/heat_group.cc
db/art/lock.cc
db/art/logger.cc
Expand Down
1 change: 1 addition & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ cpp_library(
"db/art/compactor.cc"
"db/art/global_memtable.cc",
"db/art/heat_buckets.cc",
"db/art/clf_model.cc",
"db/art/heat_group.cc",
"db/art/lock.cc",
"db/art/logger.cc",
Expand Down
182 changes: 182 additions & 0 deletions db/art/clf_model.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
#include "clf_model.h"
#include <csv2/writer.hpp>
#include <map>
#include <random>
#include <chrono>

namespace ROCKSDB_NAMESPACE {

// Definitions of ClfModel's shared static state (declared in clf_model.h).
// All ClfModel instances share one base directory and one pair of file
// counters, so dataset/model file names stay globally sequential.
std::string ClfModel::base_dir_;
uint16_t ClfModel::model_cnt_;
uint16_t ClfModel::dataset_cnt_;
uint16_t ClfModel::feature_num_;

// Writes a synthetic training dataset (1000 rows, 20 key ranges per row,
// 200 candidate ranges, levels 0..4) to next_dataset_path() so the Python
// trainer can be exercised before any real samples exist.
// Target = 5 - level, randomly decremented with probability 0.10 * level.
void ClfModel::write_debug_dataset() {
  // ready for writer
  std::ofstream stream(next_dataset_path());
  csv2::Writer<csv2::delimiter<','>> writer(stream);

  // One engine for the whole routine. The previous version re-seeded a
  // fresh engine from the system clock on every row, which can hand out
  // identical seeds (and thus identical shuffles) in a tight loop, and it
  // mixed in C rand() despite <random> being available.
  std::mt19937 rng(static_cast<std::mt19937::result_type>(
      std::chrono::system_clock::now().time_since_epoch().count()));
  std::uniform_real_distribution<double> unit(0.0, 1.0);

  // init hotness values: each of the 200 ranges gets hotness in [base, base + 1)
  std::map<uint32_t, double> hotness_map;
  const double base_hotness = 0.01;
  for (uint32_t id = 0; id < 200; id++) {
    hotness_map[id] = unit(rng) + base_hotness;
  }

  // init header vector: Level, (Range_i, Hotness_i) * 20, Target
  std::vector<std::vector<std::string>> rows;
  std::vector<std::string> header;
  header.emplace_back("Level");
  for (int col = 0; col < 20; col++) {
    header.emplace_back("Range_" + std::to_string(col));
    header.emplace_back("Hotness_" + std::to_string(col));
  }
  header.emplace_back("Target");
  rows.emplace_back(header);

  // ready for shuffling: ids of the 200 candidate ranges
  std::vector<uint32_t> ids;
  for (uint32_t id = 0; id < 200; id++) {
    ids.emplace_back(id);
  }

  // generate values: 200 rows per level, levels 0..4
  for (int row = 0; row < 1000; row++) {
    std::vector<std::string> values;
    uint32_t level = row / 200;
    uint32_t target = 5 - level;
    // deeper levels are more likely to lose one target class
    if (unit(rng) > 0.10 * level) {
      target -= 1;
    }

    // pick 20 distinct ranges for this row by shuffling the id pool
    std::shuffle(ids.begin(), ids.end(), rng);
    values.emplace_back(std::to_string(level));
    for (int col = 0; col < 20; col++) {  // renamed: no longer shadows the row index
      values.emplace_back(std::to_string(ids[col]));
      // hotness is stored as a scaled integer (see clf_model.h header note)
      values.emplace_back(std::to_string(uint32_t(SIGNIFICANT_DIGITS * hotness_map[ids[col]])));
    }
    values.emplace_back(std::to_string(target));

    rows.emplace_back(values);
  }

  writer.write_rows(rows);
  stream.close();
}

// Writes the real sampled data points to next_dataset_path() as CSV.
// Each data point is padded to feature_num_ via prepare_data() and then
// stringified; the csv writer only accepts string cells.
void ClfModel::write_true_dataset(std::vector<std::vector<uint32_t>>& datas) {
  // ready for writer
  std::ofstream stream(next_dataset_path());
  csv2::Writer<csv2::delimiter<','>> writer(stream);

  // init csv header vector
  std::vector<std::vector<std::string>> rows;
  std::vector<std::string> header;
  uint16_t ranges_num = (feature_num_ - 1) / 2;  // features = level + 2 per range
  header.emplace_back("Level");
  for (int i = 0; i < ranges_num; i++) {
    header.emplace_back("Range_" + std::to_string(i));
    header.emplace_back("Hotness_" + std::to_string(i));
  }
  // remind that targeted class is in csv Target column
  // corresponding to code of lgb.py in ../models dir
  header.emplace_back("Target");
  rows.emplace_back(header);

  for (std::vector<uint32_t>& data : datas) {
    prepare_data(data);
    // BUG FIX: the uint32_t vector was pushed directly into the string-row
    // container, which has no such conversion; convert each cell first.
    std::vector<std::string> row;
    row.reserve(data.size());
    for (uint32_t feature : data) {
      row.emplace_back(std::to_string(feature));
    }
    rows.emplace_back(std::move(row));
  }

  writer.write_rows(rows);
  stream.close();
}

// Dispatches dataset writing: real samples go to write_true_dataset,
// an empty batch falls back to the synthetic debug dataset.
void ClfModel::write_dataset(std::vector<std::vector<uint32_t>>& datas) {
  if (datas.empty()) {
    // no real samples collected yet — emit the synthetic debug set instead
    write_debug_dataset();
    return;
  }

  // layout is "level + r * (range id, hotness)", so the count is always odd
  assert(feature_num_ % 2 != 0);
  write_true_dataset(datas);
}

uint16_t ClfModel::make_train(std::vector<std::vector<uint32_t>>& datas) {
write_dataset(datas);

PyObject* pModule = PyImport_ImportModule("lgb");
assert(pModule != nullptr);

PyObject* pFunc = PyObject_GetAttrString(pModule, "train");
assert(pFunc != nullptr && PyCallable_Check(pFunc));


PyObject* pArg = PyTuple_New(2);
PyTuple_SetItem(pArg, 0, Py_BuildValue("s", next_dataset_path().c_str()));
PyTuple_SetItem(pArg, 1, Py_BuildValue("s", next_model_path().c_str()));

PyObject_CallObject(pFunc, pArg);

dataset_cnt_ += 1;
model_cnt_ += 1;

Py_DECREF(pModule);
Py_DECREF(pFunc);
Py_DECREF(pArg);

return model_cnt_;
}

void ClfModel::make_predict(std::vector<std::vector<uint32_t>>& datas, std::vector<uint16_t>& preds) {
preds.clear();
for (int i = 0; i < datas.size(); i ++) {
prepare_data(datas[i]);
}

PyObject* pModule = PyImport_ImportModule("lgb");
assert(pModule == nullptr);

PyObject* pFunc = PyObject_GetAttrString(pModule, "predict");
assert(pFunc != nullptr && PyCallable_Check(pFunc));

PyObject* pArg = PyTuple_New(2);
PyTuple_SetItem(pArg, 0, Py_BuildValue("s", latest_model_path()));

PyObject* pDatas = PyList_New(0);
PyObject* pData = nullptr;
for (std::vector<uint32_t>& data : datas) {
pData = PyList_New(0);
for (uint32_t& feature : data) {
PyList_Append(pData, Py_BuildValue("i", feature));
}
PyList_Append(pDatas, pData);
}

PyTuple_SetItem(pArg, 1, pDatas);

PyObject* pReturn = PyObject_CallObject(pFunc, pArg); // should return list

for (int i = 0; i < datas.size(); i ++) {
int nResult = 0;
PyArg_Parse(PyList_GetItem(pReturn, i), "i", &nResult);
preds.emplace_back(nResult);
}
assert(preds.size() != 0);

Py_DECREF(pModule);
Py_DECREF(pFunc);
Py_DECREF(pArg);
Py_DECREF(pDatas);
Py_DECREF(pReturn);
}

}
94 changes: 94 additions & 0 deletions db/art/clf_model.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#pragma once

#include <string>
#include <vector>
#include <algorithm>
#include <string>
#include <Python.h>
#include <cassert>
#include "macros.h"

// dataset data point format:
// every data point accounts for one segment
// supposed that considering r key range
// every key range have id and hotness ( see heat_buckets )
// so data point features format :
// LSM-Tree level, Key Range 1 id, Key Range 1 hotness, Key Range 2 id, Key Range 2 hotness, ..., Key Range r id, Key Range r hotness
// remind that heat_buckets recorded hotness is double type, we use uint32_t(uint32_t(SIGNIFICANT_DIGITS * hotness))

namespace ROCKSDB_NAMESPACE {

class ClfModel;

// Wrapper around an embedded-Python LightGBM classifier: writes CSV
// datasets, triggers training (lgb.train) and prediction (lgb.predict),
// and tracks dataset/model file names via shared static counters.
class ClfModel {
 private:
  static std::string base_dir_; // dir path for dataset and model
  static uint16_t model_cnt_; // trained model file cnt
  static uint16_t dataset_cnt_; // written dataset file cnt
  static uint16_t feature_num_; // model input features num
 public:
  // init member vars
  // NOTE(review): the constructor resets the SHARED static counters, so a
  // second instance silently discards the first one's bookkeeping — this
  // type is effectively a singleton; confirm only one is ever constructed.
  ClfModel() {
    base_dir_ = MODEL_PATH;
    model_cnt_ = 0;
    dataset_cnt_ = 0;
    feature_num_ = 0;
  }

  // next model file path (e.g. <base>/model_<cnt>.txt)
  std::string next_model_path() { return base_dir_ + MODEL_PREFIX + std::to_string(model_cnt_) + MODEL_SUFFIX; }

  // next dataset file path (e.g. <base>/dataset_<cnt>.csv)
  std::string next_dataset_path() { return base_dir_ + DATASET_PREFIX + std::to_string(dataset_cnt_) + DATASET_SUFFIX; }

  // latest model file path; requires at least one trained model
  std::string latest_model_path() {
    assert(model_cnt_ > 0);
    return base_dir_ + MODEL_PREFIX + std::to_string(model_cnt_ - 1) + MODEL_SUFFIX;
  }

  // latest dataset file path; requires at least one written dataset
  std::string latest_dataset_path() {
    assert(dataset_cnt_ > 0);
    return base_dir_ + DATASET_PREFIX + std::to_string(dataset_cnt_ - 1) + DATASET_SUFFIX;
  }

  // check whether ready, only need check feature_nums_ now
  bool is_ready() { return feature_num_ > 0; }

  // make ready for training, only need init feature_nums_ now:
  // feature_num_ becomes the max over all callers' feature counts, and the
  // embedded interpreter is started with ./models on sys.path (for lgb.py).
  void make_ready(std::vector<uint16_t>& features_nums) {
    // BUG FIX: dereferencing max_element of an empty range is UB.
    assert(!features_nums.empty());
    feature_num_ = *std::max_element(features_nums.begin(), features_nums.end());

    Py_Initialize();  // no-op if the interpreter is already running
    assert(Py_IsInitialized());

    PyRun_SimpleString("import sys");
    PyRun_SimpleString("sys.path.append('./models')");
  }

  ~ClfModel() {
    // BUG FIX: Py_Finalize without a prior Py_Initialize is invalid; only
    // make_ready() starts the interpreter, so guard the shutdown.
    if (Py_IsInitialized()) {
      Py_Finalize();
    }
  }

  // resize data point features: pad with zeros up to feature_num_
  // NOTE(review): resize also TRUNCATES points longer than feature_num_ —
  // presumably intended, since feature_num_ is the max; verify with callers.
  void prepare_data(std::vector<uint32_t>& data) {
    assert(data.size() >= 3);
    data.resize(feature_num_, 0);
  }

  // resize every data point and write to csv file for training
  void write_debug_dataset();
  void write_true_dataset(std::vector<std::vector<uint32_t>>& datas);
  void write_dataset(std::vector<std::vector<uint32_t>>& datas);

  // write dataset and train, return model cnt
  // filter cache caller will check this model cnt and cnt it records,
  // if model cnt not equal to caller cnt, it will do update job in filter cache
  uint16_t make_train(std::vector<std::vector<uint32_t>>& datas);

  // predict: fills preds with one class id per input data point
  void make_predict(std::vector<std::vector<uint32_t>>& datas, std::vector<uint16_t>& preds);
};

}
2 changes: 2 additions & 0 deletions db/art/heat_buckets.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class HeatBuckets {
uint32_t locate(const std::string& key); // helper func: locate which bucket hitted by this key

const bool& is_ready() { return is_ready_; }
std::vector<std::string>& seperators() { return seperators_; }
std::vector<Bucket>& buckets() { return buckets_; }
void sample(const std::string& key, std::vector<std::vector<std::string>>& segments); // before init buckets, we need to sample keys;
// input segment-related key range (segments), will use them when SamplesPool ready.

Expand Down
14 changes: 13 additions & 1 deletion db/art/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,19 @@ namespace ROCKSDB_NAMESPACE {
#define DEFAULT_BUCKETS 500
#define MAGIC_FACTOR 500

// micors for Model Train
// micros for Model Train
#define TRAIN_PERIODS 10
#define MODEL_PATH "/tmp/models/"
#define MODEL_SUFFIX ".txt"
#define MODEL_PREFIX "model_"
#define DATASET_SUFFIX ".csv"
#define DATASET_PREFIX "dataset_"
#define SIGNIFICANT_DIGITS 6

// micros for filter cache
#define DEFAULT_UNITS 5
#define BITS_PER_KEY 2
#define MAX_UNITS 10
#define MIN_UNITS 0

} // namespace ROCKSDB_NAMESPACE
Loading