diff --git a/lib/Redis.pm b/lib/Redis.pm index 17cce86..3f06ab0 100644 --- a/lib/Redis.pm +++ b/lib/Redis.pm @@ -51,6 +51,19 @@ our $VERSION = '1.926'; $redis->sort('list', 'DESC'); $redis->sort(qw{list LIMIT 0 5 ALPHA DESC}); + ## Add a coderef argument to run a command in the background + $redis->sort(qw{list LIMIT 0 5 ALPHA DESC}, sub { + my ($reply, $error) = @_; + die "Oops, got an error: $error\n" if defined $error; + print "$_\n" for @$reply; + }); + long_computation(); + $redis->wait_all_responses; + + ## Or run a large batch of commands in a pipeline + $redis->hset('h', $_, $hash{$_}, sub {}) for keys %hash; + $redis->wait_all_responses; + ## Publish/Subscribe $redis->subscribe( 'topic_1', @@ -205,55 +218,88 @@ sub DESTROY { } our $AUTOLOAD; sub AUTOLOAD { - my $self = shift; - my $command = $AUTOLOAD; $command =~ s/.*://; + + my $method = sub { shift->__std_cmd($command, @_) }; + + # Save this method for future calls + no strict 'refs'; + *$AUTOLOAD = $method; + + goto $method; +} + +sub __std_cmd { + my $self = shift; + my $command = shift; + $self->__is_valid_command($command); - ## Fast path, no reconnect - return $self->__run_cmd($command, @_) unless $self->{reconnect}; + my $ret; + my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef; + + # If this is an EXEC command, in pipelined mode, and one of the commands + # executed in the transaction yields an error, we must collect all errors + # from that command, rather than throwing an exception immediately. + my $collect_errors = $cb && uc($command) eq 'EXEC'; + + ## Fast path, no reconnect; + return $self->__run_cmd($command, $collect_errors, undef, $cb, @_) + unless $self->{reconnect}; my @cmd_args = @_; - return try { - $self->__run_cmd($command, @cmd_args); - } - catch { + $self->__with_reconnect(sub { + $self->__run_cmd($command, $collect_errors, undef, $cb, @cmd_args); + }); +} + +sub __with_reconnect { + my ($self, $cb) = @_; + + ## Fast path, no reconnect + return $cb->() unless $self->{reconnect}; + + return &try($cb, catch { die $_ unless ref($_) eq 'Redis::X::Reconnect'; $self->__connect; - $self->__run_cmd($command, @cmd_args); - }; + $cb->(); + }); } sub __run_cmd { - my $self = shift; - my $command = shift; - my $sock = $self->{sock} || $self->__try_reconnect('Not connected to any server'); - my $enc = $self->{encoding}; - my $deb = $self->{debug}; + my ($self, $command, $collect_errors, $custom_decode, $cb, @args) = @_; + + my $ret; + my $wrapper = $cb && $custom_decode ? sub { + my ($reply, $error) = @_; + $cb->(scalar $custom_decode->($reply), $error); + } : $cb || sub { + my ($reply, $error) = @_; + confess "[$command] $error, " if defined $error; + $ret = $reply; + }; - ## PubSub commands use a different answer handling - if (my ($pr, $unsub) = $command =~ /^(p)?(un)?subscribe$/i) { - $pr = '' unless $pr; + $self->__send_command($command, @args); + push @{ $self->{queue} }, [$command, $wrapper, $collect_errors]; - my $cb = pop; - confess("Missing required callback in call to $command(), ") - unless ref($cb) eq 'CODE'; + return 1 if $cb; - my @subs = @_; - @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs) - if $unsub; - return unless @subs; + $self->wait_all_responses; + return $custom_decode ? $custom_decode->($ret, !wantarray) + : wantarray && ref $ret eq 'ARRAY' ? @$ret : $ret; +} - $self->__send_command($command, @subs); +sub wait_all_responses { + my ($self) = @_; - my %cbs = map { ("${pr}message:$_" => $cb) } @subs; - return $self->__process_subscription_changes($command, \%cbs); + for my $handler (splice @{ $self->{queue} }) { + my ($command, $cb, $collect_errors) = @$handler; + $cb->($self->__read_response($command, $collect_errors)); } - $self->__send_command($command, @_); - return $self->__read_response($command); + return; } @@ -262,6 +308,10 @@ sub quit { my ($self) = @_; return unless $self->{sock}; + confess "[quit] only works in synchronous mode, " + if @_ && ref $_[-1] eq 'CODE'; + + $self->wait_all_responses; $self->__send_command('QUIT'); close(delete $self->{sock}) || confess("Can't close socket: $!"); @@ -270,8 +320,14 @@ sub quit { sub shutdown { my ($self) = @_; + $self->__is_valid_command('SHUTDOWN'); + + confess "[shutdown] only works in synchronous mode, " + if @_ && ref $_[-1] eq 'CODE'; + return unless $self->{sock}; + $self->wait_all_responses; $self->__send_command('SHUTDOWN'); close(delete $self->{sock}) || confess("Can't close socket: $!"); @@ -279,46 +335,69 @@ sub shutdown { } sub ping { - my ($self) = @_; + my $self = shift; + $self->__is_valid_command('PING'); + + confess "[ping] only works in synchronous mode, " + if @_ && ref $_[-1] eq 'CODE'; + return unless exists $self->{sock}; - my $reply; - eval { - $self->__send_command('PING'); - $reply = $self->__read_response('PING'); - }; - if ($@) { + $self->wait_all_responses; + return scalar try { + $self->__std_cmd('PING'); + } + catch { close(delete $self->{sock}); return; - } - - return $reply; + }; } sub info { - my ($self) = @_; + my $self = shift; $self->__is_valid_command('INFO'); - $self->__send_command('INFO'); + my $custom_decode = sub { + my ($reply) = @_; + return $reply if !defined $reply || ref $reply; + return { map { split(/:/, $_, 2) } split(/\r\n/, $reply) }; + }; + + my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef; - my $info = $self->__read_response('INFO'); + ## Fast path, no reconnect + return $self->__run_cmd('INFO', 0, $custom_decode, $cb, @_) + unless $self->{reconnect}; - return {map { split(/:/, $_, 2) } split(/\r\n/, $info)}; + my @cmd_args = @_; + $self->__with_reconnect(sub { + $self->__run_cmd('INFO', 0, $custom_decode, $cb, @cmd_args); + }); } sub keys { my $self = shift; $self->__is_valid_command('KEYS'); - $self->__send_command('KEYS', @_); + my $custom_decode = sub { + my ($reply, $synchronous_scalar) = @_; - my @keys = $self->__read_response('KEYS', \my $type); - ## Support redis > 1.26 - return @keys if $type eq '*'; + ## Support redis <= 1.2.6 + $reply = [split(/\s/, $reply)] if defined $reply && !ref $reply; - ## Support redis <= 1.2.6 - return split(/\s/, $keys[0]) if $keys[0]; - return; + return ref $reply && ($synchronous_scalar || wantarray) ? @$reply : $reply; + }; + + my $cb = @_ && ref $_[-1] eq 'CODE' ? pop : undef; + + ## Fast path, no reconnect + return $self->__run_cmd('KEYS', 0, $custom_decode, $cb, @_) + unless $self->{reconnect}; + + my @cmd_args = @_; + $self->__with_reconnect(sub { + $self->__run_cmd('KEYS', 0, $custom_decode, $cb, @cmd_args); + }); } @@ -333,8 +412,9 @@ sub wait_for_messages { my $count = 0; while ($s->can_read($timeout)) { while (__try_read_sock($sock)) { - my @m = $self->__read_response('WAIT_FOR_MESSAGES'); - $self->__process_pubsub_msg(\@m); + my ($reply, $error) = $self->__read_response('WAIT_FOR_MESSAGES'); + confess "[WAIT_FOR_MESSAGES] $error, " if defined $error; + $self->__process_pubsub_msg($reply); $count++; } } @@ -342,6 +422,39 @@ sub wait_for_messages { return $count; } +sub __subscription_cmd { + my $self = shift; + my $pr = shift; + my $unsub = shift; + my $command = shift; + my $cb = pop; + + confess("Missing required callback in call to $command(), ") + unless ref($cb) eq 'CODE'; + + $self->wait_all_responses; + + my @subs = @_; + $self->__with_reconnect(sub { + $self->__throw_reconnect('Not connected to any server') + unless $self->{sock}; + + @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs) + if $unsub; + return unless @subs; + + $self->__send_command($command, @subs); + + my %cbs = map { ("${pr}message:$_" => $cb) } @subs; + return $self->__process_subscription_changes($command, \%cbs); + }); +} + +sub subscribe { shift->__subscription_cmd('', 0, subscribe => @_) } +sub psubscribe { shift->__subscription_cmd('p', 0, psubscribe => @_) } +sub unsubscribe { shift->__subscription_cmd('', 1, unsubscribe => @_) } +sub punsubscribe { shift->__subscription_cmd('p', 1, punsubscribe => @_) } + sub __process_unsubscribe_requests { my ($self, $cb, $pr, @unsubs) = @_; my $subs = $self->{subscribers}; @@ -364,21 +477,22 @@ sub __process_subscription_changes { my $subs = $self->{subscribers}; while (%$expected) { - my @m = $self->__read_response($cmd); + my ($m, $error) = $self->__read_response($cmd); + confess "[$cmd] $error, " if defined $error; ## Deal with pending PUBLISH'ed messages - if ($m[0] =~ /^p?message$/) { - $self->__process_pubsub_msg(\@m); + if ($m->[0] =~ /^p?message$/) { + $self->__process_pubsub_msg($m); next; } - my ($key, $unsub) = $m[0] =~ m/^(p)?(un)?subscribe$/; - $key .= "message:$m[1]"; + my ($key, $unsub) = $m->[0] =~ m/^(p)?(un)?subscribe$/; + $key .= "message:$m->[1]"; my $cb = delete $expected->{$key}; push @{$subs->{$key}}, $cb unless $unsub; - $self->{is_subscriber} = $m[2]; + $self->{is_subscriber} = $m->[2]; } } @@ -407,9 +521,8 @@ sub __process_pubsub_msg { sub __is_valid_command { my ($self, $cmd) = @_; - return unless $self->{is_subscriber}; - return if $cmd =~ /^P?(UN)?SUBSCRIBE$/i; - confess("Cannot use command '$cmd' while in SUBSCRIBE mode, "); + confess("Cannot use command '$cmd' while in SUBSCRIBE mode, ") + if $self->{is_subscriber}; } @@ -418,6 +531,11 @@ sub __connect { my ($self) = @_; delete $self->{sock}; + # Suppose we have at least one command response pending, but we're about + # to reconnect. The new connection will never get a response to any of + # the pending commands, so delete all those pending responses now. + $self->{queue} = []; + ## Fast path, no reconnect return $self->__build_sock() unless $self->{reconnect}; @@ -453,7 +571,7 @@ sub __send_command { my $deb = $self->{debug}; my $sock = $self->{sock} - || $self->__try_reconnect('Not connected to any server'); + || $self->__throw_reconnect('Not connected to any server'); warn "[SEND] $cmd ", Dumper([@_]) if $deb; @@ -467,14 +585,14 @@ sub __send_command { ## Check to see if socket was closed: reconnect on EOF my $status = __try_read_sock($sock); - $self->__try_reconnect('Not connected to any server') + $self->__throw_reconnect('Not connected to any server') if defined $status && $status == 0; ## Send command, take care for partial writes warn "[SEND RAW] $buf" if $deb; while ($buf) { my $len = syswrite $sock, $buf, length $buf; - $self->__try_reconnect("Could not write to Redis server: $!") + $self->__throw_reconnect("Could not write to Redis server: $!") unless $len; substr $buf, 0, $len, ""; } @@ -483,55 +601,50 @@ sub __send_command { } sub __read_response { - my ($self, $cmd) = @_; + my ($self, $cmd, $collect_errors) = @_; confess("Not connected to any server") unless $self->{sock}; local $/ = "\r\n"; ## no debug => fast path - return __read_response_r(@_) unless $self->{debug}; + return $self->__read_response_r($cmd, $collect_errors) unless $self->{debug}; - if (wantarray) { - my @r = __read_response_r(@_); - warn "[RECV] $cmd ", Dumper(\@r); - return @r; - } - else { - my $r = __read_response_r(@_); - warn "[RECV] $cmd ", Dumper($r); - return $r; - } + my ($result, $error) = $self->__read_response_r($cmd, $collect_errors); + warn "[RECV] $cmd ", Dumper($result, $error) if $self->{debug}; + return $result, $error; } sub __read_response_r { - my ($self, $command, $type_r) = @_; + my ($self, $command, $collect_errors) = @_; my ($type, $result) = $self->__read_line; - $$type_r = $type if $type_r; if ($type eq '-') { - confess "[$command] $result, "; + return undef, $result; } - elsif ($type eq '+') { - return $result; + elsif ($type eq '+' || $type eq ':') { + return $result, undef; } elsif ($type eq '$') { - return if $result < 0; - return $self->__read_len($result + 2); + return undef, undef if $result < 0; + return $self->__read_len($result + 2), undef; } elsif ($type eq '*') { - return if $result < 0; + return undef, undef if $result < 0; my @list; while ($result--) { - push @list, scalar($self->__read_response_r($command)); + my @nested = $self->__read_response_r($command, $collect_errors); + if ($collect_errors) { + push @list, \@nested; + } + else { + confess "[$command] $nested[1], " if defined $nested[1]; + push @list, $nested[0]; + } } - return @list if wantarray; - return \@list; - } - elsif ($type eq ':') { - return $result; + return \@list, undef; } else { confess "unknown answer type: $type ($result), "; @@ -632,7 +745,7 @@ BEGIN { ########################## # I take exception to that -sub __try_reconnect { +sub __throw_reconnect { my ($self, $m) = @_; die bless(\$m, 'Redis::X::Reconnect') if $self->{reconnect}; die $m; @@ -643,16 +756,27 @@ sub __try_reconnect { __END__ +=head1 Pipeline management + +=head2 wait_all_responses + +Waits until all pending pipelined responses have been received, and invokes +the pipeline callback for each one. See L. + =head1 Connection Handling =head2 quit $r->quit; +The C method does not support pipelined operation. + =head2 ping $r->ping || die "no server?"; +The C method does not support pipelined operation. + =head1 Commands operating on string values =head2 set @@ -698,6 +822,13 @@ __END__ =head2 keys my @keys = $r->keys( '*glob_pattern*' ); + my $keys = $r->keys( '*glob_pattern*' ); # count of matching keys + +Note that synchronous C calls in a scalar context return the number of +matching keys (not an array ref of matching keys as you might expect). This +does not apply in pipelined mode: assuming the server returns a list of +keys, as expected, it is always passed to the pipeline callback as an array +ref. =head2 randomkey @@ -860,12 +991,85 @@ See also L for tie interface. $r->shutdown; +The C method does not support pipelined operation. + =head1 Remote server control commands =head2 info my $info_hash = $r->info; +The C method is unique in that it decodes the server's response into a +hashref, if possible. This decoding happens in both synchronous and +pipelined modes. + +=head1 Transaction-handling commands + +=head2 multi + + $r->multi; + +=head2 discard + + $r->discard; + +=head2 exec + + my @individual_replies = $r->exec; + +C has special behaviour when run in a pipeline: the C<$reply> argument +to the pipeline callback is an array ref whose elements are themselves +C<[$reply, $error]> pairs. This means that you can accurately detect errors +yielded by any command in the transaction, and without any exceptions being +thrown. + + +=head1 PIPELINING + +Usually, running a command will wait for a response. However, if you're +doing large numbers of requests, it can be more efficient to use what Redis +calls I: send multiple commands to Redis without waiting for a +response, then wait for the responses that come in. + +To use pipelining, add a coderef argument as the last argument to a command +method call: + + $r->set('foo', 'bar', sub {}); + +Pending responses to pipelined commands are processed in a single batch, as +soon as at least one of the following conditions holds: + +=over 4 + +=item * + +A non-pipelined (synchronous) command has been sent on the same connection + +=item * + +A pub/sub subscription command (one of C, C, +C, or C) is about to be sent on the same +connection. + +=item * + +The L method is called explicitly. + +=back + +The coderef you supply to a pipelined command method is invoked once the +response is available. It takes two arguments, C<$reply> and C<$error>. If +C<$error> is defined, it contains the text of an error reply sent by the +Redis server. Otherwise, C<$reply> is the non-error reply. For almost all +commands, that means it's C, or a defined but non-reference scalar, +or an array ref of any of those; but see L, L, and L. + +Note the contrast with synchronous commands, which throw an exception on +receipt of an error reply, or return a non-error reply directly. + +The fact that pipelined commands never throw an exception can be +particularly useful for Redis transactions; see L. + =head1 ENCODING diff --git a/t/01-basic.t b/t/01-basic.t index a8b0712..8076fb7 100755 --- a/t/01-basic.t +++ b/t/01-basic.t @@ -92,6 +92,10 @@ ok($@, 'rename to existing key'); ok(my $nr_keys = $o->dbsize, 'dbsize'); +throws_ok sub { $o->lpush('foo', 'bar') }, + qr/\[lpush\] ERR Operation against a key holding the wrong kind of value,/, + 'Error responses throw exception'; + ## Commands operating on lists diff --git a/t/02-responses.t b/t/02-responses.t old mode 100644 new mode 100755 index 0b73255..3049f59 --- a/t/02-responses.t +++ b/t/02-responses.t @@ -22,67 +22,75 @@ sub r { ## -ERR responses r('-you must die!!'); -throws_ok sub { $r->__read_response('cmd') }, qr/\[cmd\] you must die!!/, - 'Error response must throw exception'; +is_deeply([$r->__read_response('cmd')], [undef, 'you must die!!'], + 'Error response detected'); ## +TEXT responses my $m; r('+all your text are belong to us'); -lives_ok sub { $m = $r->__read_response('cmd') }, 'Text response ok'; -is($m, 'all your text are belong to us', '... with the expected message'); +is_deeply([$r->__read_response('cmd')], + ['all your text are belong to us', undef], + 'Text response ok'); ## :NUMBER responses r(':234'); -lives_ok sub { $m = $r->__read_response('cmd') }, 'Integer response ok'; -is($m, 234, '... with the expected value'); +is_deeply([$r->__read_response('cmd')], [234, undef], + 'Integer response ok'); ## $SIZE PAYLOAD responses r('$19', "Redis\r\nis\r\ngreat!\r\n"); -lives_ok sub { $m = $r->__read_response('cmd') }, 'Size+payload response ok'; -is($m, "Redis\r\nis\r\ngreat!\r\n", '... with the expected message'); +is_deeply([$r->__read_response('cmd')], ["Redis\r\nis\r\ngreat!\r\n", undef], + 'Size+payload response ok'); r('$0', ""); -lives_ok sub { $m = $r->__read_response('cmd') }, - 'Zero-size+payload response ok'; -is($m, "", '... with the expected message'); +is_deeply([$r->__read_response('cmd')], ['', undef], + 'Zero-size+payload response ok'); r('$-1'); -lives_ok sub { $m = $r->__read_response('cmd') }, - 'Negative-size+payload response ok'; -ok(!defined($m), '... with the expected undefined message'); +is_deeply([$r->__read_response('cmd')], [undef, undef], + 'Negative-size+payload response ok'); ## Multi-bulk responses my @m; r('*4', '$5', 'Redis', ':42', '$-1', '+Cool stuff'); -lives_ok sub { @m = $r->__read_response('cmd') }, - 'Simple multi-bulk response ok'; -cmp_deeply( - \@m, - ['Redis', 42, undef, 'Cool stuff'], - '... with the expected list of values' -); +cmp_deeply([$r->__read_response('cmd')], + [['Redis', 42, undef, 'Cool stuff'], undef], + 'Simple multi-bulk response ok'); ## Nested Multi-bulk responses r('*5', '$5', 'Redis', ':42', '*4', ':1', ':2', '$4', 'hope', '*2', ':4', ':5', '$-1', '+Cool stuff'); -lives_ok sub { @m = $r->__read_response('cmd') }, - 'Nested multi-bulk response ok'; cmp_deeply( - \@m, - ['Redis', 42, [1, 2, 'hope', [4, 5]], undef, 'Cool stuff'], - '... with the expected list of values' + [$r->__read_response('cmd')], + [['Redis', 42, [1, 2, 'hope', [4, 5]], undef, 'Cool stuff'], undef], + 'Nested multi-bulk response ok' ); ## Nil multi-bulk responses r('*-1'); -lives_ok sub { $m = $r->__read_response('blpop') }, - 'Read a NIL multi-bulk response'; -is($m, undef, '... with the expected "undef" value'); +is_deeply([$r->__read_response('cmd')], [undef, undef], + 'Read a NIL multi-bulk response'); + + +## Multi-bulk responses with nested error +r('*3', '$5', 'Redis', '-you must die!!', ':42'); +throws_ok sub { $r->__read_response('cmd') }, + qr/\[cmd\] you must die!!/, + 'Nested errors must usually throw exceptions'; + +r('*3', '$5', 'Redis', '-you must die!!', ':42'); +is_deeply([$r->__read_response('cmd', 1)], [ + [['Redis', undef], + [undef, 'you must die!!'], + [42, undef]], + undef, +], 'Nested errors must be collected in collect-errors mode'); + done_testing(); diff --git a/t/03-pubsub.t b/t/03-pubsub.t old mode 100644 new mode 100755 index 0b3ce51..7d5874d --- a/t/03-pubsub.t +++ b/t/03-pubsub.t @@ -25,11 +25,16 @@ ok( is($pub->publish('aa', 'v1'), 0, "No subscribers to 'aa' topic"); +my $db_size = -1; +$sub->dbsize(sub { $db_size = $_[0] }); + ## Basic pubsub my $sub_cb = sub { my ($v, $t, $s) = @_; $got{$s} = "$v:$t" }; $sub->subscribe('aa', 'bb', $sub_cb); is($pub->publish('aa', 'v1'), 1, "Delivered to 1 subscriber of topic 'aa'"); +is($db_size, 0, 'subscribing processes pending queued commands'); + is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message'); cmp_deeply(\%got, {'aa' => 'v1:aa'}, "... for the expected topic, 'aa'"); @@ -116,9 +121,11 @@ is($sub->wait_for_messages(1), 0, '... yep, no messages delivered'); cmp_deeply(\%got, {}, '... and an empty messages recorded set'); is($sub->is_subscriber, 1, 'Still some pending subcriptions active'); -throws_ok sub { $sub->info }, - qr/Cannot use command 'INFO' while in SUBSCRIBE mode/, - '... still an error to try commands in subscribe mode'; +for my $cmd (qw) { + throws_ok sub { $sub->$cmd }, + qr/Cannot use command '(?i:$cmd)' while in SUBSCRIBE mode/, + ".. still an error to try \U$cmd\E while in SUBSCRIBE mode"; +} $sub->punsubscribe('c*', $psub_cb); is($sub->is_subscriber, 0, '... but none anymore'); diff --git a/t/04-pipeline.t b/t/04-pipeline.t new file mode 100755 index 0000000..70c1d00 --- /dev/null +++ b/t/04-pipeline.t @@ -0,0 +1,100 @@ +#!perl + +use warnings; +use strict; +use Test::More; +use Redis; +use lib 't/tlib'; +use Test::SpawnRedisServer; +use Test::Exception; + +my ($c, $srv) = redis(); +END { $c->() if $c } + +ok(my $r = Redis->new(server => $srv), 'connected to our test redis-server'); + +sub pipeline_ok { + my ($desc, @commands) = @_; + my (@responses, @expected_responses); + for my $cmd (@commands) { + my ($method, $args, $expected, $expected_err) = @$cmd; + push @expected_responses, [$expected, $expected_err]; + $r->$method(@$args, sub { push @responses, [@_] }); + } + $r->wait_all_responses; + + # An expected response consisting of a hashref means that any non-empty + # hashref should be accepted. But reimplementing is_deeply() sounds like + # a pain, so fake it: + for my $i (0 .. $#expected_responses) { + $expected_responses[$i] = $responses[$i] + if ref $expected_responses[$i][0] eq 'HASH' + && ref $responses[$i][0] eq 'HASH' + && keys %{ $responses[$i][0] }; + } + + is_deeply(\@responses, \@expected_responses, $desc); +} + +pipeline_ok 'single-command pipeline', ( + [set => [foo => 'bar'], 'OK'], +); + +pipeline_ok 'pipeline with embedded error', ( + [set => [clunk => 'eth'], 'OK'], + [oops => [], undef, q[ERR unknown command 'OOPS']], + [get => ['clunk'], 'eth'], +); + +pipeline_ok 'keys in pipelined mode', ( + [keys => ['*'], [qw]], + [keys => [], undef, q[ERR wrong number of arguments for 'keys' command]], +); + +pipeline_ok 'info in pipelined mode', ( + [info => [], {}], # any non-empty hashref + [info => ['oops'], undef, q[ERR wrong number of arguments for 'info' command]], +); + +pipeline_ok 'pipeline with multi-bulk reply', ( + [hmset => [kapow => (a => 1, b => 2, c => 3)], 'OK'], + [hmget => [kapow => qw], [3, 2, 1]], +); + +pipeline_ok 'large pipeline', ( + (map { [hset => [zzapp => $_ => -$_], 1] } 1 .. 5000), + [hmget => [zzapp => (1 .. 5000)], [reverse -5000 .. -1]], + [del => ['zzapp'], 1], +); + +subtest 'synchronous request with pending pipeline' => sub { + my $clunk; + is($r->get('clunk', sub { $clunk = $_[0] }), 1, 'queue a request'); + is($r->set('kapow', 'zzapp', sub {}), 1, 'queue another request'); + is($r->get('kapow'), 'zzapp', 'synchronous request has expected return'); + is($clunk, 'eth', 'synchronous request processes pending ones'); +}; + +pipeline_ok 'transaction', ( + [multi => [], 'OK'], + [set => ['clunk' => 'eth'], 'QUEUED'], + [rpush => ['clunk' => 'oops'], 'QUEUED'], + [get => ['clunk'], 'QUEUED'], + [exec => [], [ + ['OK', undef], + [undef, 'ERR Operation against a key holding the wrong kind of value'], + ['eth', undef], + ]], +); + +subtest 'transaction with error and no pipeline' => sub { + is($r->multi, 'OK', 'multi'); + is($r->set('clunk', 'eth'), 'QUEUED', 'transactional SET'); + is($r->rpush('clunk', 'oops'), 'QUEUED', 'transactional bad RPUSH'); + is($r->get('clunk'), 'QUEUED', 'transactional GET'); + throws_ok sub { $r->exec }, + qr/\[exec\] ERR Operation against a key holding the wrong kind of value,/, + 'synchronous EXEC dies for intervening error'; +}; + +done_testing(); diff --git a/t/07-reconnect.t b/t/07-reconnect.t old mode 100644 new mode 100755 index 87fa990..90b45d7 --- a/t/07-reconnect.t +++ b/t/07-reconnect.t @@ -42,6 +42,45 @@ subtest 'Command without connection or timeout, with reconnect' => sub { }; +subtest 'Reconnection discards pending commands' => sub { + ok(my $r = Redis->new(reconnect => 2, server => $srv), + 'connected to our test redis-server'); + + my $processed_pending = 0; + $r->dbsize(sub { $processed_pending++ }); + + ok(close(delete $r->{sock}), 'evilly close connection to the server'); + ok($r->set(foo => 'bar'), 'send command with reconnect'); + + is($processed_pending, 0, 'pending command discarded on reconnect'); +}; + + +subtest 'INFO commands with extra logic triggers reconnect' => sub { + ok(my $r = Redis->new(reconnect => 2, server => $srv), + 'connected to our test redis-server'); + + ok($r->quit, 'close connection to the server'); + + my $info = $r->info; + is(ref $info, 'HASH', 'reconnect on INFO command'); +}; + + +subtest 'KEYS commands with extra logic triggers reconnect' => sub { + ok(my $r = Redis->new(reconnect => 2, server => $srv), + 'connected to our test redis-server'); + + ok($r->flushdb, 'delete all keys'); + ok($r->set(reconnect => $$), 'set known key'); + + ok($r->quit, 'close connection to the server'); + + my @keys = $r->keys('*'); + is_deeply(\@keys, ['reconnect'], 'reconnect on KEYS command'); +}; + + subtest "Bad commnands don't trigger reconnect" => sub { ok(my $r = Redis->new(reconnect => 2, server => $srv), 'connected to our test redis-server');