Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,4 @@ __pycache__/

include/csv2/
debug.*
*.log
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,8 @@ set(SOURCES
db/art/global_memtable.cc
db/art/heat_buckets.cc
db/art/clf_model.cc
db/art/filter_cache_heap.cc
db/art/greedy_algo.cc
db/art/heat_group.cc
db/art/lock.cc
db/art/logger.cc
Expand Down
2 changes: 2 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,8 @@ cpp_library(
"db/art/global_memtable.cc",
"db/art/heat_buckets.cc",
"db/art/clf_model.cc",
"db/art/filter_cache_heap.cc",
"db/art/greedy_algo.cc",
"db/art/heat_group.cc",
"db/art/lock.cc",
"db/art/logger.cc",
Expand Down
7 changes: 4 additions & 3 deletions YCSB/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ BIND_LEVELDB ?= 0
BIND_LMDB ?= 0

EXTRA_LDFLAGS += -lstdc++
EXTRA_LDFLAGS += -lpython3.12
EXTRA_CXXFLAGS += -I$(PYTHON_INCLUDE_PATH)
EXTRA_CXXFLAGS += -L$(PYTHON_LIBRARY_PATH)
EXTRA_LDFLAGS += -lsocket++
# EXTRA_LDFLAGS += -lpython3.12
# EXTRA_CXXFLAGS += -I$(PYTHON_INCLUDE_PATH)
# EXTRA_CXXFLAGS += -L$(PYTHON_LIBRARY_PATH)

#----------------------------------------------------------

Expand Down
2 changes: 1 addition & 1 deletion YCSB/rocksdb/rocksdb.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
rocksdb.dbname=/pg_wal/ycc
rocksdb.dbname=/tmp/ycc
rocksdb.nvm_path=/pg_wal/ycc/memory_art
rocksdb.format=single
rocksdb.destroy=false
Expand Down
2 changes: 1 addition & 1 deletion YCSB/workloads/workloadt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@


recordcount=5000000
operationcount=100000000
operationcount=600000
workload=com.yahoo.ycsb.workloads.CoreWorkload

readallfields=true
Expand Down
7 changes: 4 additions & 3 deletions build_tools/build_detect_platform
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,10 @@ JAVA_STATIC_LDFLAGS="$PLATFORM_LDFLAGS"
JAVAC_ARGS="-source 7"

COMMON_FLAGS="$COMMON_FLAGS -DUSE_PMEM -DART -DART_PLUS"
COMMON_FLAGS="$COMMON_FLAGS -lstdc++ -lpython3.12"
COMMON_FLAGS="$COMMON_FLAGS -I$PYTHON_INCLUDE_PATH"
COMMON_FLAGS="$COMMON_FLAGS -L$PYTHON_LIBRARY_PATH"
COMMON_FLAGS="$COMMON_FLAGS -lstdc++ -lsocket++"
# COMMON_FLAGS="$COMMON_FLAGS -lstdc++ -lpython3.12"
# COMMON_FLAGS="$COMMON_FLAGS -I$PYTHON_INCLUDE_PATH"
# COMMON_FLAGS="$COMMON_FLAGS -L$PYTHON_LIBRARY_PATH"
PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -lpmem"
JAVA_LDFLAGS="$JAVA_LDFLAGS -lpmem"

Expand Down
151 changes: 72 additions & 79 deletions db/art/clf_model.cc
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
#include "clf_model.h"
#include <csv2/writer.hpp>
#include <csv2/reader.hpp>
#include <libsocket/exception.hpp>
#include <libsocket/inetclientstream.hpp>
#include <vector>
#include <map>
#include <random>
#include <chrono>

namespace ROCKSDB_NAMESPACE {

std::string ClfModel::base_dir_;
uint16_t ClfModel::model_cnt_;
uint16_t ClfModel::dataset_cnt_;
uint16_t ClfModel::feature_num_;
std::string ClfModel::dataset_name_;
std::string ClfModel::dataset_path_;
std::string ClfModel::host_, ClfModel::port_;
size_t ClfModel::buffer_size_;

void ClfModel::write_debug_dataset() {
assert(feature_num_ > 0);
// ready for writer
std::ofstream stream(next_dataset_path());
std::ofstream stream(dataset_path_);
csv2::Writer<csv2::delimiter<','>> writer(stream);

// init hotness values
Expand Down Expand Up @@ -59,20 +63,24 @@ void ClfModel::write_debug_dataset() {
values.emplace_back(std::to_string(level));
for (int j = 0; j < 20; j ++) {
values.emplace_back(std::to_string(ids[j]));
values.emplace_back(std::to_string(uint32_t(SIGNIFICANT_DIGITS * hotness_map[ids[j]])));
values.emplace_back(std::to_string(uint32_t(SIGNIFICANT_DIGITS_FACTOR * hotness_map[ids[j]])));
}
values.emplace_back(std::to_string(target));

assert(values.size() == feature_num_ + 1);
rows.emplace_back(values);
}

writer.write_rows(rows);
stream.close();
}

void ClfModel::write_real_dataset(std::vector<std::vector<uint32_t>>& datas) {
void ClfModel::write_real_dataset(std::vector<std::vector<uint32_t>>& datas, std::vector<uint16_t>& tags) {
assert(feature_num_ > 0);
// tags is real class of all segments,
// we also need to write these tags to dataset besides features
assert(datas.size()==tags.size());
// ready for writer
std::ofstream stream(next_dataset_path());
std::ofstream stream(dataset_path_);
csv2::Writer<csv2::delimiter<','>> writer(stream);

// init csv header vector
Expand All @@ -90,20 +98,26 @@ void ClfModel::write_real_dataset(std::vector<std::vector<uint32_t>>& datas) {
rows.emplace_back(header);

std::vector<std::string> values;
size_t idx = 0;
for (std::vector<uint32_t>& data : datas) {
// resize features vector to size feature_num_
prepare_data(data);
values.clear();
for (uint32_t& value : data) {
values.emplace_back(std::to_string(value));
}
// remember to write real tag to dataset
values.emplace_back(std::to_string(tags[idx++]));
assert(values.size() == feature_num_ + 1);
rows.emplace_back(values);
}

writer.write_rows(rows);
stream.close();
}

void ClfModel::write_dataset(std::vector<std::vector<uint32_t>>& datas) {
void ClfModel::write_dataset(std::vector<std::vector<uint32_t>>& datas, std::vector<uint16_t>& tags) {
assert(feature_num_ > 0);
if (datas.empty()) {
write_debug_dataset();
// dataset_cnt_ += 1;
Expand All @@ -112,114 +126,93 @@ void ClfModel::write_dataset(std::vector<std::vector<uint32_t>>& datas) {

assert(feature_num_ % 2 != 0); // features num: 2r + 1

write_real_dataset(datas);
write_real_dataset(datas, tags);
// dataset_cnt_ += 1;
return;
}

uint16_t ClfModel::make_train(std::vector<std::vector<uint32_t>>& datas) {
write_dataset(datas);

PyObject* pModule = PyImport_ImportModule("lgb");
assert(pModule != nullptr);

PyObject* pFunc = PyObject_GetAttrString(pModule, "train");
assert(pFunc != nullptr && PyCallable_Check(pFunc));


PyObject* pArg = PyTuple_New(2);
PyTuple_SetItem(pArg, 0, Py_BuildValue("s", next_dataset_path().c_str()));
PyTuple_SetItem(pArg, 1, Py_BuildValue("s", next_model_path().c_str()));

PyObject_CallObject(pFunc, pArg);

dataset_cnt_ += 1;
model_cnt_ += 1;

Py_DECREF(pModule);
Py_DECREF(pFunc);
Py_DECREF(pArg);

return model_cnt_;
void ClfModel::make_train(std::vector<std::vector<uint32_t>>& datas, std::vector<uint16_t>& tags) {
assert(feature_num_ > 0);
write_dataset(datas, tags);

// already write dataset
// send msg to LightGBM server, let server read dataset and train new model
libsocket::inet_stream sock(host_, port_, LIBSOCKET_IPv4);
std::string message = TRAIN_PREFIX + dataset_name_;
// already write dataset, send dataset path to server
// should not receive any message from server
sock << message;
// will destroy sock when leaving this func scope
}

void ClfModel::make_predict_samples(std::vector<std::vector<uint32_t>>& datas) {
assert(feature_num_ > 0);
csv2::Reader<csv2::delimiter<','>,
csv2::quote_character<'"'>,
csv2::first_row_is_header<true>,
csv2::trim_policy::trim_whitespace> csv;

if (csv.mmap(latest_dataset_path())) {
std::vector<uint32_t> data;
if (csv.mmap(dataset_path_)) {
// const auto header = csv.header();
int cnt = 0;
for (auto row : csv) {
if ((cnt++) > 10) {
// only choose first 10 samples
if ((++cnt) > 10) {
break;
}

std::vector<uint32_t> data;
data.clear();
for (auto cell : row) {
std::string value;
cell.read_value(value);
data.emplace_back(stoul(value));
}
// remind that csv reader will read a empty row in the end, that is why !data.empty()
// csv file last column is real tag
// we need to pop out last column
if (!data.empty()) {
data.pop_back();
}
assert(data.size() == feature_num_);
datas.emplace_back(data);
}
}
}

void ClfModel::make_real_predict(std::vector<std::vector<uint32_t>>& datas, std::vector<uint16_t>& preds) {
PyObject* pModule = PyImport_ImportModule("lgb");
assert(pModule == nullptr);

PyObject* pFunc = PyObject_GetAttrString(pModule, "predict");
assert(pFunc != nullptr && PyCallable_Check(pFunc));

PyObject* pArg = PyTuple_New(2);
PyTuple_SetItem(pArg, 0, Py_BuildValue("s", latest_model_path()));

PyObject* pDatas = PyList_New(0);
PyObject* pData = nullptr;
assert(preds.empty());
libsocket::inet_stream sock(host_, port_, LIBSOCKET_IPv4);
std::string message, recv_buffer;
for (std::vector<uint32_t>& data : datas) {
pData = PyList_New(0);
prepare_data(data);
for (uint32_t& feature : data) {
PyList_Append(pData, Py_BuildValue("i", feature));
if (!data.empty()) {
message.clear();
recv_buffer.clear();
recv_buffer.resize(buffer_size_);
message = std::to_string(data[0]);
for (size_t i = 1; i < data.size(); i ++) {
message = message + " " + std::to_string(data[i]);
}
message = PREDICT_PREFIX + message;
assert(message.size() <= buffer_size_);
sock << message;
// only receive pred tag integer
sock >> recv_buffer;
uint16_t pred = std::stoul(recv_buffer);
assert(pred >= MIN_UNITS_NUM && pred <= MAX_UNITS_NUM);
preds.emplace_back(pred);
}
PyList_Append(pDatas, pData);
}

PyTuple_SetItem(pArg, 1, pDatas);

PyObject* pReturn = PyObject_CallObject(pFunc, pArg); // should return list
assert(pReturn != nullptr);

for (size_t i = 0; i < datas.size(); i ++) {
int nResult = 0;
PyArg_Parse(PyList_GetItem(pReturn, i), "i", &nResult);
preds.emplace_back(nResult);
}
assert(preds.size() != 0);

Py_DECREF(pModule);
Py_DECREF(pFunc);
Py_DECREF(pArg);
Py_DECREF(pDatas);
Py_DECREF(pReturn);
// only write pred result to vector preds, and return nothing
assert(datas.size() == preds.size());
}

void ClfModel::make_predict(std::vector<std::vector<uint32_t>>& datas, std::vector<uint16_t>& preds) {
preds.clear();

if (model_cnt_ == 0) {
preds.resize(datas.size(), DEFAULT_UNITS);
return;
}

// datas empty means we are debuging class ClfModel
if (datas.empty()) {
make_predict_samples(datas);
}

// only write pred result to vector preds, and return nothing
make_real_predict(datas, preds);
return;
}
Expand Down
Loading