Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- Implement the FNV (Fowler–Noll–Vo) hashing algorithm in the project and drop dependency on the `fnv-hash` gem. [PR #14](https://github.com/riverqueue/riverqueue-ruby/pull/14).

## [0.3.0] - 2024-04-27

### Added
Expand Down
2 changes: 0 additions & 2 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ PATH
remote: .
specs:
riverqueue (0.3.0)
fnv-hash

GEM
remote: https://rubygems.org/
Expand Down Expand Up @@ -32,7 +31,6 @@ GEM
drb (2.2.1)
ffi (1.16.3)
fileutils (1.7.2)
fnv-hash (0.2.0)
i18n (1.14.4)
concurrent-ruby (~> 1.0)
io-console (0.7.2)
Expand Down
4 changes: 2 additions & 2 deletions lib/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,11 @@ def insert_many(args)

@driver.transaction do
lock_key = if @advisory_lock_prefix.nil?
Fnv::Hash.fnv_1(lock_str, size: 64)
FNV.fnv1_hash(lock_str, size: 64)
else
# Steep should be able to tell that this is not nil, but it can't.
prefix = @advisory_lock_prefix #: Integer # rubocop:disable Layout/LeadingCommentSpace
prefix << 32 | Fnv::Hash.fnv_1(lock_str, size: 32)
prefix << 32 | FNV.fnv1_hash(lock_str, size: 32)
end

# Packs a uint64 then unpacks to int64, which we need to do to keep the
Expand Down
41 changes: 41 additions & 0 deletions lib/fnv.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
module River
# FNV is the Fowler–Noll–Vo hash function, a simple hash that's very easy to
# implement, and hash the perfect characteristics for use with the 64 bits of
# available space in a PG advisory lock.
#
# I'm implemented it myself so that the River gem can stay dependency free
# (and because it's quite easy to do).
module FNV
def self.fnv1_hash(str, size:)
hash = OFFSET_BASIS.fetch(size)
mask = MASK.fetch(size)
prime = PRIME.fetch(size)

str.each_byte do |byte|
hash *= prime
hash &= mask # take lower N bits of multiplication product
hash ^= byte
end

hash
end

MASK = {
32 => 0xffffffff, # mask 32 bits long
64 => 0xffffffffffffffff # mask 64 bits long
}.freeze
private_constant :MASK

OFFSET_BASIS = {
32 => 0x811c9dc5,
64 => 0xcbf29ce484222325
}.freeze
private_constant :OFFSET_BASIS

PRIME = {
32 => 0x01000193,
64 => 0x00000100000001B3
}.freeze
private_constant :PRIME
end
end
1 change: 1 addition & 0 deletions lib/riverqueue.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require "json"

require_relative "fnv"
require_relative "insert_opts"
require_relative "job"

Expand Down
2 changes: 0 additions & 2 deletions riverqueue.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,4 @@ Gem::Specification.new do |s|
"rubygems_mfa_required" => "true",
"source_code_uri" => "https://github.com/riverqueue/riverqueue-ruby"
}

s.add_dependency "fnv-hash"
end
9 changes: 9 additions & 0 deletions sig/fnv.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module River
module FNV
def self.fnv1_hash: (String, size: 32 | 64) -> Integer

MASK: Hash[Integer, Integer]
OFFSET_BASIS: Hash[Integer, Integer]
PRIME: Hash[Integer, Integer]
end
end
8 changes: 4 additions & 4 deletions spec/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def check_bigint_bounds(int)
lock_str = "unique_keykind=#{job_args.kind}" \
"&queue=#{River::QUEUE_DEFAULT}" \
"&state=#{River::Client.const_get(:DEFAULT_UNIQUE_STATES).join(",")}"
expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, Fnv::Hash.fnv_1(lock_str, size: 64)))])
expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, River::FNV.fnv1_hash(lock_str, size: 64)))])
end

it "inserts a new unique job with all options" do
Expand All @@ -245,7 +245,7 @@ def check_bigint_bounds(int)
"&period=#{client.send(:truncate_time, now, 15 * 60).utc.strftime("%FT%TZ")}" \
"&queue=#{River::QUEUE_DEFAULT}" \
"&state=#{[River::JOB_STATE_AVAILABLE].join(",")}"
expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, Fnv::Hash.fnv_1(lock_str, size: 64)))])
expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, River::FNV.fnv1_hash(lock_str, size: 64)))])
end

it "inserts a new unique job with advisory lock prefix" do
Expand All @@ -265,7 +265,7 @@ def check_bigint_bounds(int)
lock_str = "unique_keykind=#{job_args.kind}" \
"&queue=#{River::QUEUE_DEFAULT}" \
"&state=#{River::Client.const_get(:DEFAULT_UNIQUE_STATES).join(",")}"
expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, 123456 << 32 | Fnv::Hash.fnv_1(lock_str, size: 32)))])
expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, 123456 << 32 | River::FNV.fnv1_hash(lock_str, size: 32)))])

lock_key = mock_driver.advisory_lock_calls[0]
expect(lock_key >> 32).to eq(123456)
Expand Down Expand Up @@ -300,7 +300,7 @@ def job_args_to_row(job_args, insert_opts: River::InsertOpts.new)
"&period=#{client.send(:truncate_time, now, 15 * 60).utc.strftime("%FT%TZ")}" \
"&queue=#{River::QUEUE_DEFAULT}" \
"&state=#{[River::JOB_STATE_AVAILABLE].join(",")}"
expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, Fnv::Hash.fnv_1(lock_str, size: 64)))])
expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, River::FNV.fnv1_hash(lock_str, size: 64)))])
end

it "skips unique check if unique opts empty" do
Expand Down
Loading