Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
27591cd
wip
ndyakov Oct 23, 2025
606264e
wip, used and unusable states
ndyakov Oct 23, 2025
0a75466
Merge remote-tracking branch 'origin/master' into ndyakov/state-machi…
ndyakov Oct 23, 2025
5721512
polish state machine
ndyakov Oct 24, 2025
663a60e
correct handling OnPut
ndyakov Oct 24, 2025
7526e67
better errors for tests, hook should work now
ndyakov Oct 24, 2025
92433e6
fix linter
ndyakov Oct 24, 2025
21bd243
improve reauth state management. fix tests
ndyakov Oct 24, 2025
3f29463
Update internal/pool/conn.go
ndyakov Oct 24, 2025
de2f8ba
Update internal/pool/conn.go
ndyakov Oct 24, 2025
94fa920
better timeouts
ndyakov Oct 24, 2025
cfcf37d
empty endpoint handoff case
ndyakov Oct 24, 2025
3a53e1b
fix handoff state when queued for handoff
ndyakov Oct 24, 2025
c4ed467
try to detect the deadlock
ndyakov Oct 24, 2025
9ad6288
try to detect the deadlock x2
ndyakov Oct 24, 2025
03b0003
delete should be called
ndyakov Oct 24, 2025
84e856e
improve tests
ndyakov Oct 24, 2025
a2c7a25
fix mark on uninitialized connection
ndyakov Oct 24, 2025
23d0e0f
Merge branch 'master' into ndyakov/state-machine-conn
ndyakov Oct 25, 2025
ffbe1e5
Update internal/pool/conn_state_test.go
ndyakov Oct 25, 2025
65a6ece
Update internal/pool/conn_state_test.go
ndyakov Oct 25, 2025
0964dcc
Update internal/pool/pool.go
ndyakov Oct 25, 2025
bc42307
Update internal/pool/conn_state.go
ndyakov Oct 25, 2025
33696fb
Update internal/pool/conn.go
ndyakov Oct 25, 2025
13a4b3f
fix error from copilot
ndyakov Oct 25, 2025
07e665f
address copilot comment
ndyakov Oct 25, 2025
080a33c
fix(pool): pool performance (#3565)
ndyakov Oct 27, 2025
9448059
initConn sets IDLE state
ndyakov Oct 27, 2025
b862bf5
Merge remote-tracking branch 'origin/master' into ndyakov/state-machi…
ndyakov Oct 28, 2025
d5db534
fix precision of time cache and usedAt
ndyakov Oct 28, 2025
dcd8f9c
allow e2e tests to run longer
ndyakov Oct 28, 2025
f1c8884
Merge branch 'master' into ndyakov/state-machine-conn
ndyakov Oct 28, 2025
0752aec
Fix broken initialization of idle connections
ndyakov Oct 28, 2025
54281d6
optimize push notif
ndyakov Oct 28, 2025
600dfe2
100ms -> 50ms
ndyakov Oct 29, 2025
dccf01f
use correct timer for last health check
ndyakov Oct 29, 2025
7201275
verify pass auth on conn creation
ndyakov Oct 29, 2025
62eecaa
fix assertion
ndyakov Oct 29, 2025
43eeae7
fix unsafe test
ndyakov Oct 29, 2025
2965e3d
fix benchmark test
ndyakov Oct 29, 2025
59da35b
improve remove conn
ndyakov Oct 29, 2025
09a2f07
re doesn't support requirepass
ndyakov Oct 29, 2025
fc2da24
wait more in e2e test
ndyakov Oct 30, 2025
5f0b58b
Merge branch 'master' into ndyakov/state-machine-conn
ndyakov Oct 30, 2025
d207749
flaky test
ndyakov Oct 30, 2025
ef3e06f
Merge remote-tracking branch 'origin/master' into ndyakov/state-machi…
ndyakov Oct 30, 2025
5fa97c8
add missed method in interface
ndyakov Oct 30, 2025
d91800d
fix test assertions
ndyakov Oct 30, 2025
c5ca81d
Merge branch 'master' into ndyakov/state-machine-conn
ndyakov Nov 4, 2025
a39dd4c
silence logs and faster hooks manager
ndyakov Nov 4, 2025
c3dbc8c
address linter comment
ndyakov Nov 4, 2025
3b65139
Merge branch 'master' into ndyakov/state-machine-conn
ndyakov Nov 5, 2025
95af71c
Merge branch 'master' into ndyakov/state-machine-conn
ndyakov Nov 5, 2025
18b46a1
fix flaky test
ndyakov Nov 5, 2025
4673c62
use read instad of control
ndyakov Nov 5, 2025
2b8023c
use pool size for semsize
ndyakov Nov 5, 2025
41024f7
CAS instead of reading the state
ndyakov Nov 5, 2025
3cb5ab3
Merge branch 'master' into ndyakov/state-machine-conn
ndyakov Nov 5, 2025
9466c1c
preallocate errors and states
ndyakov Nov 6, 2025
cca0382
preallocate state slices
ndyakov Nov 6, 2025
c412436
Merge branch 'master' into ndyakov/state-machine-conn
ndyakov Nov 7, 2025
edf6bd7
fix flaky test
ndyakov Nov 7, 2025
1dfa805
fix fast semaphore that could have been starved
ndyakov Nov 10, 2025
e418737
try to fix the semaphore
ndyakov Nov 10, 2025
dc319c0
should properly notify the waiters
ndyakov Nov 10, 2025
3fd7e08
waiter may double-release (if closed/times out)
ndyakov Nov 10, 2025
5b43c3c
priority of operations
ndyakov Nov 10, 2025
dd630c7
use simple approach of fifo waiters
ndyakov Nov 10, 2025
ac84940
use simple channel based semaphores
ndyakov Nov 10, 2025
08d9744
address linter and tests
ndyakov Nov 10, 2025
26ee59c
remove unused benchs
ndyakov Nov 10, 2025
d6159b9
change log message
ndyakov Nov 10, 2025
82dcbab
Merge branch 'master' into ndyakov/state-machine-conn
ndyakov Nov 10, 2025
56e3ae6
Merge branch 'master' into ndyakov/state-machine-conn
ndyakov Nov 11, 2025
9e24679
address pr comments
ndyakov Nov 11, 2025
d84b34a
address pr comments
ndyakov Nov 11, 2025
2d5a75b
fix data race
ndyakov Nov 11, 2025
154a6ee
Merge branch 'master' into ndyakov/state-machine-conn
ndyakov Nov 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 22 additions & 36 deletions internal/auth/streaming/pool_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,41 +179,27 @@ func (r *ReAuthPoolHook) OnPut(_ context.Context, conn *pool.Conn) (bool, bool,
r.workers <- struct{}{}
}()

var err error
timeout := time.After(r.reAuthTimeout)
// Create timeout context for connection acquisition
// This prevents indefinite waiting if the connection is stuck
ctx, cancel := context.WithTimeout(context.Background(), r.reAuthTimeout)
defer cancel()

// Try to acquire the connection for re-authentication
// We need to ensure the connection is IDLE (not IN_USE) before transitioning to UNUSABLE
// This prevents re-authentication from interfering with active commands
// Use AwaitAndTransition to wait for the connection to become IDLE
stateMachine := conn.GetStateMachine()
if stateMachine == nil {
// No state machine - should not happen, but handle gracefully
reAuthFn(pool.ErrConnUnusableTimeout)
return
}

// Try to acquire the connection
// We need to ensure the connection is both Usable and not Used
// to prevent data races with concurrent operations
const baseDelay = 10 * time.Microsecond
acquired := false
attempt := 0
for !acquired {
select {
case <-timeout:
// Timeout occurred, cannot acquire connection
err = pool.ErrConnUnusableTimeout
reAuthFn(err)
return
default:
// Try to acquire: set Usable=false, then check Used
if conn.CompareAndSwapUsable(true, false) {
if !conn.IsUsed() {
acquired = true
} else {
// Release Usable and retry with exponential backoff
// todo(ndyakov): think of a better way to do this without the need
// to release the connection, but just wait till it is not used
conn.SetUsable(true)
}
}
if !acquired {
// Exponential backoff: 10, 20, 40, 80... up to 5120 microseconds
delay := baseDelay * time.Duration(1<<uint(attempt%10)) // Cap exponential growth
time.Sleep(delay)
attempt++
}
}
_, err := stateMachine.AwaitAndTransition(ctx, []pool.ConnState{pool.StateIdle}, pool.StateUnusable)
if err != nil {
// Timeout or other error occurred, cannot acquire connection
reAuthFn(err)
return
}

// safety first
Expand All @@ -222,8 +208,8 @@ func (r *ReAuthPoolHook) OnPut(_ context.Context, conn *pool.Conn) (bool, bool,
reAuthFn(nil)
}

// Release the connection
conn.SetUsable(true)
// Release the connection: transition from UNUSABLE back to IDLE
stateMachine.Transition(pool.StateIdle)
}()
}

Expand Down
241 changes: 241 additions & 0 deletions internal/auth/streaming/pool_hook_state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
package streaming

import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/redis/go-redis/v9/internal/pool"
)

// TestReAuthOnlyWhenIdle verifies that re-authentication only happens when
// a connection is in IDLE state, not when it's IN_USE.
func TestReAuthOnlyWhenIdle(t *testing.T) {
// Create a connection
cn := pool.NewConn(nil)

// Initialize to IDLE state
cn.GetStateMachine().Transition(pool.StateInitializing)
cn.GetStateMachine().Transition(pool.StateIdle)

// Simulate connection being acquired (IDLE → IN_USE)
if !cn.CompareAndSwapUsed(false, true) {
t.Fatal("Failed to acquire connection")
}

// Verify state is IN_USE
if state := cn.GetStateMachine().GetState(); state != pool.StateInUse {
t.Errorf("Expected state IN_USE, got %s", state)
}

// Try to transition to UNUSABLE (for reauth) - should fail
_, err := cn.GetStateMachine().TryTransition([]pool.ConnState{pool.StateIdle}, pool.StateUnusable)
if err == nil {
t.Error("Expected error when trying to transition IN_USE → UNUSABLE, but got none")
}

// Verify state is still IN_USE
if state := cn.GetStateMachine().GetState(); state != pool.StateInUse {
t.Errorf("Expected state to remain IN_USE, got %s", state)
}

// Release connection (IN_USE → IDLE)
if !cn.CompareAndSwapUsed(true, false) {
t.Fatal("Failed to release connection")
}

// Verify state is IDLE
if state := cn.GetStateMachine().GetState(); state != pool.StateIdle {
t.Errorf("Expected state IDLE, got %s", state)
}

// Now try to transition to UNUSABLE - should succeed
_, err = cn.GetStateMachine().TryTransition([]pool.ConnState{pool.StateIdle}, pool.StateUnusable)
if err != nil {
t.Errorf("Failed to transition IDLE → UNUSABLE: %v", err)
}

// Verify state is UNUSABLE
if state := cn.GetStateMachine().GetState(); state != pool.StateUnusable {
t.Errorf("Expected state UNUSABLE, got %s", state)
}
}

// TestReAuthWaitsForConnectionToBeIdle verifies that the re-auth worker
// waits for a connection to become IDLE before performing re-authentication.
func TestReAuthWaitsForConnectionToBeIdle(t *testing.T) {
// Create a connection
cn := pool.NewConn(nil)

// Initialize to IDLE state
cn.GetStateMachine().Transition(pool.StateInitializing)
cn.GetStateMachine().Transition(pool.StateIdle)

// Simulate connection being acquired (IDLE → IN_USE)
if !cn.CompareAndSwapUsed(false, true) {
t.Fatal("Failed to acquire connection")
}

// Track re-auth attempts
var reAuthAttempts atomic.Int32
var reAuthSucceeded atomic.Bool

// Start a goroutine that tries to acquire the connection for re-auth
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

// Try to acquire for re-auth with timeout
timeout := time.After(2 * time.Second)
acquired := false

for !acquired {
select {
case <-timeout:
t.Error("Timeout waiting to acquire connection for re-auth")
return
default:
reAuthAttempts.Add(1)
// Try to atomically transition from IDLE to UNUSABLE
_, err := cn.GetStateMachine().TryTransition([]pool.ConnState{pool.StateIdle}, pool.StateUnusable)
if err == nil {
// Successfully acquired
acquired = true
reAuthSucceeded.Store(true)
} else {
// Connection is still IN_USE, wait a bit
time.Sleep(10 * time.Millisecond)
}
}
}

// Release the connection
cn.GetStateMachine().Transition(pool.StateIdle)
}()

// Keep connection IN_USE for 500ms
time.Sleep(500 * time.Millisecond)

// Verify re-auth hasn't succeeded yet (connection is still IN_USE)
if reAuthSucceeded.Load() {
t.Error("Re-auth succeeded while connection was IN_USE")
}

// Verify there were multiple attempts
attempts := reAuthAttempts.Load()
if attempts < 2 {
t.Errorf("Expected multiple re-auth attempts, got %d", attempts)
}

// Release connection (IN_USE → IDLE)
if !cn.CompareAndSwapUsed(true, false) {
t.Fatal("Failed to release connection")
}

// Wait for re-auth to complete
wg.Wait()

// Verify re-auth succeeded after connection became IDLE
if !reAuthSucceeded.Load() {
t.Error("Re-auth did not succeed after connection became IDLE")
}

// Verify final state is IDLE
if state := cn.GetStateMachine().GetState(); state != pool.StateIdle {
t.Errorf("Expected final state IDLE, got %s", state)
}
}

// TestConcurrentReAuthAndUsage verifies that re-auth and normal usage
// don't interfere with each other.
func TestConcurrentReAuthAndUsage(t *testing.T) {
// Create a connection
cn := pool.NewConn(nil)

// Initialize to IDLE state
cn.GetStateMachine().Transition(pool.StateInitializing)
cn.GetStateMachine().Transition(pool.StateIdle)

var wg sync.WaitGroup
var usageCount atomic.Int32
var reAuthCount atomic.Int32

// Goroutine 1: Simulate normal usage (acquire/release)
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
// Try to acquire
if cn.CompareAndSwapUsed(false, true) {
usageCount.Add(1)
// Simulate work
time.Sleep(1 * time.Millisecond)
// Release
cn.CompareAndSwapUsed(true, false)
}
time.Sleep(1 * time.Millisecond)
}
}()

// Goroutine 2: Simulate re-auth attempts
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 50; i++ {
// Try to acquire for re-auth
_, err := cn.GetStateMachine().TryTransition([]pool.ConnState{pool.StateIdle}, pool.StateUnusable)
if err == nil {
reAuthCount.Add(1)
// Simulate re-auth work
time.Sleep(2 * time.Millisecond)
// Release
cn.GetStateMachine().Transition(pool.StateIdle)
}
time.Sleep(2 * time.Millisecond)
}
}()

wg.Wait()

// Verify both operations happened
if usageCount.Load() == 0 {
t.Error("No successful usage operations")
}
if reAuthCount.Load() == 0 {
t.Error("No successful re-auth operations")
}

t.Logf("Usage operations: %d, Re-auth operations: %d", usageCount.Load(), reAuthCount.Load())

// Verify final state is IDLE
if state := cn.GetStateMachine().GetState(); state != pool.StateIdle {
t.Errorf("Expected final state IDLE, got %s", state)
}
}

// TestReAuthRespectsClosed verifies that re-auth doesn't happen on closed connections.
func TestReAuthRespectsClosed(t *testing.T) {
// Create a connection
cn := pool.NewConn(nil)

// Initialize to IDLE state
cn.GetStateMachine().Transition(pool.StateInitializing)
cn.GetStateMachine().Transition(pool.StateIdle)

// Close the connection
cn.GetStateMachine().Transition(pool.StateClosed)

// Try to transition to UNUSABLE - should fail
_, err := cn.GetStateMachine().TryTransition([]pool.ConnState{pool.StateIdle}, pool.StateUnusable)
if err == nil {
t.Error("Expected error when trying to transition CLOSED → UNUSABLE, but got none")
}

// Verify state is still CLOSED
if state := cn.GetStateMachine().GetState(); state != pool.StateClosed {
t.Errorf("Expected state to remain CLOSED, got %s", state)
}
}

Loading
Loading