diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index 02281105..3428cb20 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -93,7 +93,7 @@ def multi(watch: nil, &block) ::RedisClient::Cluster::Transaction.new(@router, @command_builder).execute(watch: watch, &block) end - def with(key: nil, hashtag: nil, write: true, retry_count: 0, &block) + def with(key: nil, hashtag: nil, write: true, retry_count: 0) key = process_with_arguments(key, hashtag) node_key = @router.find_node_key_by_key(key, primary: write) @@ -101,7 +101,11 @@ def with(key: nil, hashtag: nil, write: true, retry_count: 0, &block) # Calling #with checks out the underlying connection if this is a pooled connection # Calling it through #try_delegate ensures we handle any redirections and retry the entire # transaction if so. - @router.try_delegate(node, :with, retry_count: retry_count, &block) + @router.try_delegate(node, :with, retry_count: retry_count) do |conn| + conn.locked_to_key_slot(key) do + yield conn + end + end end def pubsub diff --git a/lib/redis_client/cluster/command.rb b/lib/redis_client/cluster/command.rb index dd7ca683..d284b6f3 100644 --- a/lib/redis_client/cluster/command.rb +++ b/lib/redis_client/cluster/command.rb @@ -22,47 +22,33 @@ class Command keyword_init: true ) - class << self - def load(nodes, slow_command_timeout: -1) - cmd = errors = nil - - nodes&.each do |node| - regular_timeout = node.read_timeout - node.read_timeout = slow_command_timeout > 0.0 ? slow_command_timeout : regular_timeout - reply = node.call('COMMAND') - node.read_timeout = regular_timeout - commands = parse_command_reply(reply) - cmd = ::RedisClient::Cluster::Command.new(commands) - break - rescue ::RedisClient::Error => e - errors ||= [] - errors << e - end - - return cmd unless cmd.nil? + def initialize + @commands = EMPTY_HASH + end - raise ::RedisClient::Cluster::InitialSetupError, errors + # n.b. this is not thread safe; it's called under a lock in Node#reload! though. + def load!(nodes, slow_command_timeout: -1) + commands = errors = nil + + nodes&.each do |node| + regular_timeout = node.read_timeout + node.read_timeout = slow_command_timeout > 0.0 ? slow_command_timeout : regular_timeout + reply = node.call('COMMAND') + node.read_timeout = regular_timeout + commands = parse_command_reply(reply) + break + rescue ::RedisClient::Error => e + errors ||= [] + errors << e end - private - - def parse_command_reply(rows) - rows&.each_with_object({}) do |row, acc| - next if row[0].nil? + raise ::RedisClient::Cluster::InitialSetupError, errors unless commands - acc[row[0].downcase] = ::RedisClient::Cluster::Command::Detail.new( - first_key_position: row[3], - last_key_position: row[4], - key_step: row[5], - write?: row[2].include?('write'), - readonly?: row[2].include?('readonly') - ) - end.freeze || EMPTY_HASH - end + @commands = commands end - def initialize(commands) - @commands = commands || EMPTY_HASH + def loaded? + @commands.any? end def extract_first_key(command) @@ -153,6 +139,20 @@ def determine_optional_key_position(command, option_name) # rubocop:disable Metr idx = command&.flatten&.map(&:to_s)&.map(&:downcase)&.index(option_name&.downcase) idx.nil? ? 0 : idx + 1 end + + def parse_command_reply(rows) + rows&.each_with_object({}) do |row, acc| + next if row[0].nil? + + acc[row[0].downcase] = ::RedisClient::Cluster::Command::Detail.new( + first_key_position: row[3], + last_key_position: row[4], + key_step: row[5], + write?: row[2].include?('write'), + readonly?: row[2].include?('readonly') + ) + end.freeze || EMPTY_HASH + end end end end diff --git a/lib/redis_client/cluster/node.rb b/lib/redis_client/cluster/node.rb index 66ff3936..4fac9757 100644 --- a/lib/redis_client/cluster/node.rb +++ b/lib/redis_client/cluster/node.rb @@ -8,6 +8,7 @@ require 'redis_client/cluster/node/random_replica' require 'redis_client/cluster/node/random_replica_or_primary' require 'redis_client/cluster/node/latency_replica' +require 'redis_client/cluster/pinning' class RedisClient class Cluster @@ -78,14 +79,25 @@ def []=(index, element) end end + class SingleNodeRedisClient < ::RedisClient + include Pinning::ClientMixin + end + class Config < ::RedisClient::Config - def initialize(scale_read: false, middlewares: nil, **kwargs) + def initialize(cluster_commands:, scale_read: false, middlewares: nil, **kwargs) @scale_read = scale_read + @cluster_commands = cluster_commands middlewares ||= [] + middlewares.unshift Pinning::ClientMiddleware middlewares.unshift ErrorIdentification::Middleware - super(middlewares: middlewares, **kwargs) + super( + middlewares: middlewares, + client_implementation: SingleNodeRedisClient, + **kwargs) end + attr_reader :cluster_commands + private def build_connection_prelude @@ -106,11 +118,15 @@ def initialize( @slots = build_slot_node_mappings(EMPTY_ARRAY) @replications = build_replication_mappings(EMPTY_ARRAY) klass = make_topology_class(config.use_replica?, config.replica_affinity) - @topology = klass.new(pool, @concurrent_worker, **kwargs) + @command = ::RedisClient::Cluster::Command.new + @base_connection_configuration = { **kwargs, cluster_commands: @command } + @topology = klass.new(pool, @concurrent_worker, **@base_connection_configuration) @config = config @mutex = Mutex.new end + attr_reader :command + def inspect "#<#{self.class.name} #{node_keys.join(', ')}>" end @@ -212,6 +228,13 @@ def reload! end @slots = build_slot_node_mappings(@node_info) @replications = build_replication_mappings(@node_info) + + # Call COMMAND to find out the commands available on this cluster. We only call this once + # the first time the client is constructed; if you perform a rolling update to a new version + # of Redis, for example, applications won't know about the new commands available until they + # construct new client objects (or, more likely, are restarted). + @command.load!(startup_clients, slow_command_timeout: @config.slow_command_timeout) unless @command.loaded? + @topology.process_topology_update!(@replications, @node_configs) end end @@ -404,7 +427,7 @@ def with_startup_clients(count) # rubocop:disable Metrics/AbcSize # Memoize the startup clients, so we maintain RedisClient's internal circuit breaker configuration # if it's set. @startup_clients ||= @config.startup_nodes.values.sample(count).map do |node_config| - ::RedisClient::Cluster::Node::Config.new(**node_config).new_client + ::RedisClient::Cluster::Node::Config.new(**@base_connection_configuration.merge(node_config)).new_client end yield @startup_clients ensure diff --git a/lib/redis_client/cluster/pinning.rb b/lib/redis_client/cluster/pinning.rb new file mode 100644 index 00000000..278630fb --- /dev/null +++ b/lib/redis_client/cluster/pinning.rb @@ -0,0 +1,59 @@ +# frozen_string_literal: true + +class RedisClient + class Cluster + module Pinning + module ClientMixin + attr_reader :locked_key_slot + + # This gets called when handing out connections in Cluster#with to lock the returned + # connections to a given slot. + def locked_to_key_slot(key_slot) + raise ArgumentError, 'recursive slot locking is not allowed' if @locked_key_slot + + begin + @locked_key_slot = key_slot + yield + ensure + @locked_key_slot = nil + end + end + end + + # This middleware is what actually enforces the slot locking above. + module ClientMiddleware + def initialize(client) + @client = client + super + end + + def assert_slot_valid!(command, config) # rubocop:disable Metrics/AbcSize + return unless @client.locked_key_slot + return unless config.cluster_commands.loaded? + + keys = config.cluster_commands.extract_all_keys(command) + key_slots = keys.map { |k| ::RedisClient::Cluster::KeySlotConverter.convert(k) } + locked_slot = ::RedisClient::Cluster::KeySlotConverter.convert(@client.locked_key_slot) + return if key_slots.all? { |slot| slot == locked_slot } + + key_slot_pairs = keys.zip(key_slots).map { |key, slot| "#{key} => #{slot}" }.join(', ') + raise ::RedisClient::Cluster::Transaction::ConsistencyError, <<~MESSAGE + Connection is pinned to slot #{locked_slot} (via key #{@client.locked_key_slot}). \ + However, command #{command.inspect} has keys hashing to slots #{key_slot_pairs}. \ + Transactions in redis cluster must only refer to keys hashing to the same slot. + MESSAGE + end + + def call(command, config) + assert_slot_valid!(command, config) + super + end + + def call_pipelined(command, config) + assert_slot_valid!(command, config) + super + end + end + end + end +end diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index 7fa37d15..b0066b43 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -24,7 +24,6 @@ def initialize(config, concurrent_worker, pool: nil, **kwargs) @client_kwargs = kwargs @node = ::RedisClient::Cluster::Node.new(concurrent_worker, config: config, pool: pool, **kwargs) update_cluster_info! - @command = ::RedisClient::Cluster::Command.load(@node.replica_clients.shuffle, slow_command_timeout: config.slow_command_timeout) @command_builder = @config.command_builder end @@ -163,12 +162,12 @@ def find_node_key_by_key(key, seed: nil, primary: false) end def find_node_key(command, seed: nil) - key = @command.extract_first_key(command) - find_node_key_by_key(key, seed: seed, primary: @command.should_send_to_primary?(command)) + key = cluster_commands.extract_first_key(command) + find_node_key_by_key(key, seed: seed, primary: cluster_commands.should_send_to_primary?(command)) end def find_primary_node_key(command) - key = @command.extract_first_key(command) + key = cluster_commands.extract_first_key(command) return nil unless key&.size&.> 0 find_node_key_by_key(key, primary: true) @@ -184,8 +183,12 @@ def find_node(node_key, retry_count: 3) retry end + def cluster_commands + @node.command + end + def command_exists?(name) - @command.exists?(name) + cluster_commands.exists?(name) end def assign_redirection_node(err_msg) diff --git a/test/redis_client/cluster/node/test_latency_replica.rb b/test/redis_client/cluster/node/test_latency_replica.rb index bff4f55d..380373fd 100644 --- a/test/redis_client/cluster/node/test_latency_replica.rb +++ b/test/redis_client/cluster/node/test_latency_replica.rb @@ -12,21 +12,21 @@ class TestLatencyReplica < TestingWrapper def test_clients_with_redis_client got = @test_node.clients - got.each { |client| assert_instance_of(::RedisClient, client) } + got.each { |client| assert_kind_of(::RedisClient, client) } assert_equal(%w[master slave], got.map { |v| v.call('ROLE').first }.uniq.sort) end def test_clients_with_pooled_redis_client test_node = make_node(pool: { timeout: 3, size: 2 }) got = test_node.clients - got.each { |client| assert_instance_of(::RedisClient::Pooled, client) } + got.each { |client| assert_kind_of(::RedisClient::Pooled, client) } assert_equal(%w[master slave], got.map { |v| v.call('ROLE').first }.uniq.sort) end def test_primary_clients got = @test_node.primary_clients got.each do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end @@ -34,14 +34,14 @@ def test_primary_clients def test_replica_clients got = @test_node.replica_clients got.each do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('slave', client.call('ROLE').first) end end def test_clients_for_scanning got = @test_node.clients_for_scanning - got.each { |client| assert_instance_of(::RedisClient, client) } + got.each { |client| assert_kind_of(::RedisClient, client) } assert_equal(TEST_SHARD_SIZE, got.size) end diff --git a/test/redis_client/cluster/node/test_primary_only.rb b/test/redis_client/cluster/node/test_primary_only.rb index 27188319..983c99c3 100644 --- a/test/redis_client/cluster/node/test_primary_only.rb +++ b/test/redis_client/cluster/node/test_primary_only.rb @@ -13,7 +13,7 @@ class TestPrimaryOnly < TestingWrapper def test_clients_with_redis_client got = @test_node.clients got.each do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end @@ -22,7 +22,7 @@ def test_clients_with_pooled_redis_client test_node = make_node(pool: { timeout: 3, size: 2 }) got = test_node.clients got.each do |client| - assert_instance_of(::RedisClient::Pooled, client) + assert_kind_of(::RedisClient::Pooled, client) assert_equal('master', client.call('ROLE').first) end end @@ -30,7 +30,7 @@ def test_clients_with_pooled_redis_client def test_primary_clients got = @test_node.primary_clients got.each do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end @@ -38,7 +38,7 @@ def test_primary_clients def test_replica_clients got = @test_node.replica_clients got.each do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end @@ -46,7 +46,7 @@ def test_replica_clients def test_clients_for_scanning got = @test_node.clients_for_scanning got.each do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end diff --git a/test/redis_client/cluster/node/test_random_replica.rb b/test/redis_client/cluster/node/test_random_replica.rb index 73bc226f..faef689e 100644 --- a/test/redis_client/cluster/node/test_random_replica.rb +++ b/test/redis_client/cluster/node/test_random_replica.rb @@ -12,21 +12,21 @@ class TestRandomReplica < TestingWrapper def test_clients_with_redis_client got = @test_node.clients - got.each { |client| assert_instance_of(::RedisClient, client) } + got.each { |client| assert_kind_of(::RedisClient, client) } assert_equal(%w[master slave], got.map { |v| v.call('ROLE').first }.uniq.sort) end def test_clients_with_pooled_redis_client test_node = make_node(pool: { timeout: 3, size: 2 }) got = test_node.clients - got.each { |client| assert_instance_of(::RedisClient::Pooled, client) } + got.each { |client| assert_kind_of(::RedisClient::Pooled, client) } assert_equal(%w[master slave], got.map { |v| v.call('ROLE').first }.uniq.sort) end def test_primary_clients got = @test_node.primary_clients got.each do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end @@ -34,14 +34,14 @@ def test_primary_clients def test_replica_clients got = @test_node.replica_clients got.each do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('slave', client.call('ROLE').first) end end def test_clients_for_scanning got = @test_node.clients_for_scanning - got.each { |client| assert_instance_of(::RedisClient, client) } + got.each { |client| assert_kind_of(::RedisClient, client) } assert_equal(TEST_SHARD_SIZE, got.size) end diff --git a/test/redis_client/cluster/node/test_random_replica_or_primary.rb b/test/redis_client/cluster/node/test_random_replica_or_primary.rb index ea0ad2b7..d3151970 100644 --- a/test/redis_client/cluster/node/test_random_replica_or_primary.rb +++ b/test/redis_client/cluster/node/test_random_replica_or_primary.rb @@ -12,21 +12,21 @@ class TestRandomReplicaWithPrimary < TestingWrapper def test_clients_with_redis_client got = @test_node.clients - got.each { |client| assert_instance_of(::RedisClient, client) } + got.each { |client| assert_kind_of(::RedisClient, client) } assert_equal(%w[master slave], got.map { |v| v.call('ROLE').first }.uniq.sort) end def test_clients_with_pooled_redis_client test_node = make_node(pool: { timeout: 3, size: 2 }) got = test_node.clients - got.each { |client| assert_instance_of(::RedisClient::Pooled, client) } + got.each { |client| assert_kind_of(::RedisClient::Pooled, client) } assert_equal(%w[master slave], got.map { |v| v.call('ROLE').first }.uniq.sort) end def test_primary_clients got = @test_node.primary_clients got.each do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end @@ -34,14 +34,14 @@ def test_primary_clients def test_replica_clients got = @test_node.replica_clients got.each do |client| - assert_instance_of(::RedisClient, client) + assert_kind_of(::RedisClient, client) assert_equal('slave', client.call('ROLE').first) end end def test_clients_for_scanning got = @test_node.clients_for_scanning - got.each { |client| assert_instance_of(::RedisClient, client) } + got.each { |client| assert_kind_of(::RedisClient, client) } assert_equal(TEST_SHARD_SIZE, got.size) end diff --git a/test/redis_client/cluster/test_command.rb b/test/redis_client/cluster/test_command.rb index 34a992ad..b8823e29 100644 --- a/test/redis_client/cluster/test_command.rb +++ b/test/redis_client/cluster/test_command.rb @@ -22,7 +22,7 @@ def test_load { nodes: nil, error: ::RedisClient::Cluster::InitialSetupError } ].each_with_index do |c, idx| msg = "Case: #{idx}" - got = -> { ::RedisClient::Cluster::Command.load(c[:nodes]) } + got = -> { ::RedisClient::Cluster::Command.new.tap { |cmd| cmd.load!(c[:nodes]) } } if c[:error].nil? assert_instance_of(::RedisClient::Cluster::Command, got.call, msg) else @@ -40,7 +40,7 @@ def call(...) super end end) - ::RedisClient::Cluster::Command.load(nodes, slow_command_timeout: 9) + ::RedisClient::Cluster::Command.new.tap { |c| c.load!(nodes, slow_command_timeout: 9) } assert_equal(9, nodes.first.instance_variable_get(:@slow_timeout)) assert_equal(TEST_TIMEOUT_SEC, nodes.first.read_timeout) end @@ -70,7 +70,7 @@ def test_parse_command_reply { rows: nil, want: {} } ].each_with_index do |c, idx| msg = "Case: #{idx}" - got = ::RedisClient::Cluster::Command.send(:parse_command_reply, c[:rows]) + got = ::RedisClient::Cluster::Command.new.send(:parse_command_reply, c[:rows]) assert_equal(c[:want].size, got.size, msg) assert_equal(c[:want].keys.sort, got.keys.sort, msg) c[:want].each do |k, v| @@ -80,7 +80,7 @@ def test_parse_command_reply end def test_extract_first_key - cmd = ::RedisClient::Cluster::Command.load(@raw_clients) + cmd = ::RedisClient::Cluster::Command.new.tap { |c| c.load!(@raw_clients) } [ { command: %w[SET foo 1], want: 'foo' }, { command: %w[GET foo], want: 'foo' }, @@ -99,7 +99,7 @@ def test_extract_first_key end def test_should_send_to_primary? - cmd = ::RedisClient::Cluster::Command.load(@raw_clients) + cmd = ::RedisClient::Cluster::Command.new.tap { |c| c.load!(@raw_clients) } [ { command: %w[SET foo 1], want: true }, { command: %w[GET foo], want: false }, @@ -114,7 +114,7 @@ def test_should_send_to_primary? end def test_should_send_to_replica? - cmd = ::RedisClient::Cluster::Command.load(@raw_clients) + cmd = ::RedisClient::Cluster::Command.new.tap { |c| c.load!(@raw_clients) } [ { command: %w[SET foo 1], want: false }, { command: %w[GET foo], want: true }, @@ -129,7 +129,7 @@ def test_should_send_to_replica? end def test_exists? - cmd = ::RedisClient::Cluster::Command.load(@raw_clients) + cmd = ::RedisClient::Cluster::Command.new.tap { |c| c.load!(@raw_clients) } [ { name: 'ping', want: true }, { name: :ping, want: true }, @@ -148,7 +148,7 @@ def test_exists? end def test_determine_first_key_position - cmd = ::RedisClient::Cluster::Command.load(@raw_clients) + cmd = ::RedisClient::Cluster::Command.new.tap { |c| c.load!(@raw_clients) } [ { command: %w[EVAL "return ARGV[1]" 0 hello], want: 3 }, { command: [['EVAL'], '"return ARGV[1]"', 0, 'hello'], want: 3 }, @@ -174,7 +174,7 @@ def test_determine_first_key_position end def test_determine_optional_key_position - cmd = ::RedisClient::Cluster::Command.load(@raw_clients) + cmd = ::RedisClient::Cluster::Command.new.tap { |c| c.load!(@raw_clients) } [ { params: { command: %w[XREAD COUNT 2 STREAMS mystream writers 0-0 0-0], option_name: 'streams' }, want: 4 }, { params: { command: %w[XREADGROUP GROUP group consumer STREAMS key id], option_name: 'streams' }, want: 5 }, @@ -192,7 +192,7 @@ def test_determine_optional_key_position end def test_extract_all_keys - cmd = ::RedisClient::Cluster::Command.load(@raw_clients) + cmd = ::RedisClient::Cluster::Command.new.tap { |c| c.load!(@raw_clients) } [ { command: ['EVAL', 'return ARGV[1]', '0', 'hello'], want: [] }, { command: ['EVAL', 'return ARGV[1]', '3', 'key1', 'key2', 'key3', 'arg1', 'arg2'], want: %w[key1 key2 key3] }, diff --git a/test/redis_client/cluster/test_node.rb b/test/redis_client/cluster/test_node.rb index dc50188a..b67de5fb 100644 --- a/test/redis_client/cluster/test_node.rb +++ b/test/redis_client/cluster/test_node.rb @@ -5,21 +5,6 @@ class RedisClient class Cluster - class Node - class TestConfig < TestingWrapper - def test_connection_prelude - [ - { params: { scale_read: true }, want: [%w[HELLO 3], %w[READONLY]] }, - { params: { scale_read: false }, want: [%w[HELLO 3]] }, - { params: {}, want: [%w[HELLO 3]] } - ].each_with_index do |c, idx| - got = ::RedisClient::Cluster::Node::Config.new(**c[:params]).connection_prelude - assert_equal(c[:want], got, "Case: #{idx}") - end - end - end - end - # rubocop:disable Metrics/ClassLength class TestNode < TestingWrapper def setup @@ -47,6 +32,19 @@ def make_node(capture_buffer: CommandCaptureMiddleware::CommandBuffer.new, pool: end end + def test_connection_prelude + [ + { params: { scale_read: true }, want: [%w[HELLO 3], %w[READONLY]] }, + { params: { scale_read: false }, want: [%w[HELLO 3]] }, + { params: {}, want: [%w[HELLO 3]] } + ].each_with_index do |c, idx| + make_node(**c[:params]).clients.each do |conn| + got = conn.config.connection_prelude + assert_equal(c[:want], got, "Case: #{idx}") + end + end + end + def test_parse_cluster_node_reply_continuous_slots info = <<~INFO 07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30004@31004 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238317239 4 connected @@ -297,14 +295,14 @@ def test_find_by msg = "Case: primary only: #{info.node_key}" got = -> { @test_node.find_by(info.node_key) } if info.primary? - assert_instance_of(::RedisClient, got.call, msg) + assert_kind_of(::RedisClient, got.call, msg) else assert_raises(::RedisClient::Cluster::Node::ReloadNeeded, msg, &got) end msg = "Case: scale read: #{info.node_key}" got = @test_node_with_scale_read.find_by(info.node_key) - assert_instance_of(::RedisClient, got, msg) + assert_kind_of(::RedisClient, got, msg) end end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index 433a9af6..17fe7c9d 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -533,8 +533,6 @@ def test_pinning_two_keys end def test_pinning_cross_slot - skip 'This is not implemented yet!' - assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do @client.with(hashtag: 'slot1') do |conn| conn.call('GET', '{slot2}')