Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,7 @@ void ColumnFamilyData::InstallSuperVersion(
return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
}

// TODO: update filter cache (WaLSM+)
void ColumnFamilyData::InstallSuperVersion(
SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
const MutableCFOptions& mutable_cf_options) {
Expand Down
6 changes: 6 additions & 0 deletions db/dbformat.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ inline Slice ExtractUserKey(const Slice& internal_key) {
return Slice(internal_key.data(), internal_key.size() - kNumInternalBytes);
}

// Returns the internal-bytes portion (sequence number + value type, i.e. the
// trailing kNumInternalBytes bytes) of an internal key. (WaLSM+)
inline Slice ExtractInternalBytes(const Slice& internal_key) {
  assert(internal_key.size() >= kNumInternalBytes);
  // Point at the LAST kNumInternalBytes bytes. The previous code used
  // `data() + size()`, which is one-past-the-end and yielded a Slice over
  // out-of-bounds memory.
  return Slice(internal_key.data() + internal_key.size() - kNumInternalBytes,
               kNumInternalBytes);
}

inline Slice ExtractUserKeyAndStripTimestamp(const Slice& internal_key,
size_t ts_sz) {
assert(internal_key.size() >= kNumInternalBytes + ts_sz);
Expand Down
1 change: 1 addition & 0 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ Version::~Version() {
assert(f->refs > 0);
f->refs--;
if (f->refs <= 0) {
// TODO: update filter cache (WaLSM+)
assert(cfd_ != nullptr);
uint32_t path_id = f->fd.GetPathId();
assert(path_id < cfd_->ioptions()->cf_paths.size());
Expand Down
5 changes: 5 additions & 0 deletions include/rocksdb/comparator.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#pragma once

#include <memory>
#include <string>

#include "rocksdb/rocksdb_namespace.h"
Expand Down Expand Up @@ -134,4 +135,8 @@ extern const Comparator* BytewiseComparator();
// ordering.
extern const Comparator* ReverseBytewiseComparator();

// Create a comparator that uses the given comparator to perform the comparison
// but ignoring the last 4 bytes of the given key. (WaLSM+)
extern std::unique_ptr<Comparator> SegmentIdRemovingComparator(const Comparator* real_comparator);

} // namespace ROCKSDB_NAMESPACE
26 changes: 26 additions & 0 deletions include/rocksdb/filter_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

#include <stdlib.h>

#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <memory>
#include <stdexcept>
#include <string>
Expand All @@ -38,6 +41,7 @@ struct ConfigOptions;
// A class that takes a bunch of keys, then generates filter
class FilterBitsBuilder {
public:
int filter_count_{1};
virtual ~FilterBitsBuilder() {}

// Add Key to filter, you could use any way to store the key.
Expand All @@ -50,6 +54,16 @@ class FilterBitsBuilder {
// The ownership of actual data is set to buf
virtual Slice Finish(std::unique_ptr<const char[]>* buf) = 0;

// Variant of Finish() that generates the filter from the added keys using the
// hash function selected by `filter_id`. The return value is the filter bits;
// ownership of the underlying data is transferred to *buf. (WaLSM+)
// NOTE(review): this base implementation is a deliberate hard failure —
// builders that support multiple hash functions (e.g.
// MultiLegacyBloomBitsBuilder) must override it; reaching it here aborts the
// process.
virtual Slice Finish(std::unique_ptr<const char[]>* buf, const int /* filter_id */) {
buf->reset();
fprintf(stderr, "error call FilterBitsBuilder::Finish(buf, filter_id)\n");
exit(1);
return Slice();
}

// Calculate num of keys that can be added and generate a filter
// <= the specified number of bytes.
#if defined(_MSC_VER)
Expand Down Expand Up @@ -84,6 +98,18 @@ class FilterBitsReader {
may_match[i] = MayMatch(*keys[i]);
}
}
// Checks whether `entry` may match the filter bits, probing with the hash
// function selected by `hash_id`. (WaLSM+)
// This base implementation is a deliberate hard failure: readers that support
// multiple hash functions must override it; reaching it indicates a wiring
// bug elsewhere.
virtual bool MayMatch(const Slice& /* entry */, const int /* hash_id */) {
  // Terminate the message with '\n' (the Finish() default above does too);
  // the original string left the stderr line unterminated.
  fprintf(stderr, "Error call FilterBitsReader::MayMatch(entry, hash_id)\n");
  exit(1);
  return true;  // unreachable; keeps the signature well-formed
}

// Batch form: checks whether each of the `num_keys` entries may match the
// filter bits, using the hash function selected by `hash_id`. (WaLSM+)
// This base implementation is a deliberate hard failure: readers that support
// multiple hash functions must override it.
virtual void MayMatch(int /* num_keys */, Slice** /* keys */, bool* /* may_match */, const int /* hash_id */) {
  // '\n' added so the diagnostic is not glued to subsequent stderr output.
  fprintf(stderr, "Error call FilterBitsReader::MayMatch(num_keys, keys, may_match, hash_id)\n");
  exit(1);
}
};

// Contextual information passed to BloomFilterPolicy at filter building time.
Expand Down
4 changes: 4 additions & 0 deletions table/block_based/block_based_table_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@

#pragma once

#include <memory>
#include "db/range_tombstone_fragmenter.h"
#include "file/filename.h"
#include "rocksdb/comparator.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/block_based/block_type.h"
#include "table/block_based/cachable_entry.h"
Expand Down Expand Up @@ -527,6 +529,7 @@ struct BlockBasedTable::Rep {
table_options(_table_opt),
filter_policy(skip_filters ? nullptr : _table_opt.filter_policy.get()),
internal_comparator(_internal_comparator),
segment_id_removing_comparator(SegmentIdRemovingComparator(_internal_comparator.user_comparator())),
filter_type(FilterType::kNoFilter),
index_type(BlockBasedTableOptions::IndexType::kBinarySearch),
hash_index_allow_collision(false),
Expand All @@ -542,6 +545,7 @@ struct BlockBasedTable::Rep {
const BlockBasedTableOptions table_options;
const FilterPolicy* const filter_policy;
const InternalKeyComparator& internal_comparator;
const std::unique_ptr<Comparator> segment_id_removing_comparator;
Status status;
std::unique_ptr<RandomAccessFileReader> file;
char cache_key_prefix[kMaxCacheKeyPrefixSize];
Expand Down
121 changes: 113 additions & 8 deletions table/block_based/filter_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <array>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <vector>

#include "rocksdb/filter_policy.h"

Expand Down Expand Up @@ -354,13 +358,13 @@ class LegacyBloomBitsBuilder : public BuiltinFilterBitsBuilder {
void operator=(const LegacyBloomBitsBuilder&) = delete;

~LegacyBloomBitsBuilder() override;

// hash to one value and push into hash_entries_
// noticed that Hash use double hashing, we only need one hash value h
// then use double hashing
void AddKey(const Slice& key) override;

// already collect hash values, just write to filter,
// already collect hash values, just write to filter,
// return slice(real filter bits + num_probes(1 bit) + num_lines(4 bits))
Slice Finish(std::unique_ptr<const char[]>* buf) override;

Expand All @@ -380,6 +384,8 @@ class LegacyBloomBitsBuilder : public BuiltinFilterBitsBuilder {
num_probes_);
}

int hash_id_;

private:
int bits_per_key_;
int num_probes_;
Expand All @@ -406,7 +412,8 @@ class LegacyBloomBitsBuilder : public BuiltinFilterBitsBuilder {

LegacyBloomBitsBuilder::LegacyBloomBitsBuilder(const int bits_per_key,
Logger* info_log)
: bits_per_key_(bits_per_key),
: hash_id_(0),
bits_per_key_(bits_per_key),
num_probes_(LegacyNoLocalityBloomImpl::ChooseNumProbes(bits_per_key_)),
info_log_(info_log) {
assert(bits_per_key_);
Expand All @@ -415,7 +422,7 @@ LegacyBloomBitsBuilder::LegacyBloomBitsBuilder(const int bits_per_key,
LegacyBloomBitsBuilder::~LegacyBloomBitsBuilder() {}

void LegacyBloomBitsBuilder::AddKey(const Slice& key) {
uint32_t hash = BloomHash(key);
uint32_t hash = BloomHashId(key, hash_id_);
if (hash_entries_.size() == 0 || hash != hash_entries_.back()) {
hash_entries_.push_back(hash);
}
Expand Down Expand Up @@ -538,6 +545,67 @@ inline void LegacyBloomBitsBuilder::AddHash(uint32_t h, char* data,
folly::constexpr_log2(CACHE_LINE_SIZE));
}

class MultiLegacyBloomBitsBuilder : public FilterBitsBuilder {
public:
explicit MultiLegacyBloomBitsBuilder(const size_t filter_count,
const int bits_per_key,
Logger* info_log);
~MultiLegacyBloomBitsBuilder();

// No copy allowed
MultiLegacyBloomBitsBuilder(const MultiLegacyBloomBitsBuilder&) = delete;
void operator=(const MultiLegacyBloomBitsBuilder&) = delete;

virtual void AddKey(const Slice& key) override;
virtual Slice Finish(std::unique_ptr<const char[]>* buf) override;
virtual Slice Finish(std::unique_ptr<const char[]>* buf,
const int hash_id) override;

private:
std::vector<LegacyBloomBitsBuilder*> bits_builders_;

void AddHash(uint32_t h, char* data, uint32_t num_lines, uint32_t total_bits);
};

MultiLegacyBloomBitsBuilder::MultiLegacyBloomBitsBuilder(
const size_t filter_count, const int bits_per_key, Logger* info_log) {
filter_count_ = filter_count;
bits_builders_.reserve(filter_count);

for (size_t i = 0; i < filter_count; ++i) {
// TODO determine num_probes
LegacyBloomBitsBuilder* bits_builder =
new LegacyBloomBitsBuilder(bits_per_key, info_log);
bits_builder->hash_id_ = i;
bits_builders_.push_back(bits_builder);
}
}

MultiLegacyBloomBitsBuilder::~MultiLegacyBloomBitsBuilder() {
for (size_t i = 0; i < bits_builders_.size(); ++i) {
delete bits_builders_[i];
bits_builders_[i] = nullptr;
}
}

void MultiLegacyBloomBitsBuilder::AddKey(const Slice& key) {
for (size_t i = 0; i < bits_builders_.size(); ++i) {
bits_builders_[i]->AddKey(key);
}
}

Slice MultiLegacyBloomBitsBuilder::Finish(std::unique_ptr<const char[]>* buf) {
buf->reset();
fprintf(stderr, "error call MultiLegacyBloomBitsBuilder::Finish(buf)\n");
exit(1);
return Slice();
}

Slice MultiLegacyBloomBitsBuilder::Finish(std::unique_ptr<const char[]>* buf,
int hash_id) {
return bits_builders_[hash_id]->Finish(buf);
}

class LegacyBloomBitsReader : public FilterBitsReader {
public:
// init func
Expand Down Expand Up @@ -586,6 +654,38 @@ class LegacyBloomBitsReader : public FilterBitsReader {
}
}

// Probes the filter for `key` using hash function `hash_id`. "contents" holds
// the bits produced by a preceding FilterBitsBuilder::Finish. Must return
// true for every key that was added; may return true for absent keys (false
// positive) but aims to return false with high probability. (WaLSM+)
bool MayMatch(const Slice& key, const int hash_id) override {
  const uint32_t h = BloomHashId(key, hash_id);
  uint32_t line_offset = 0;
  LegacyBloomImpl::PrepareHashMayMatch(h, num_lines_, data_,
                                       /*out*/ &line_offset,
                                       log2_cache_line_size_);
  const char* const line = data_ + line_offset;
  return LegacyBloomImpl::HashMayMatchPrepared(h, num_probes_, line,
                                               log2_cache_line_size_);
}

// Batch probe: checks each of keys[0..num_keys) against the filter using hash
// function `hash_id`, writing one verdict per key into may_match. Keeps the
// same two-pass shape as the hash-id-less overload: first hash every key and
// resolve its byte offset, then perform all the probes. (WaLSM+)
virtual void MayMatch(int num_keys, Slice** keys, bool* may_match,
                      const int hash_id) override {
  std::array<uint32_t, MultiGetContext::MAX_BATCH_SIZE> hash_vals;
  std::array<uint32_t, MultiGetContext::MAX_BATCH_SIZE> offsets;
  // Pass 1: hash each key and locate its filter region.
  for (int k = 0; k < num_keys; ++k) {
    hash_vals[k] = BloomHashId(*keys[k], hash_id);
    LegacyBloomImpl::PrepareHashMayMatch(hash_vals[k], num_lines_, data_,
                                         /*out*/ &offsets[k],
                                         log2_cache_line_size_);
  }
  // Pass 2: probe the prepared regions.
  for (int k = 0; k < num_keys; ++k) {
    may_match[k] = LegacyBloomImpl::HashMayMatchPrepared(
        hash_vals[k], num_probes_, data_ + offsets[k],
        log2_cache_line_size_);
  }
}

private:
const char* data_;
const int num_probes_;
Expand Down Expand Up @@ -618,7 +718,7 @@ const std::vector<BloomFilterPolicy::Mode> BloomFilterPolicy::kAllUserModes = {
kAuto,
};

// init BloomFilterPolicy, only used for old Block Filter Format,
// init BloomFilterPolicy, only used for old Block Filter Format,
// BloomFilterPolicy not used in our work -- WaLSM and WaLSM+
BloomFilterPolicy::BloomFilterPolicy(double bits_per_key, Mode mode)
: mode_(mode), warned_(false), aggregate_rounding_balance_(0) {
Expand Down Expand Up @@ -754,15 +854,20 @@ FilterBitsBuilder* BloomFilterPolicy::GetBuilderWithContext(
"with format_version>=5.",
whole_bits_per_key_, adjective);
}
return new LegacyBloomBitsBuilder(whole_bits_per_key_,
context.info_log);
// return new LegacyBloomBitsBuilder(whole_bits_per_key_,
// context.info_log);

// TODO: determine filter_count,
// and maybe move this property to some kind of options (WaLSM+)
const int filter_count = 10;
new MultiLegacyBloomBitsBuilder(filter_count, whole_bits_per_key_, context.info_log);
}
}
assert(false);
return nullptr; // something legal
}

// only return FilterBuilder,
// only return FilterBuilder,
// return LegacyBloomBitsBuilder in our work WaLSM and WaLSM+
FilterBitsBuilder* BloomFilterPolicy::GetBuilderFromContext(
const FilterBuildingContext& context) {
Expand Down
20 changes: 11 additions & 9 deletions table/block_based/full_filter_block.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ Slice FullFilterBlockBuilder::Finish(const BlockHandle& /*tmp*/,

FullFilterBlockReader::FullFilterBlockReader(
const BlockBasedTable* t,
CachableEntry<ParsedFullFilterBlock>&& filter_block)
: FilterBlockReaderCommon(t, std::move(filter_block)) {
CachableEntry<ParsedFullFilterBlock>&& filter_block,
const int hash_id)
: FilterBlockReaderCommon(t, std::move(filter_block)),
hash_id_(hash_id) {
const SliceTransform* const prefix_extractor = table_prefix_extractor();
if (prefix_extractor) {
full_length_enabled_ =
Expand All @@ -115,7 +117,7 @@ bool FullFilterBlockReader::KeyMayMatch(
if (!whole_key_filtering()) {
return true;
}
return MayMatch(key, no_io, get_context, lookup_context);
return MayMatch(key, no_io, get_context, lookup_context, hash_id_);
}

std::unique_ptr<FilterBlockReader> FullFilterBlockReader::Create(
Expand Down Expand Up @@ -154,12 +156,12 @@ bool FullFilterBlockReader::PrefixMayMatch(
(void)block_offset;
#endif
assert(block_offset == kNotValid);
return MayMatch(prefix, no_io, get_context, lookup_context);
return MayMatch(prefix, no_io, get_context, lookup_context, hash_id_);
}

bool FullFilterBlockReader::MayMatch(
const Slice& entry, bool no_io, GetContext* get_context,
BlockCacheLookupContext* lookup_context) const {
BlockCacheLookupContext* lookup_context, const int hash_id) const {
CachableEntry<ParsedFullFilterBlock> filter_block;

const Status s =
Expand All @@ -175,7 +177,7 @@ bool FullFilterBlockReader::MayMatch(
filter_block.GetValue()->filter_bits_reader();

if (filter_bits_reader) {
if (filter_bits_reader->MayMatch(entry)) {
if (filter_bits_reader->MayMatch(entry, hash_id)) {
PERF_COUNTER_ADD(bloom_sst_hit_count, 1);
return true;
} else {
Expand All @@ -199,7 +201,7 @@ void FullFilterBlockReader::KeysMayMatch(
// present
return;
}
MayMatch(range, no_io, nullptr, lookup_context);
MayMatch(range, no_io, nullptr, lookup_context, hash_id_);
}

void FullFilterBlockReader::PrefixesMayMatch(
Expand All @@ -215,7 +217,7 @@ void FullFilterBlockReader::PrefixesMayMatch(

void FullFilterBlockReader::MayMatch(
MultiGetRange* range, bool no_io, const SliceTransform* prefix_extractor,
BlockCacheLookupContext* lookup_context) const {
BlockCacheLookupContext* lookup_context, const int hash_id) const {
CachableEntry<ParsedFullFilterBlock> filter_block;

const Status s = GetOrReadFilterBlock(no_io, range->begin()->get_context,
Expand Down Expand Up @@ -254,7 +256,7 @@ void FullFilterBlockReader::MayMatch(
}
}

filter_bits_reader->MayMatch(num_keys, &keys[0], &may_match[0]);
filter_bits_reader->MayMatch(num_keys, &keys[0], &may_match[0], hash_id);

int i = 0;
for (auto iter = filter_range.begin(); iter != filter_range.end(); ++iter) {
Expand Down
Loading