From f4611f0b1aea0cabc87c177f776fcf617632c3a5 Mon Sep 17 00:00:00 2001 From: dayuanx <602457300@qq.com> Date: Thu, 25 Jul 2024 02:33:00 +0800 Subject: [PATCH] add basic filter unit support --- db/column_family.cc | 1 + db/dbformat.h | 6 + db/version_set.cc | 1 + include/rocksdb/comparator.h | 5 + include/rocksdb/filter_policy.h | 26 ++++ table/block_based/block_based_table_reader.h | 4 + table/block_based/filter_policy.cc | 121 ++++++++++++++++-- table/block_based/full_filter_block.cc | 20 +-- table/block_based/full_filter_block.h | 9 +- table/block_based/partitioned_filter_block.cc | 109 +++++++++++++--- table/block_based/partitioned_filter_block.h | 14 +- util/comparator.cc | 56 ++++++++ 12 files changed, 335 insertions(+), 37 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 0385aac4a..d9344f4bb 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -1255,6 +1255,7 @@ void ColumnFamilyData::InstallSuperVersion( return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_); } +// TODO: update filter cache (WaLSM+) void ColumnFamilyData::InstallSuperVersion( SuperVersionContext* sv_context, InstrumentedMutex* db_mutex, const MutableCFOptions& mutable_cf_options) { diff --git a/db/dbformat.h b/db/dbformat.h index 81c852ac4..7545ff261 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -174,6 +174,12 @@ inline Slice ExtractUserKey(const Slice& internal_key) { return Slice(internal_key.data(), internal_key.size() - kNumInternalBytes); } +// Returns the internal bytes portion of an internal key. 
(WaLSM+) +inline Slice ExtractInternalBytes(const Slice& internal_key) { + assert(internal_key.size() >= kNumInternalBytes); + return Slice(internal_key.data() + internal_key.size() - kNumInternalBytes, kNumInternalBytes); +} + inline Slice ExtractUserKeyAndStripTimestamp(const Slice& internal_key, size_t ts_sz) { assert(internal_key.size() >= kNumInternalBytes + ts_sz); diff --git a/db/version_set.cc b/db/version_set.cc index 710f21867..19c8abb63 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -645,6 +645,7 @@ Version::~Version() { assert(f->refs > 0); f->refs--; if (f->refs <= 0) { + // TODO: update filter cache (WaLSM+) assert(cfd_ != nullptr); uint32_t path_id = f->fd.GetPathId(); assert(path_id < cfd_->ioptions()->cf_paths.size()); diff --git a/include/rocksdb/comparator.h b/include/rocksdb/comparator.h index 53a46ad33..f0c7d5e86 100644 --- a/include/rocksdb/comparator.h +++ b/include/rocksdb/comparator.h @@ -8,6 +8,7 @@ #pragma once +#include #include #include "rocksdb/rocksdb_namespace.h" @@ -134,4 +135,8 @@ extern const Comparator* BytewiseComparator(); // ordering. extern const Comparator* ReverseBytewiseComparator(); +// Create a comparator that uses the given comparator to perform the comparison +// but ignoring the last 4 bytes of the given key. (WaLSM+) +extern std::unique_ptr SegmentIdRemovingComparator(const Comparator* real_comparator); + } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/filter_policy.h b/include/rocksdb/filter_policy.h index 3cd85a226..b634f8a6c 100644 --- a/include/rocksdb/filter_policy.h +++ b/include/rocksdb/filter_policy.h @@ -21,6 +21,9 @@ #include +#include +#include +#include #include #include #include @@ -38,6 +41,7 @@ struct ConfigOptions; // A class that takes a bunch of keys, then generates filter class FilterBitsBuilder { public: + int filter_count_{1}; virtual ~FilterBitsBuilder() {} // Add Key to filter, you could use any way to store the key. 
@@ -50,6 +54,16 @@ class FilterBitsBuilder { // The ownership of actual data is set to buf virtual Slice Finish(std::unique_ptr* buf) = 0; + // Generate the filter using the keys that are added, and the specified hash + // function id. The return value of this function would be the filter bits, The + // ownership of actual data is set to buf + virtual Slice Finish(std::unique_ptr* buf, const int /* filter_id */) { + buf->reset(); + fprintf(stderr, "error call FilterBitsBuilder::Finish(buf, filter_id)\n"); + exit(1); + return Slice(); + } + // Calculate num of keys that can be added and generate a filter // <= the specified number of bytes. #if defined(_MSC_VER) @@ -84,6 +98,18 @@ class FilterBitsReader { may_match[i] = MayMatch(*keys[i]); } } + // Check if the entry match the bits in filter using the specified hash function (WaLSM+) + virtual bool MayMatch(const Slice& /* entry */, const int /* hash_id */) { + fprintf(stderr, "Error call FilterBitsReader::MayMatch(entry, hash_id)"); + exit(1); + return true; + } + + // Check if an array of entries match the bits in filter using the specified hash function (WaLSM+) + virtual void MayMatch(int /* num_keys */, Slice** /* keys */, bool* /* may_match */, const int /* hash_id */) { + fprintf(stderr, "Error call FilterBitsReader::MayMatch(num_keys, keys, may_match, hash_id)"); + exit(1); + } }; // Contextual information passed to BloomFilterPolicy at filter building time. 
diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index 6dfe2c5cb..e1e9d4300 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -9,8 +9,10 @@ #pragma once +#include #include "db/range_tombstone_fragmenter.h" #include "file/filename.h" +#include "rocksdb/comparator.h" #include "table/block_based/block_based_table_factory.h" #include "table/block_based/block_type.h" #include "table/block_based/cachable_entry.h" @@ -527,6 +529,7 @@ struct BlockBasedTable::Rep { table_options(_table_opt), filter_policy(skip_filters ? nullptr : _table_opt.filter_policy.get()), internal_comparator(_internal_comparator), + segment_id_removing_comparator(SegmentIdRemovingComparator(_internal_comparator.user_comparator())), filter_type(FilterType::kNoFilter), index_type(BlockBasedTableOptions::IndexType::kBinarySearch), hash_index_allow_collision(false), @@ -542,6 +545,7 @@ struct BlockBasedTable::Rep { const BlockBasedTableOptions table_options; const FilterPolicy* const filter_policy; const InternalKeyComparator& internal_comparator; + const std::unique_ptr segment_id_removing_comparator; Status status; std::unique_ptr file; char cache_key_prefix[kMaxCacheKeyPrefixSize]; diff --git a/table/block_based/filter_policy.cc b/table/block_based/filter_policy.cc index 7a3c4fad2..c7112ab68 100644 --- a/table/block_based/filter_policy.cc +++ b/table/block_based/filter_policy.cc @@ -8,7 +8,11 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. 
#include +#include +#include #include +#include +#include #include "rocksdb/filter_policy.h" @@ -354,13 +358,13 @@ class LegacyBloomBitsBuilder : public BuiltinFilterBitsBuilder { void operator=(const LegacyBloomBitsBuilder&) = delete; ~LegacyBloomBitsBuilder() override; - + // hash to one value and push into hash_entries_ // noticed that Hash use double hashing, we only need one hash value h // then use double hashing void AddKey(const Slice& key) override; - // already collect hash values, just write to filter, + // already collect hash values, just write to filter, // return slice(real filter bits + num_probes(1 bit) + num_lines(4 bits)) Slice Finish(std::unique_ptr* buf) override; @@ -380,6 +384,8 @@ class LegacyBloomBitsBuilder : public BuiltinFilterBitsBuilder { num_probes_); } + int hash_id_; + private: int bits_per_key_; int num_probes_; @@ -406,7 +412,8 @@ class LegacyBloomBitsBuilder : public BuiltinFilterBitsBuilder { LegacyBloomBitsBuilder::LegacyBloomBitsBuilder(const int bits_per_key, Logger* info_log) - : bits_per_key_(bits_per_key), + : hash_id_(0), + bits_per_key_(bits_per_key), num_probes_(LegacyNoLocalityBloomImpl::ChooseNumProbes(bits_per_key_)), info_log_(info_log) { assert(bits_per_key_); @@ -415,7 +422,7 @@ LegacyBloomBitsBuilder::LegacyBloomBitsBuilder(const int bits_per_key, LegacyBloomBitsBuilder::~LegacyBloomBitsBuilder() {} void LegacyBloomBitsBuilder::AddKey(const Slice& key) { - uint32_t hash = BloomHash(key); + uint32_t hash = BloomHashId(key, hash_id_); if (hash_entries_.size() == 0 || hash != hash_entries_.back()) { hash_entries_.push_back(hash); } @@ -538,6 +545,67 @@ inline void LegacyBloomBitsBuilder::AddHash(uint32_t h, char* data, folly::constexpr_log2(CACHE_LINE_SIZE)); } +class MultiLegacyBloomBitsBuilder : public FilterBitsBuilder { + public: + explicit MultiLegacyBloomBitsBuilder(const size_t filter_count, + const int bits_per_key, + Logger* info_log); + ~MultiLegacyBloomBitsBuilder(); + + // No copy allowed + 
MultiLegacyBloomBitsBuilder(const MultiLegacyBloomBitsBuilder&) = delete; + void operator=(const MultiLegacyBloomBitsBuilder&) = delete; + + virtual void AddKey(const Slice& key) override; + virtual Slice Finish(std::unique_ptr* buf) override; + virtual Slice Finish(std::unique_ptr* buf, + const int hash_id) override; + + private: + std::vector bits_builders_; + + void AddHash(uint32_t h, char* data, uint32_t num_lines, uint32_t total_bits); +}; + +MultiLegacyBloomBitsBuilder::MultiLegacyBloomBitsBuilder( + const size_t filter_count, const int bits_per_key, Logger* info_log) { + filter_count_ = filter_count; + bits_builders_.reserve(filter_count); + + for (size_t i = 0; i < filter_count; ++i) { + // TODO determine num_probes + LegacyBloomBitsBuilder* bits_builder = + new LegacyBloomBitsBuilder(bits_per_key, info_log); + bits_builder->hash_id_ = i; + bits_builders_.push_back(bits_builder); + } +} + +MultiLegacyBloomBitsBuilder::~MultiLegacyBloomBitsBuilder() { + for (size_t i = 0; i < bits_builders_.size(); ++i) { + delete bits_builders_[i]; + bits_builders_[i] = nullptr; + } +} + +void MultiLegacyBloomBitsBuilder::AddKey(const Slice& key) { + for (size_t i = 0; i < bits_builders_.size(); ++i) { + bits_builders_[i]->AddKey(key); + } +} + +Slice MultiLegacyBloomBitsBuilder::Finish(std::unique_ptr* buf) { + buf->reset(); + fprintf(stderr, "error call MultiLegacyBloomBitsBuilder::Finish(buf)\n"); + exit(1); + return Slice(); +} + +Slice MultiLegacyBloomBitsBuilder::Finish(std::unique_ptr* buf, + int hash_id) { + return bits_builders_[hash_id]->Finish(buf); +} + class LegacyBloomBitsReader : public FilterBitsReader { public: // init func @@ -586,6 +654,38 @@ class LegacyBloomBitsReader : public FilterBitsReader { } } + // check whether key is in filter array + // "contents" contains the data built by a preceding call to + // FilterBitsBuilder::Finish. MayMatch must return true if the key was + // passed to FilterBitsBuilder::AddKey. 
This method may return true or false + // if the key was not on the list, but it should aim to return false with a + // high probability. (WaLSM+) + bool MayMatch(const Slice& key, const int hash_id) override { + uint32_t hash = BloomHashId(key, hash_id); + uint32_t byte_offset; + LegacyBloomImpl::PrepareHashMayMatch( + hash, num_lines_, data_, /*out*/ &byte_offset, log2_cache_line_size_); + return LegacyBloomImpl::HashMayMatchPrepared( + hash, num_probes_, data_ + byte_offset, log2_cache_line_size_); + } + + // check whether keys is in filter array (WaLSM+) + virtual void MayMatch(int num_keys, Slice** keys, bool* may_match, const int hash_id) override { + std::array hashes; + std::array byte_offsets; + for (int i = 0; i < num_keys; ++i) { + hashes[i] = BloomHashId(*keys[i], hash_id); + LegacyBloomImpl::PrepareHashMayMatch(hashes[i], num_lines_, data_, + /*out*/ &byte_offsets[i], + log2_cache_line_size_); + } + for (int i = 0; i < num_keys; ++i) { + may_match[i] = LegacyBloomImpl::HashMayMatchPrepared( + hashes[i], num_probes_, data_ + byte_offsets[i], + log2_cache_line_size_); + } + } + private: const char* data_; const int num_probes_; @@ -618,7 +718,7 @@ const std::vector BloomFilterPolicy::kAllUserModes = { kAuto, }; -// init BloomFilterPolicy, only used for old Block Filter Format, +// init BloomFilterPolicy, only used for old Block Filter Format, // BloomFilterPolicy not used in our work -- WaLSM and WaLSM+ BloomFilterPolicy::BloomFilterPolicy(double bits_per_key, Mode mode) : mode_(mode), warned_(false), aggregate_rounding_balance_(0) { @@ -754,15 +854,20 @@ FilterBitsBuilder* BloomFilterPolicy::GetBuilderWithContext( "with format_version>=5.", whole_bits_per_key_, adjective); } - return new LegacyBloomBitsBuilder(whole_bits_per_key_, - context.info_log); + // return new LegacyBloomBitsBuilder(whole_bits_per_key_, + // context.info_log); + + // TODO: determine filter_count, + // and maybe move this property to some kind of options (WaLSM+) + const int 
filter_count = 10; + return new MultiLegacyBloomBitsBuilder(filter_count, whole_bits_per_key_, context.info_log); } } assert(false); return nullptr; // something legal } -// only return FilterBuilder, +// only return FilterBuilder, // return LegacyBloomBitsBuilder in our work WaLSM and WaLSM+ FilterBitsBuilder* BloomFilterPolicy::GetBuilderFromContext( const FilterBuildingContext& context) { diff --git a/table/block_based/full_filter_block.cc b/table/block_based/full_filter_block.cc index a104bec47..27ad293b0 100644 --- a/table/block_based/full_filter_block.cc +++ b/table/block_based/full_filter_block.cc @@ -94,8 +94,10 @@ Slice FullFilterBlockBuilder::Finish(const BlockHandle& /*tmp*/, FullFilterBlockReader::FullFilterBlockReader( const BlockBasedTable* t, - CachableEntry&& filter_block) - : FilterBlockReaderCommon(t, std::move(filter_block)) { + CachableEntry&& filter_block, + const int hash_id) + : FilterBlockReaderCommon(t, std::move(filter_block)), + hash_id_(hash_id) { const SliceTransform* const prefix_extractor = table_prefix_extractor(); if (prefix_extractor) { full_length_enabled_ = @@ -115,7 +117,7 @@ bool FullFilterBlockReader::KeyMayMatch( if (!whole_key_filtering()) { return true; } - return MayMatch(key, no_io, get_context, lookup_context); + return MayMatch(key, no_io, get_context, lookup_context, hash_id_); } std::unique_ptr FullFilterBlockReader::Create( @@ -154,12 +156,12 @@ bool FullFilterBlockReader::PrefixMayMatch( (void)block_offset; #endif assert(block_offset == kNotValid); - return MayMatch(prefix, no_io, get_context, lookup_context); + return MayMatch(prefix, no_io, get_context, lookup_context, hash_id_); } bool FullFilterBlockReader::MayMatch( const Slice& entry, bool no_io, GetContext* get_context, - BlockCacheLookupContext* lookup_context) const { + BlockCacheLookupContext* lookup_context, const int hash_id) const { CachableEntry filter_block; const Status s = @@ -175,7 +177,7 @@ bool FullFilterBlockReader::MayMatch( 
filter_block.GetValue()->filter_bits_reader(); if (filter_bits_reader) { - if (filter_bits_reader->MayMatch(entry)) { + if (filter_bits_reader->MayMatch(entry, hash_id)) { PERF_COUNTER_ADD(bloom_sst_hit_count, 1); return true; } else { @@ -199,7 +201,7 @@ void FullFilterBlockReader::KeysMayMatch( // present return; } - MayMatch(range, no_io, nullptr, lookup_context); + MayMatch(range, no_io, nullptr, lookup_context, hash_id_); } void FullFilterBlockReader::PrefixesMayMatch( @@ -215,7 +217,7 @@ void FullFilterBlockReader::PrefixesMayMatch( void FullFilterBlockReader::MayMatch( MultiGetRange* range, bool no_io, const SliceTransform* prefix_extractor, - BlockCacheLookupContext* lookup_context) const { + BlockCacheLookupContext* lookup_context, const int hash_id) const { CachableEntry filter_block; const Status s = GetOrReadFilterBlock(no_io, range->begin()->get_context, @@ -254,7 +256,7 @@ void FullFilterBlockReader::MayMatch( } } - filter_bits_reader->MayMatch(num_keys, &keys[0], &may_match[0]); + filter_bits_reader->MayMatch(num_keys, &keys[0], &may_match[0], hash_id); int i = 0; for (auto iter = filter_range.begin(); iter != filter_range.end(); ++iter) { diff --git a/table/block_based/full_filter_block.h b/table/block_based/full_filter_block.h index 109031f60..98ce22cc2 100644 --- a/table/block_based/full_filter_block.h +++ b/table/block_based/full_filter_block.h @@ -99,7 +99,8 @@ class FullFilterBlockReader // set prefix_extractor if needed // In our work, dont use prefix_extractor FullFilterBlockReader(const BlockBasedTable* t, - CachableEntry&& filter_block); + CachableEntry&& filter_block, + const int hash_id = 0); // call FullFilterBlockReader() to return std::unique_ptr static std::unique_ptr Create( const BlockBasedTable* table, const ReadOptions& ro, @@ -142,11 +143,12 @@ class FullFilterBlockReader private: // Get From Cache Or Read From SST, to get filter, then check whether entry hit bool MayMatch(const Slice& entry, bool no_io, GetContext* get_context, 
- BlockCacheLookupContext* lookup_context) const; + BlockCacheLookupContext* lookup_context, const int hash_id = 0) const; // range is the key range in the SST, check out these keys may fit in the filter void MayMatch(MultiGetRange* range, bool no_io, const SliceTransform* prefix_extractor, - BlockCacheLookupContext* lookup_context) const; + BlockCacheLookupContext* lookup_context, + const int hash_id = 0) const; // when disable prefix bloom, never call this method bool IsFilterCompatible(const Slice* iterate_upper_bound, const Slice& prefix, const Comparator* comparator) const; @@ -154,6 +156,7 @@ class FullFilterBlockReader private: bool full_length_enabled_; size_t prefix_extractor_full_length_; + const int hash_id_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/partitioned_filter_block.cc b/table/block_based/partitioned_filter_block.cc index dc25abbea..79fc8a057 100644 --- a/table/block_based/partitioned_filter_block.cc +++ b/table/block_based/partitioned_filter_block.cc @@ -5,18 +5,28 @@ #include "table/block_based/partitioned_filter_block.h" +#include +#include +#include #include +#include "db/dbformat.h" #include "file/file_util.h" #include "monitoring/perf_context_imp.h" #include "port/malloc.h" #include "port/port.h" +#include "rocksdb/comparator.h" #include "rocksdb/filter_policy.h" +#include "rocksdb/slice.h" +#include "rocksdb/status.h" #include "table/block_based/block.h" #include "table/block_based/block_based_table_reader.h" #include "util/coding.h" namespace ROCKSDB_NAMESPACE { +Slice generate_modified_internal_key(std::unique_ptr& buf, + Slice original_internal_key, + int filter_index, int segment_id); PartitionedFilterBlockBuilder::PartitionedFilterBlockBuilder( const SliceTransform* _prefix_extractor, bool whole_key_filtering, @@ -55,6 +65,11 @@ PartitionedFilterBlockBuilder::PartitionedFilterBlockBuilder( } } } + + filter_count_ = filter_bits_builder->filter_count_; + filter_gc.resize(filter_count_); + 
filters.resize(filter_count_); + finishing_filter_index_ = 0; } PartitionedFilterBlockBuilder::~PartitionedFilterBlockBuilder() {} @@ -70,7 +85,6 @@ void PartitionedFilterBlockBuilder::MaybeCutAFilterBlock( if (!p_index_builder_->ShouldCutFilterBlock()) { return; } - filter_gc.push_back(std::unique_ptr(nullptr)); // Add the prefix of the next key before finishing the partition. This hack, // fixes a bug with format_verison=3 where seeking for the prefix would lead @@ -81,9 +95,12 @@ void PartitionedFilterBlockBuilder::MaybeCutAFilterBlock( FullFilterBlockBuilder::AddPrefix(*next_key); } - Slice filter = filter_bits_builder_->Finish(&filter_gc.back()); - std::string& index_key = p_index_builder_->GetPartitionKey(); - filters.push_back({index_key, filter}); + for (int i = 0; i < filter_count_; ++i) { + filter_gc[i].push_back(std::unique_ptr(nullptr)); + Slice filter = filter_bits_builder_->Finish(&filter_gc[i].back(), i); + std::string& index_key = p_index_builder_->GetPartitionKey(); + filters[i].push_back({index_key, filter, segment_id_base_.fetch_add(1, std::memory_order_relaxed)}); + } keys_added_to_partition_ = 0; Reset(); } @@ -102,7 +119,7 @@ Slice PartitionedFilterBlockBuilder::Finish( const BlockHandle& last_partition_block_handle, Status* status) { if (finishing_filters == true) { // Record the handle of the last written filter block in the index - FilterEntry& last_entry = filters.front(); + FilterEntry& last_entry = filters[finishing_filter_index_].front(); std::string handle_encoding; last_partition_block_handle.EncodeTo(&handle_encoding); std::string handle_delta_encoding; @@ -111,20 +128,32 @@ Slice PartitionedFilterBlockBuilder::Finish( last_partition_block_handle.size() - last_encoded_handle_.size()); last_encoded_handle_ = last_partition_block_handle; const Slice handle_delta_encoding_slice(handle_delta_encoding); - index_on_filter_block_builder_.Add(last_entry.key, handle_encoding, + + std::unique_ptr modified_key_buf; + Slice modified_key = 
generate_modified_internal_key( + modified_key_buf, last_entry.key, finishing_filter_index_, + last_entry.segment_id); + index_on_filter_block_builder_.Add(modified_key, handle_encoding, &handle_delta_encoding_slice); if (!p_index_builder_->seperator_is_key_plus_seq()) { index_on_filter_block_builder_without_seq_.Add( - ExtractUserKey(last_entry.key), handle_encoding, + ExtractUserKey(modified_key), handle_encoding, &handle_delta_encoding_slice); } - filters.pop_front(); + filters[finishing_filter_index_].pop_front(); } else { MaybeCutAFilterBlock(nullptr); } // If there is no filter partition left, then return the index on filter // partitions - if (UNLIKELY(filters.empty())) { + if (UNLIKELY(filters[finishing_filter_index_].empty())) { + finishing_filter_index_++; + if (finishing_filter_index_ < filter_count_) { + *status = Status::Incomplete(); + finishing_filters = true; + return filters[finishing_filter_index_].front().filter; + } + *status = Status::OK(); if (finishing_filters) { if (p_index_builder_->seperator_is_key_plus_seq()) { @@ -141,7 +170,7 @@ Slice PartitionedFilterBlockBuilder::Finish( // indicate we expect more calls to Finish *status = Status::Incomplete(); finishing_filters = true; - return filters.front().filter; + return filters[finishing_filter_index_].front().filter; } } @@ -235,10 +264,12 @@ void PartitionedFilterBlockReader::PrefixesMayMatch( BlockHandle PartitionedFilterBlockReader::GetFilterPartitionHandle( const CachableEntry& filter_block, const Slice& entry) const { IndexBlockIter iter; - const InternalKeyComparator* const comparator = internal_comparator(); + // const InternalKeyComparator* const comparator = internal_comparator(); + const Comparator* const segment_id_removing_comparator = table()->get_rep()->segment_id_removing_comparator.get(); Statistics* kNullStats = nullptr; filter_block.GetValue()->NewIndexIterator( - comparator->user_comparator(), + // comparator->user_comparator(), + segment_id_removing_comparator, 
table()->get_rep()->get_global_seqno(BlockType::kFilter), &iter, kNullStats, true /* total_order_seek */, false /* have_first_key */, index_key_includes_seq(), index_value_is_full()); @@ -256,6 +287,7 @@ BlockHandle PartitionedFilterBlockReader::GetFilterPartitionHandle( return fltr_blk_handle; } +// TODO: retrieve filter block from filter cache (WaLSM+) Status PartitionedFilterBlockReader::GetFilterPartitionBlock( FilePrefetchBuffer* prefetch_buffer, const BlockHandle& fltr_blk_handle, bool no_io, GetContext* get_context, @@ -289,6 +321,7 @@ Status PartitionedFilterBlockReader::GetFilterPartitionBlock( return s; } +// TODO: retrieve filter block from filter cache (WaLSM+) bool PartitionedFilterBlockReader::MayMatch( const Slice& slice, const SliceTransform* prefix_extractor, uint64_t block_offset, bool no_io, const Slice* const_ikey_ptr, @@ -306,11 +339,18 @@ bool PartitionedFilterBlockReader::MayMatch( return true; } - auto filter_handle = GetFilterPartitionHandle(filter_block, *const_ikey_ptr); + // find key "0 original_internal key". filter_index=segment_id=0. (WaLSM+) + // segment_id itself is useless in comparison, + // but must be appended otherwise the extracted user key will be incorrect. 
+ std::unique_ptr modified_key_buf; + Slice modified_key = + generate_modified_internal_key(modified_key_buf, *const_ikey_ptr, 0, 0); + auto filter_handle = GetFilterPartitionHandle(filter_block, modified_key); if (UNLIKELY(filter_handle.size() == 0)) { // key is out of range return false; } + // TODO: get some filter blocks from the filter cache and check (WaLSM+) CachableEntry filter_partition_block; s = GetFilterPartitionBlock(nullptr /* prefetch_buffer */, filter_handle, no_io, get_context, lookup_context, @@ -322,11 +362,16 @@ bool PartitionedFilterBlockReader::MayMatch( FullFilterBlockReader filter_partition(table(), std::move(filter_partition_block)); + // initialize the reader with hash_id (WaLSM+) + // FullFilterBlockReader filter_partition(table(), + // std::move(filter_partition_block), + // 1); return (filter_partition.*filter_function)( slice, prefix_extractor, block_offset, no_io, const_ikey_ptr, get_context, lookup_context); } +// TODO: retrieve filter block from filter cache (WaLSM+) void PartitionedFilterBlockReader::MayMatch( MultiGetRange* range, const SliceTransform* prefix_extractor, uint64_t block_offset, bool no_io, BlockCacheLookupContext* lookup_context, @@ -350,9 +395,15 @@ void PartitionedFilterBlockReader::MayMatch( // share block cache lookup and use full filter multiget on the partition // filter. for (auto iter = start_iter_same_handle; iter != range->end(); ++iter) { + // find key "0 original_internal key". filter_index=segment_id=0. (WaLSM+) + // segment_id itself is useless in comparison, + // but must be appended otherwise the extracted user key will be incorrect. 
+ std::unique_ptr modified_key_buf; + Slice modified_key = + generate_modified_internal_key(modified_key_buf, iter->ikey, 0, 0); // TODO: re-use one top-level index iterator BlockHandle this_filter_handle = - GetFilterPartitionHandle(filter_block, iter->ikey); + GetFilterPartitionHandle(filter_block, modified_key); if (!prev_filter_handle.IsNull() && this_filter_handle != prev_filter_handle) { MultiGetRange subrange(*range, start_iter_same_handle, iter); @@ -380,6 +431,7 @@ void PartitionedFilterBlockReader::MayMatch( } } +// TODO: retrieve filter block from filter cache (WaLSM+) void PartitionedFilterBlockReader::MayMatchPartition( MultiGetRange* range, const SliceTransform* prefix_extractor, uint64_t block_offset, BlockHandle filter_handle, bool no_io, @@ -394,6 +446,10 @@ void PartitionedFilterBlockReader::MayMatchPartition( return; // Any/all may match } + // initialize the reader with hash_id (WaLSM+) + // FullFilterBlockReader filter_partition(table(), + // std::move(filter_partition_block), + // 1); FullFilterBlockReader filter_partition(table(), std::move(filter_partition_block)); (filter_partition.*filter_function)(range, prefix_extractor, block_offset, @@ -439,9 +495,10 @@ void PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro, IndexBlockIter biter; const InternalKeyComparator* const comparator = internal_comparator(); + const Comparator* const segment_id_removing_comparator = rep->segment_id_removing_comparator.get(); Statistics* kNullStats = nullptr; filter_block.GetValue()->NewIndexIterator( - comparator->user_comparator(), rep->get_global_seqno(BlockType::kFilter), + segment_id_removing_comparator, rep->get_global_seqno(BlockType::kFilter), &biter, kNullStats, true /* total_order_seek */, false /* have_first_key */, index_key_includes_seq(), index_value_is_full()); @@ -512,4 +569,26 @@ bool PartitionedFilterBlockReader::index_value_is_full() const { return table()->get_rep()->index_value_is_full; } +std::atomic 
PartitionedFilterBlockBuilder::segment_id_base_{0}; + +Slice generate_modified_internal_key(std::unique_ptr& buf, Slice original_internal_key, int filter_index, int segment_id) { + // calculate modified_key (WaLSM+) + // +--------------+------------------------------------+------------+-------------------------+ + // | filter_index | original_user_key | segment_id | original_internal_bytes | + // | 4 bytes | (key.size() - kInternalBytes) bytes| 4 bytes | kInternalBytes bytes | + // +--------------+------------------------------------+------------+-------------------------+ + size_t modified_key_buf_size = 4 + original_internal_key.size() + 4; + char *modified_key_buf = new char[modified_key_buf_size]; + Slice original_user_key = ExtractUserKey(original_internal_key); + Slice original_internal_bytes(original_internal_key.data() + original_user_key.size(), kNumInternalBytes); + EncodeFixed32R(modified_key_buf, filter_index); + std::memcpy(modified_key_buf + 4, original_user_key.data(), original_user_key.size()); + EncodeFixed32R(modified_key_buf + 4 + original_user_key.size(), segment_id); + std::memcpy(modified_key_buf + 4 + original_user_key.size() + 4, original_internal_bytes.data(), original_internal_bytes.size()); + Slice modified_key = Slice(modified_key_buf, modified_key_buf_size); + + buf.reset(modified_key_buf); + return modified_key; +} + } // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/partitioned_filter_block.h b/table/block_based/partitioned_filter_block.h index 2ccc8f8bc..7ba3973e1 100644 --- a/table/block_based/partitioned_filter_block.h +++ b/table/block_based/partitioned_filter_block.h @@ -5,9 +5,12 @@ #pragma once +#include +#include #include #include #include +#include #include "db/dbformat.h" #include "index_builder.h" #include "rocksdb/options.h" @@ -45,10 +48,11 @@ class PartitionedFilterBlockBuilder : public FullFilterBlockBuilder { struct FilterEntry { std::string key; Slice filter; + uint32_t segment_id; }; - std::list filters; // list of partitioned indexes and their 
keys + std::vector> filters; // list of partitioned indexes and their keys std::unique_ptr value; - std::vector> filter_gc; + std::vector>> filter_gc; bool finishing_filters = false; // true if Finish is called once but not complete yet. // The policy of when cut a filter block and Finish it @@ -63,6 +67,12 @@ class PartitionedFilterBlockBuilder : public FullFilterBlockBuilder { // The number of keys added to the last partition so far uint32_t keys_added_to_partition_; BlockHandle last_encoded_handle_; + + // The number of filter builders(hash functions) for each segment. (WaLSM+) + int filter_count_; + // When Finish() is called, return filters[filter_index].front() (WaLSM+) + int finishing_filter_index_; + static std::atomic segment_id_base_; }; class PartitionedFilterBlockReader : public FilterBlockReaderCommon { diff --git a/util/comparator.cc b/util/comparator.cc index 44d45732a..3d7565692 100644 --- a/util/comparator.cc +++ b/util/comparator.cc @@ -207,6 +207,56 @@ class ReverseBytewiseComparatorImpl : public BytewiseComparatorImpl { }; }// namespace +// Remove the last 4 bytes of the key to be compared +// before performing the comparison +// (WaLSM+) +class SegmentIdRemovingComparatorImpl : public Comparator { + public: + SegmentIdRemovingComparatorImpl(const Comparator* comparator) + : real_comparator(comparator) {} + + const char* Name() const override { + return "walsmplus.SegmentIdRemovingComparator"; + } + + int Compare(const Slice& a, const Slice& b) const override { + return real_comparator->Compare(Slice(a.data(), a.size() - 4), + Slice(b.data(), b.size() - 4)); + } + + bool Equal(const Slice& a, const Slice& b) const override { + return real_comparator->Equal(Slice(a.data(), a.size() - 4), + Slice(b.data(), b.size() - 4)); + } + + void FindShortestSeparator(std::string* start, + const Slice& limit) const override { + real_comparator->FindShortestSeparator(start, limit); + } + + void FindShortSuccessor(std::string* key) const override { + 
real_comparator->FindShortSuccessor(key); + } + + bool IsSameLengthImmediateSuccessor(const Slice& s, + const Slice& t) const override { + return real_comparator->IsSameLengthImmediateSuccessor( + Slice(s.data(), s.size() - 4), Slice(t.data(), t.size() - 4)); + } + + bool CanKeysWithDifferentByteContentsBeEqual() const override { + return real_comparator->CanKeysWithDifferentByteContentsBeEqual(); + } + + int CompareWithoutTimestamp(const Slice& a, bool a_has_ts, const Slice& b, + bool b_has_ts) const override { + return real_comparator->CompareWithoutTimestamp(Slice(a.data(), a.size() - 4), a_has_ts, Slice(b.data(), b.size() - 4), b_has_ts); + } + + private: + const Comparator* real_comparator; +}; + const Comparator* BytewiseComparator() { static BytewiseComparatorImpl bytewise; return &bytewise; @@ -217,4 +267,10 @@ const Comparator* ReverseBytewiseComparator() { return &rbytewise; } +std::unique_ptr SegmentIdRemovingComparator( + const Comparator* real_comparator) { + return std::unique_ptr( + new SegmentIdRemovingComparatorImpl(real_comparator)); +} + } // namespace ROCKSDB_NAMESPACE