Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions cluster_conn.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,31 @@
package main

import (
"github.com/mediocregopher/radix/v3"
"context"
"github.com/mediocregopher/radix/v4"
"log"
)

func getOSSClusterConn(addr string, opts []radix.DialOpt, clients uint64) *radix.Cluster {
var vanillaCluster *radix.Cluster
func getOSSClusterConn(addr string, opts radix.Dialer, clients uint64) *radix.Cluster {
var err error
var vanillaCluster *radix.Cluster
var size int = int(clients)
ctx := context.Background()
laddr := make([]string, 1)
laddr[0] = addr
poolConfig := radix.PoolConfig{}
poolConfig.Dialer = opts
poolConfig.Size = size

customConnFunc := func(network, addr string) (radix.Conn, error) {
return radix.Dial(network, addr, opts...,
)
}

// this cluster will use the ClientFunc to create a pool to each node in the
// cluster.
poolFunc := func(network, addr string) (radix.Client, error) {
return radix.NewPool(network, addr, int(clients), radix.PoolConnFunc(customConnFunc), radix.PoolPipelineWindow(0, 0))
}
clusterConfig := radix.ClusterConfig{}
clusterConfig.PoolConfig = poolConfig

vanillaCluster, err = radix.NewCluster([]string{addr}, radix.ClusterPoolFunc(poolFunc))
vanillaCluster, err = clusterConfig.New(ctx, laddr)
if err != nil {
log.Fatalf("Error preparing for benchmark, while creating new connection. error = %v", err)
}
// Issue CLUSTER SLOTS command
err = vanillaCluster.Sync()
err = vanillaCluster.Sync(ctx)
if err != nil {
log.Fatalf("Error preparing for benchmark, while issuing CLUSTER SLOTS. error = %v", err)
}
Expand Down
16 changes: 16 additions & 0 deletions common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package main

import (
"context"
radix "github.com/mediocregopher/radix/v4"
)

type Client interface {

// Do performs an Action on a Conn connected to the redis instance.
Do(context.Context, radix.Action) error

// Once Close() is called all future method calls on the Client will return
// an error
Close() error
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.14
require (
github.com/HdrHistogram/hdrhistogram-go v1.1.0
github.com/google/go-cmp v0.5.5 // indirect
github.com/mediocregopher/radix/v3 v3.5.2
github.com/mediocregopher/radix/v4 v4.1.2
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
)
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mediocregopher/radix/v3 v3.5.2 h1:A9u3G7n4+fWmDZ2ZDHtlK+cZl4q55T+7RjKjR0/MAdk=
github.com/mediocregopher/radix/v3 v3.5.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/mediocregopher/radix/v4 v4.1.2 h1:Pj7XnNK5WuzzFy63g98pnccainAePK+aZNQRvxSvj2I=
github.com/mediocregopher/radix/v4 v4.1.2/go.mod h1:ajchozX/6ELmydxWeWM6xCFHVpZ4+67LXHOTOVR0nCE=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -29,6 +29,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tilinna/clock v1.0.2 h1:6BO2tyAC9JbPExKH/z9zl44FLu1lImh3nDNKA0kgrkI=
github.com/tilinna/clock v1.0.2/go.mod h1:ZsP7BcY7sEEz7ktc0IVy8Us6boDrK8VradlKRUGfOao=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down
44 changes: 27 additions & 17 deletions redis-bechmark-go.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package main

import (
"context"
"flag"
"fmt"
hdrhistogram "github.com/HdrHistogram/hdrhistogram-go"
"github.com/mediocregopher/radix/v3"
"github.com/mediocregopher/radix/v4"
"golang.org/x/time/rate"
"log"
"math"
Expand Down Expand Up @@ -37,15 +38,15 @@ func stringWithCharset(length int, charset string) string {
return string(b)
}

func ingestionRoutine(conn radix.Client, enableMultiExec bool, datapointsChan chan datapoint, continueOnError bool, cmdS []string, keyspacelen, datasize, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, keyplace, dataplace int, useLimiter bool, rateLimiter *rate.Limiter, waitReplicas, waitReplicasMs int) {
func ingestionRoutine(conn Client, enableMultiExec bool, datapointsChan chan datapoint, continueOnError bool, cmdS []string, keyspacelen, datasize, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, keyplace, dataplace int, useLimiter bool, rateLimiter *rate.Limiter, waitReplicas, waitReplicasMs int) {
defer wg.Done()
for i := 0; uint64(i) < number_samples || loop; i++ {
rawCurrentCmd, key, _ := keyBuildLogic(keyplace, dataplace, datasize, keyspacelen, cmdS)
sendCmdLogic(conn, rawCurrentCmd, enableMultiExec, key, datapointsChan, continueOnError, debug_level, useLimiter, rateLimiter, waitReplicas, waitReplicasMs)
}
}

func keyBuildLogic(keyPos int, dataPos int, datasize, keyspacelen uint64, cmdS []string) (cmd radix.CmdAction, key string, keySlot uint16) {
func keyBuildLogic(keyPos int, dataPos int, datasize, keyspacelen uint64, cmdS []string) (cmd radix.Action, key string, keySlot uint16) {
newCmdS := cmdS
if keyPos > -1 {
newCmdS[keyPos] = fmt.Sprintf("%d", rand.Int63n(int64(keyspacelen)))
Expand All @@ -54,22 +55,23 @@ func keyBuildLogic(keyPos int, dataPos int, datasize, keyspacelen uint64, cmdS [
newCmdS[dataPos] = stringWithCharset(int(datasize), charset)
}
rawCmd := radix.Cmd(nil, newCmdS[0], newCmdS[1:]...)

return rawCmd, key, radix.ClusterSlot([]byte(newCmdS[1]))
}

func sendCmdLogic(conn radix.Client, cmd radix.CmdAction, enableMultiExec bool, key string, datapointsChan chan datapoint, continueOnError bool, debug_level int, useRateLimiter bool, rateLimiter *rate.Limiter, waitReplicas, waitReplicasMs int) {
func sendCmdLogic(conn Client, cmd radix.Action, enableMultiExec bool, key string, datapointsChan chan datapoint, continueOnError bool, debug_level int, useRateLimiter bool, rateLimiter *rate.Limiter, waitReplicas, waitReplicasMs int) {
ctx := context.Background()

if useRateLimiter {
r := rateLimiter.ReserveN(time.Now(), int(1))
time.Sleep(r.Delay())
}
var err error
startT := time.Now()
if enableMultiExec {
err = conn.Do(radix.WithConn(key, func(c radix.Conn) error {
err = conn.Do(ctx, radix.WithConn(key, func(ctx context.Context, c radix.Conn) error {

// Begin the transaction with a MULTI command
if err := conn.Do(radix.Cmd(nil, "MULTI")); err != nil {
if err := conn.Do(ctx, radix.Cmd(nil, "MULTI")); err != nil {
log.Fatalf("Received an error while preparing for MULTI: %v, error: %v", cmd, err)
}

Expand All @@ -83,25 +85,28 @@ func sendCmdLogic(conn radix.Client, cmd radix.CmdAction, enableMultiExec bool,
// The return from DISCARD doesn't matter. If it's an error then
// it's a network error and the Conn will be closed by the
// client.
conn.Do(radix.Cmd(nil, "DISCARD"))
conn.Do(ctx, radix.Cmd(nil, "DISCARD"))
log.Fatalf("Received an error while in multi: %v, error: %v", cmd, err)
}
}()

// queue up the transaction's commands
err = conn.Do(cmd)
err = conn.Do(ctx, cmd)

// execute the transaction, capturing the result in a Tuple. We only
// care about the first element (the result from GET), so we discard the
// second by setting nil.
return conn.Do(radix.Cmd(nil, "EXEC"))
return conn.Do(ctx, radix.Cmd(nil, "EXEC"))
}))
} else if waitReplicas > 0 {
// pipeline the command + wait
err = conn.Do(radix.Pipeline(cmd,
radix.Cmd(nil, "WAIT", fmt.Sprintf("%d", waitReplicas), fmt.Sprintf("%d", waitReplicasMs))))
// Create a new pipeline for the WAIT command
p := radix.NewPipeline()
// Pass both cmd and waitCmd to the original pipeline
p.Append(cmd)
p.Append(radix.Cmd(nil, "WAIT", fmt.Sprintf("%d", waitReplicas), fmt.Sprintf("%d", waitReplicasMs)))
err = conn.Do(ctx, p)
} else {
err = conn.Do(cmd)
err = conn.Do(ctx, cmd)
}
endT := time.Now()
if err != nil {
Expand Down Expand Up @@ -134,6 +139,7 @@ func main() {
clusterMode := flag.Bool("oss-cluster", false, "Enable OSS cluster mode.")
loop := flag.Bool("l", false, "Loop. Run the tests forever.")
version := flag.Bool("v", false, "Output version and exit")
resp := flag.Int("resp", 2, "redis command response protocol (2 - RESP 2, 3 - RESP 3)")
flag.Parse()
git_sha := toolGitSHA1()
git_dirty_str := ""
Expand Down Expand Up @@ -173,9 +179,14 @@ func main() {
samplesPerClient := *numberRequests / *clients
client_update_tick := 1
latencies = hdrhistogram.New(1, 90000000, 3)
opts := make([]radix.DialOpt, 0)
opts := radix.Dialer{}
if *password != "" {
opts = append(opts, radix.DialAuthPass(*password))
opts.AuthPass = *password
}
if *resp == 2 {
opts.Protocol = "2"
} else if *resp == 3 {
opts.Protocol = "3"
}
ips, _ := net.LookupIP(*host)
fmt.Printf("IPs %v\n", ips)
Expand All @@ -191,7 +202,6 @@ func main() {
fmt.Printf("Using random seed: %d\n", *seed)
rand.Seed(*seed)
var cluster *radix.Cluster

datapointsChan := make(chan datapoint, *numberRequests)
for channel_id := 1; uint64(channel_id) <= *clients; channel_id++ {
wg.Add(1)
Expand Down
22 changes: 12 additions & 10 deletions standalone_conn.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
package main

import (
"github.com/mediocregopher/radix/v3"
"context"
"github.com/mediocregopher/radix/v4"
"log"
)

func getStandaloneConn(addr string, opts []radix.DialOpt, clients uint64) *radix.Pool {
var pool *radix.Pool
func getStandaloneConn(addr string, opts radix.Dialer, clients uint64) Client {
var err error

customConnFunc := func(network, addr string) (radix.Conn, error) {
return radix.Dial(network, addr, opts...,
)
}
var size int = int(clients)
network := "tcp"
pool, err = radix.NewPool(network, addr, int(clients), radix.PoolConnFunc(customConnFunc), radix.PoolPipelineWindow(0, 0))
ctx := context.Background()

poolConfig := radix.PoolConfig{}
poolConfig.Dialer = opts
poolConfig.Size = size

pool, err := poolConfig.New(ctx, network, addr)
if err != nil {
log.Fatalf("Error preparing for benchmark, while creating new connection. error = %v", err)
log.Fatalf("Error preparing for benchmark, while creating new pool. error = %v", err)
}
return pool
}