5 Star 4 Fork 1

Gitee 极速下载/rueidis

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/rueian/rueidis
克隆/下载
mux_test.go 31.66 KB
一键复制 编辑 原始数据 按行查看 历史
Hyeonho Kim 提交于 2025-01-19 03:32 . feat: add AZ to replica info (#721)
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176
package rueidis
import (
"bufio"
"context"
"errors"
"fmt"
"net"
"runtime"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/redis/rueidis/internal/cmds"
)
// setupMux builds a mux on top of the given mock wires using an empty
// ClientOption. The returned checkClean callback fails the test when some of
// the supplied wires were never handed out to the mux.
func setupMux(wires []*mockWire) (conn *mux, checkClean func(t *testing.T)) {
	conn, checkClean = setupMuxWithOption(wires, &ClientOption{})
	return conn, checkClean
}
// setupMuxWithOption builds a mux whose wire factory hands out the given mock
// wires one by one, in slice order. The same factory is passed for both
// factory arguments of newMux, so every wire the mux creates consumes the next
// entry of wires (requesting more wires than supplied panics with an index
// out of range).
//
// A zero option.BlockingPipeline is replaced with DefaultBlockingPipeline;
// note that this writes to the caller's option.
//
// The returned checkClean callback fails the test if some of the supplied
// wires were never requested.
func setupMuxWithOption(wires []*mockWire, option *ClientOption) (conn *mux, checkClean func(t *testing.T)) {
	var mu sync.Mutex
	count := -1 // index of the most recently handed-out wire
	wfn := func() wire {
		mu.Lock()
		defer mu.Unlock()
		count++
		return wires[count]
	}
	if option.BlockingPipeline == 0 {
		option.BlockingPipeline = DefaultBlockingPipeline
	}
	conn = newMux("", option, (*mockWire)(nil), (*mockWire)(nil), wfn, wfn)
	checkClean = func(t *testing.T) {
		// Read the counter under the same lock the factory uses: the factory
		// may be invoked from goroutines other than the test goroutine, so an
		// unguarded read here would be a data race.
		mu.Lock()
		used := count
		mu.Unlock()
		if used != len(wires)-1 {
			t.Fatalf("there is %d remaining unused wires", len(wires)-used-1)
		}
	}
	return conn, checkClean
}
// TestNewMuxDailErr checks that a failing dial function is surfaced by Dial,
// that pipe and Acquire fall back to the mux's dead wire, and that the dial
// function is invoked once per attempt (four times in total).
func TestNewMuxDailErr(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())

	dials := 0
	dialErr := errors.New("any")
	failingDial := func(dst string, opt *ClientOption) (net.Conn, error) {
		dials++
		return nil, dialErr
	}
	m := makeMux("", &ClientOption{}, failingDial)

	// First attempt: dials == 1 afterwards.
	firstErr := m.Dial()
	if firstErr != dialErr {
		t.Fatalf("unexpected return %v", firstErr)
	}
	if dials != 1 {
		t.Fatalf("dialFn not called")
	}

	// Second attempt, through pipe: dials == 2 afterwards.
	piped := m.pipe(0)
	if piped != m.dead {
		t.Fatalf("unexpected wire %v", piped)
	}

	// Third attempt, through Dial again: dials == 3 afterwards.
	secondErr := m.Dial()
	if secondErr != dialErr {
		t.Fatalf("unexpected return %v", secondErr)
	}

	// Fourth attempt, through Acquire.
	acquired := m.Acquire()
	if acquired != m.dead {
		t.Fatalf("unexpected wire %v", acquired)
	}
	if dials != 4 {
		t.Fatalf("dialFn not called %v", dials)
	}
}
// TestNewMux dials a mux over an in-memory net.Pipe whose other end is served
// by a redisMock. The mock answers HELLO, CLIENT TRACKING, the two CLIENT
// SETINFO commands (with errors) and one PING, then closes. A second mux is
// then created, overridden with the first one, dialed and closed.
func TestNewMux(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())

	clientSide, serverSide := net.Pipe()
	mock := &redisMock{t: t, buf: bufio.NewReader(serverSide), conn: serverSide}

	helloReply := RedisMessage{
		typ: '%',
		values: []RedisMessage{
			{typ: '+', string: "proto"},
			{typ: ':', integer: 3},
		},
	}
	go func() {
		mock.Expect("HELLO", "3").Reply(helloReply)
		mock.Expect("CLIENT", "TRACKING", "ON", "OPTIN").ReplyString("OK")
		mock.Expect("CLIENT", "SETINFO", "LIB-NAME", LibName).ReplyError("UNKNOWN COMMAND")
		mock.Expect("CLIENT", "SETINFO", "LIB-VER", LibVer).ReplyError("UNKNOWN COMMAND")
		mock.Expect("PING").ReplyString("OK")
		mock.Close()
	}()

	// Both muxes are given a dialer that returns the same client end of the pipe.
	dialPipe := func(dst string, opt *ClientOption) (net.Conn, error) {
		return clientSide, nil
	}

	m := makeMux("", &ClientOption{}, dialPipe)
	dialErr := m.Dial()
	if dialErr != nil {
		t.Fatalf("unexpected error %v", dialErr)
	}

	t.Run("Override with previous mux", func(t *testing.T) {
		m2 := makeMux("", &ClientOption{}, dialPipe)
		m2.Override(m)
		overrideErr := m2.Dial()
		if overrideErr != nil {
			t.Fatalf("unexpected error %v", overrideErr)
		}
		m2.Close()
	})
}
// TestNewMuxPipelineMultiplex verifies the number of pipeline wire slots
// created for several PipelineMultiplex settings: a negative value yields a
// single slot, a non-negative value v yields 1<<v slots.
func TestNewMuxPipelineMultiplex(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())

	noopDial := func(dst string, opt *ClientOption) (net.Conn, error) { return nil, nil }
	for _, multiplex := range []int{-1, 0, 1, 2} {
		m := makeMux("", &ClientOption{PipelineMultiplex: multiplex}, noopDial)

		want := 1
		if multiplex >= 0 {
			want = 1 << multiplex
		}
		if got := len(m.wire); got != want {
			t.Fatalf("unexpected len(m.wire): %v", got)
		}
	}
}
// TestMuxAddr checks that Addr reports the destination the mux was created with.
func TestMuxAddr(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())

	const dst = "dst1"
	m := makeMux(dst, &ClientOption{}, nil)
	if got := m.Addr(); got != dst {
		t.Fatalf("unexpected m.Addr != dst1")
	}
}
// TestMuxDialSuppress starts 1000 goroutines calling Info on a mux whose first
// wire factory blocks until released, and asserts that the blocking factory
// was invoked exactly once.
func TestMuxDialSuppress(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())

	const callers = 1000
	var (
		wires int64 // invocations of the blocking wire factory
		waits int64 // goroutines that are about to call Info
		done  int64 // goroutines whose Info call has returned
	)
	blocking := make(chan struct{})

	slowWire := func() wire {
		atomic.AddInt64(&wires, 1)
		<-blocking
		return &mockWire{}
	}
	plainWire := func() wire {
		return &mockWire{}
	}
	m := newMux("", &ClientOption{}, (*mockWire)(nil), (*mockWire)(nil), slowWire, plainWire)

	for n := 0; n < callers; n++ {
		go func() {
			atomic.AddInt64(&waits, 1)
			m.Info()
			atomic.AddInt64(&done, 1)
		}()
	}

	// spinUntilAll yields the processor until the counter reaches the number
	// of callers.
	spinUntilAll := func(counter *int64) {
		for atomic.LoadInt64(counter) != callers {
			runtime.Gosched()
		}
	}

	spinUntilAll(&waits)
	close(blocking) // release the blocked factory
	spinUntilAll(&done)

	if created := atomic.LoadInt64(&wires); created != 1 {
		t.Fatalf("wireFn is not suppressed")
	}
}
// TestMuxReuseWire covers how the mux reuses wires: the pipeline wire for
// plain commands, and the dpool / spool pools for blocking commands issued via
// Do and DoMulti. In the pool subtests the first mock wire is left for the
// pipeline, the second answers "ACQUIRED" immediately, and the third blocks
// until the test feeds it a response.
//
//gocyclo:ignore
func TestMuxReuseWire(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())
	t.Run("reuse wire if no error", func(t *testing.T) {
		m, checkClean := setupMux([]*mockWire{
			{
				DoFn: func(cmd Completed) RedisResult {
					return newResult(RedisMessage{typ: '+', string: "PONG"}, nil)
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		// Only one wire is supplied, so both calls must be served by it.
		for i := 0; i < 2; i++ {
			if err := m.Do(context.Background(), cmds.NewCompleted([]string{"PING"})).Error(); err != nil {
				t.Fatalf("unexpected error %v", err)
			}
		}
		m.Close()
	})
	t.Run("reuse blocking (dpool) pool", func(t *testing.T) {
		blocking := make(chan struct{})
		response := make(chan RedisResult)
		m, checkClean := setupMux([]*mockWire{
			{
				// leave first wire for pipeline calls
			},
			{
				DoFn: func(cmd Completed) RedisResult {
					return newResult(RedisMessage{typ: '+', string: "ACQUIRED"}, nil)
				},
			},
			{
				DoFn: func(cmd Completed) RedisResult {
					blocking <- struct{}{}
					return <-response
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if err := m.Dial(); err != nil {
			t.Fatalf("unexpected dial error %v", err)
		}
		wire1 := m.dpool.Acquire()
		go func() {
			// wire1 is held by the test, so this call is expected to run on
			// the wire whose DoFn blocks
			if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).ToString(); err != nil {
				t.Errorf("unexpected error %v", err)
			} else if val != "BLOCK_RESPONSE" {
				t.Errorf("unexpected response %v", val)
			}
			close(blocking)
		}()
		<-blocking
		m.dpool.Store(wire1)
		// this should reuse wire1, which was just returned to the pool
		if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).ToString(); err != nil {
			t.Fatalf("unexpected error %v", err)
		} else if val != "ACQUIRED" {
			t.Fatalf("unexpected response %v", val)
		}
		response <- newResult(RedisMessage{typ: '+', string: "BLOCK_RESPONSE"}, nil)
		<-blocking
	})
	t.Run("reuse blocking (spool) pool", func(t *testing.T) {
		blocking := make(chan struct{})
		response := make(chan RedisResult)
		m, checkClean := setupMux([]*mockWire{
			{
				// leave first wire for pipeline calls
			},
			{
				DoFn: func(cmd Completed) RedisResult {
					return newResult(RedisMessage{typ: '+', string: "ACQUIRED"}, nil)
				},
			},
			{
				DoFn: func(cmd Completed) RedisResult {
					blocking <- struct{}{}
					return <-response
				},
			},
		})
		m.usePool = true // switch to spool
		defer checkClean(t)
		defer m.Close()
		if err := m.Dial(); err != nil {
			t.Fatalf("unexpected dial error %v", err)
		}
		wire1 := m.spool.Acquire()
		go func() {
			// wire1 is held by the test, so this call is expected to run on
			// the wire whose DoFn blocks
			if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).ToString(); err != nil {
				t.Errorf("unexpected error %v", err)
			} else if val != "BLOCK_RESPONSE" {
				t.Errorf("unexpected response %v", val)
			}
			close(blocking)
		}()
		<-blocking
		m.spool.Store(wire1)
		// this should reuse wire1, which was just returned to the pool
		if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).ToString(); err != nil {
			t.Fatalf("unexpected error %v", err)
		} else if val != "ACQUIRED" {
			t.Fatalf("unexpected response %v", val)
		}
		response <- newResult(RedisMessage{typ: '+', string: "BLOCK_RESPONSE"}, nil)
		<-blocking
	})
	// This subtest exercises the spool (usePool is set); its label previously
	// said "dpool".
	t.Run("reuse blocking (spool) pool DoMulti", func(t *testing.T) {
		blocking := make(chan struct{})
		response := make(chan RedisResult)
		m, checkClean := setupMux([]*mockWire{
			{
				// leave first wire for pipeline calls
			},
			{
				DoMultiFn: func(cmd ...Completed) *redisresults {
					return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '+', string: "ACQUIRED"}, nil)}}
				},
			},
			{
				DoMultiFn: func(cmd ...Completed) *redisresults {
					blocking <- struct{}{}
					return &redisresults{s: []RedisResult{<-response}}
				},
			},
		})
		m.usePool = true // switch to spool
		defer checkClean(t)
		defer m.Close()
		if err := m.Dial(); err != nil {
			t.Fatalf("unexpected dial error %v", err)
		}
		wire1 := m.spool.Acquire()
		go func() {
			// wire1 is held by the test, so this call is expected to run on
			// the wire whose DoMultiFn blocks
			if val, err := m.DoMulti(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).s[0].ToString(); err != nil {
				t.Errorf("unexpected error %v", err)
			} else if val != "BLOCK_RESPONSE" {
				t.Errorf("unexpected response %v", val)
			}
			close(blocking)
		}()
		<-blocking
		m.spool.Store(wire1)
		// this should reuse wire1, which was just returned to the pool
		if val, err := m.DoMulti(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).s[0].ToString(); err != nil {
			t.Fatalf("unexpected error %v", err)
		} else if val != "ACQUIRED" {
			t.Fatalf("unexpected response %v", val)
		}
		response <- newResult(RedisMessage{typ: '+', string: "BLOCK_RESPONSE"}, nil)
		<-blocking
	})
	// This subtest exercises the dpool (usePool is left unset); its label
	// previously said "spool".
	t.Run("reuse blocking (dpool) pool DoMulti", func(t *testing.T) {
		blocking := make(chan struct{})
		response := make(chan RedisResult)
		m, checkClean := setupMux([]*mockWire{
			{
				// leave first wire for pipeline calls
			},
			{
				DoMultiFn: func(cmd ...Completed) *redisresults {
					return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '+', string: "ACQUIRED"}, nil)}}
				},
			},
			{
				DoMultiFn: func(cmd ...Completed) *redisresults {
					blocking <- struct{}{}
					return &redisresults{s: []RedisResult{<-response}}
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if err := m.Dial(); err != nil {
			t.Fatalf("unexpected dial error %v", err)
		}
		wire1 := m.dpool.Acquire()
		go func() {
			// wire1 is held by the test, so this call is expected to run on
			// the wire whose DoMultiFn blocks
			if val, err := m.DoMulti(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).s[0].ToString(); err != nil {
				t.Errorf("unexpected error %v", err)
			} else if val != "BLOCK_RESPONSE" {
				t.Errorf("unexpected response %v", val)
			}
			close(blocking)
		}()
		<-blocking
		m.dpool.Store(wire1)
		// this should reuse wire1, which was just returned to the pool
		if val, err := m.DoMulti(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).s[0].ToString(); err != nil {
			t.Fatalf("unexpected error %v", err)
		} else if val != "ACQUIRED" {
			t.Fatalf("unexpected response %v", val)
		}
		response <- newResult(RedisMessage{typ: '+', string: "BLOCK_RESPONSE"}, nil)
		<-blocking
	})
	t.Run("unsubscribe blocking pool", func(t *testing.T) {
		cleaned := false
		m, checkClean := setupMux([]*mockWire{
			{
				// leave first wire for pipeline calls
			},
			{
				CleanSubscriptionsFn: func() {
					cleaned = true
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if err := m.Dial(); err != nil {
			t.Fatalf("unexpected dial error %v", err)
		}
		// Storing an acquired wire back is expected to clean its subscriptions.
		wire1 := m.Acquire()
		m.Store(wire1)
		if !cleaned {
			t.Fatalf("CleanSubscriptions not called")
		}
	})
}
// TestMuxDelegation checks that each mux method is forwarded to the underlying
// wire (Info, Version, AZ, Error, Do, DoStream, DoMulti, DoMultiStream,
// DoCache, DoMultiCache, Receive), that a wire reporting an error is replaced
// by the next one on the following call, and that blocking commands and long
// pipelines run on separate wires.
//
//gocyclo:ignore
func TestMuxDelegation(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())
	t.Run("wire info", func(t *testing.T) {
		m, checkClean := setupMux([]*mockWire{
			{
				InfoFn: func() map[string]RedisMessage {
					return map[string]RedisMessage{"key": {typ: '+', string: "value"}}
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if info := m.Info(); info == nil || info["key"].string != "value" {
			t.Fatalf("unexpected info %v", info)
		}
	})
	t.Run("wire version", func(t *testing.T) {
		m, checkClean := setupMux([]*mockWire{
			{
				VersionFn: func() int {
					return 7
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if version := m.Version(); version != 7 {
			t.Fatalf("unexpected version %v", version)
		}
	})
	t.Run("wire az", func(t *testing.T) {
		m, checkClean := setupMux([]*mockWire{
			{
				AZFn: func() string {
					return "az"
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if az := m.AZ(); az != "az" {
			t.Fatalf("unexpected az %v", az)
		}
	})
	t.Run("wire err", func(t *testing.T) {
		e := errors.New("err")
		m, checkClean := setupMux([]*mockWire{
			{
				ErrorFn: func() error {
					return e
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if err := m.Error(); err != e {
			t.Fatalf("unexpected err %v", err)
		}
	})
	t.Run("wire do", func(t *testing.T) {
		m, checkClean := setupMux([]*mockWire{
			{
				// first wire fails and reports itself as broken
				DoFn: func(cmd Completed) RedisResult {
					return newErrResult(context.DeadlineExceeded)
				},
				ErrorFn: func() error {
					return context.DeadlineExceeded
				},
			},
			{
				DoFn: func(cmd Completed) RedisResult {
					if cmd.Commands()[0] != "READONLY_COMMAND" {
						t.Fatalf("command should be READONLY_COMMAND")
					}
					return newResult(RedisMessage{typ: '+', string: "READONLY_COMMAND_RESPONSE"}, nil)
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if err := m.Do(context.Background(), cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})).Error(); !errors.Is(err, context.DeadlineExceeded) {
			t.Fatalf("unexpected error %v", err)
		}
		// the second call must be served by the second wire
		if val, err := m.Do(context.Background(), cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})).ToString(); err != nil {
			t.Fatalf("unexpected error %v", err)
		} else if val != "READONLY_COMMAND_RESPONSE" {
			t.Fatalf("unexpected response %v", val)
		}
	})
	t.Run("wire do stream", func(t *testing.T) {
		m, checkClean := setupMux([]*mockWire{
			{
				// echo the command name back through the stream error
				DoStreamFn: func(pool *pool, cmd Completed) RedisResultStream {
					return RedisResultStream{e: errors.New(cmd.Commands()[0])}
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if s := m.DoStream(context.Background(), cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})); s.Error().Error() != "READONLY_COMMAND" {
			t.Fatalf("unexpected error %v", s.Error())
		}
	})
	t.Run("wire do multi", func(t *testing.T) {
		m, checkClean := setupMux([]*mockWire{
			{
				// first wire fails and reports itself as broken
				DoMultiFn: func(multi ...Completed) *redisresults {
					return &redisresults{s: []RedisResult{newErrResult(context.DeadlineExceeded)}}
				},
				ErrorFn: func() error {
					return context.DeadlineExceeded
				},
			},
			{
				DoMultiFn: func(multi ...Completed) *redisresults {
					return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '+', string: "MULTI_COMMANDS_RESPONSE"}, nil)}}
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if err := m.DoMulti(context.Background(), cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})).s[0].Error(); !errors.Is(err, context.DeadlineExceeded) {
			t.Fatalf("unexpected error %v", err)
		}
		// the second call must be served by the second wire
		if val, err := m.DoMulti(context.Background(), cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})).s[0].ToString(); err != nil {
			t.Fatalf("unexpected error %v", err)
		} else if val != "MULTI_COMMANDS_RESPONSE" {
			t.Fatalf("unexpected response %v", val)
		}
	})
	t.Run("wire do multi stream", func(t *testing.T) {
		m, checkClean := setupMux([]*mockWire{
			{
				// echo the first command name back through the stream error
				DoMultiStreamFn: func(pool *pool, cmd ...Completed) MultiRedisResultStream {
					return MultiRedisResultStream{e: errors.New(cmd[0].Commands()[0])}
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if s := m.DoMultiStream(context.Background(), cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})); s.Error().Error() != "READONLY_COMMAND" {
			t.Fatalf("unexpected error %v", s.Error())
		}
	})
	t.Run("wire do cache", func(t *testing.T) {
		m, checkClean := setupMux([]*mockWire{
			{
				// first wire fails and reports itself as broken
				DoCacheFn: func(cmd Cacheable, ttl time.Duration) RedisResult {
					return newErrResult(context.DeadlineExceeded)
				},
				ErrorFn: func() error {
					return context.DeadlineExceeded
				},
			},
			{
				DoCacheFn: func(cmd Cacheable, ttl time.Duration) RedisResult {
					return newResult(RedisMessage{typ: '+', string: "READONLY_COMMAND_RESPONSE"}, nil)
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if err := m.DoCache(context.Background(), Cacheable(cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})), time.Second).Error(); !errors.Is(err, context.DeadlineExceeded) {
			t.Fatalf("unexpected error %v", err)
		}
		// the second call must be served by the second wire
		if val, err := m.DoCache(context.Background(), Cacheable(cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})), time.Second).ToString(); err != nil {
			t.Fatalf("unexpected error %v", err)
		} else if val != "READONLY_COMMAND_RESPONSE" {
			t.Fatalf("unexpected response %v", val)
		}
	})
	t.Run("wire do multi cache", func(t *testing.T) {
		m, checkClean := setupMux([]*mockWire{
			{
				// first wire fails and reports itself as broken
				DoMultiCacheFn: func(multi ...CacheableTTL) *redisresults {
					return &redisresults{s: []RedisResult{newErrResult(context.DeadlineExceeded)}}
				},
				ErrorFn: func() error {
					return context.DeadlineExceeded
				},
			},
			{
				DoMultiCacheFn: func(multi ...CacheableTTL) *redisresults {
					return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '+', string: "MULTI_COMMANDS_RESPONSE"}, nil)}}
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if err := m.DoMultiCache(context.Background(), CT(Cacheable(cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})), time.Second)).s[0].Error(); !errors.Is(err, context.DeadlineExceeded) {
			t.Fatalf("unexpected error %v", err)
		}
		// the second call must be served by the second wire
		if val, err := m.DoMultiCache(context.Background(), CT(Cacheable(cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})), time.Second)).s[0].ToString(); err != nil {
			t.Fatalf("unexpected error %v", err)
		} else if val != "MULTI_COMMANDS_RESPONSE" {
			t.Fatalf("unexpected response %v", val)
		}
	})
	t.Run("wire do multi cache multiple slots", func(t *testing.T) {
		multiplex := 1
		wires := make([]*mockWire, 1<<multiplex)
		for i := range wires {
			idx := uint16(i)
			wires[i] = &mockWire{
				// Each wire verifies that it only receives commands whose slot
				// maps to its own index, and echoes the key back as the reply.
				DoMultiCacheFn: func(multi ...CacheableTTL) *redisresults {
					result := make([]RedisResult, len(multi))
					for j, cmd := range multi {
						if s := cmd.Cmd.Slot() & uint16(len(wires)-1); s != idx {
							result[j] = newErrResult(fmt.Errorf("wrong slot %v %v", s, idx))
						} else {
							result[j] = newResult(RedisMessage{typ: '+', string: cmd.Cmd.Commands()[1]}, nil)
						}
					}
					return &redisresults{s: result}
				},
			}
		}
		m, checkClean := setupMuxWithOption(wires, &ClientOption{PipelineMultiplex: multiplex})
		defer checkClean(t)
		defer m.Close()
		// touch every pipeline slot up front so that all mock wires are consumed
		for i := range wires {
			m._pipe(uint16(i))
		}
		builder := cmds.NewBuilder(cmds.NoSlot)
		for count := 1; count <= 3; count++ {
			commands := make([]CacheableTTL, count)
			for c := 0; c < count; c++ {
				commands[c] = CT(builder.Get().Key(strconv.Itoa(c)).Cache(), time.Second)
			}
			// responses must come back in the order the commands were given
			for i, resp := range m.DoMultiCache(context.Background(), commands...).s {
				if v, err := resp.ToString(); err != nil || v != strconv.Itoa(i) {
					t.Fatalf("unexpected resp %v %v", v, err)
				}
			}
		}
	})
	t.Run("wire do multi cache multiple slots fail", func(t *testing.T) {
		multiplex := 1
		wires := make([]*mockWire, 1<<multiplex)
		for i := range wires {
			idx := uint16(i)
			wires[i] = &mockWire{
				DoMultiCacheFn: func(multi ...CacheableTTL) *redisresults {
					for _, cmd := range multi {
						if s := cmd.Cmd.Slot() & uint16(len(wires)-1); s != idx {
							return &redisresults{s: []RedisResult{newErrResult(fmt.Errorf("wrong slot %v %v", s, idx))}}
						}
					}
					return &redisresults{s: []RedisResult{newErrResult(context.DeadlineExceeded)}}
				},
				ErrorFn: func() error {
					return context.DeadlineExceeded
				},
			}
		}
		m, checkClean := setupMuxWithOption(wires, &ClientOption{PipelineMultiplex: multiplex})
		defer checkClean(t)
		defer m.Close()
		// touch every pipeline slot up front so that all mock wires are consumed
		for i := range wires {
			m._pipe(uint16(i))
		}
		builder := cmds.NewBuilder(cmds.NoSlot)
		commands := make([]CacheableTTL, 4)
		for c := 0; c < len(commands); c++ {
			commands[c] = CT(builder.Get().Key(strconv.Itoa(c)).Cache(), time.Second)
		}
		if err := m.DoMultiCache(context.Background(), commands...).s[0].Error(); !errors.Is(err, context.DeadlineExceeded) {
			t.Fatalf("unexpected error %v", err)
		}
	})
	t.Run("wire receive", func(t *testing.T) {
		m, checkClean := setupMux([]*mockWire{
			{
				// first wire fails and reports itself as broken
				ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
					return context.DeadlineExceeded
				},
				ErrorFn: func() error {
					return context.DeadlineExceeded
				},
			},
			{
				ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
					if subscribe.Commands()[0] != "SUBSCRIBE" {
						t.Fatalf("command should be SUBSCRIBE")
					}
					return nil
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if err := m.Receive(context.Background(), cmds.NewCompleted([]string{"SUBSCRIBE"}), func(message PubSubMessage) {}); !errors.Is(err, context.DeadlineExceeded) {
			t.Fatalf("unexpected error %v", err)
		}
		// the second call must be served by the second wire
		if err := m.Receive(context.Background(), cmds.NewCompleted([]string{"SUBSCRIBE"}), func(message PubSubMessage) {}); err != nil {
			t.Fatalf("unexpected error %v", err)
		}
	})
	t.Run("single blocking", func(t *testing.T) {
		blocked := make(chan struct{})
		responses := make(chan RedisResult)
		m, checkClean := setupMux([]*mockWire{
			{
				// leave first wire for pipeline calls
			},
			{
				DoFn: func(cmd Completed) RedisResult {
					blocked <- struct{}{}
					return <-responses
				},
			},
			{
				DoFn: func(cmd Completed) RedisResult {
					blocked <- struct{}{}
					return <-responses
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if err := m.Dial(); err != nil {
			t.Fatalf("unexpected dial error %v", err)
		}
		wg := sync.WaitGroup{}
		wg.Add(2)
		for i := 0; i < 2; i++ {
			go func() {
				// Always release the WaitGroup, so that a failed assertion is
				// reported instead of hanging wg.Wait below.
				defer wg.Done()
				if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"BLOCK"})).ToString(); err != nil {
					t.Errorf("unexpected error %v", err)
				} else if val != "BLOCK_COMMANDS_RESPONSE" {
					t.Errorf("unexpected response %v", val)
				}
			}()
		}
		// both calls must be in flight at the same time, each on its own wire
		for i := 0; i < 2; i++ {
			<-blocked
		}
		for i := 0; i < 2; i++ {
			responses <- newResult(RedisMessage{typ: '+', string: "BLOCK_COMMANDS_RESPONSE"}, nil)
		}
		wg.Wait()
	})
	t.Run("single blocking no recycle the wire if err", func(t *testing.T) {
		closed := false
		m, checkClean := setupMux([]*mockWire{
			{
				// leave first wire for pipeline calls
			},
			{
				DoFn: func(cmd Completed) RedisResult {
					return newErrResult(context.DeadlineExceeded)
				},
				ErrorFn: func() error {
					return context.DeadlineExceeded
				},
				CloseFn: func() {
					closed = true
				},
			},
			{
				DoFn: func(cmd Completed) RedisResult {
					return newResult(RedisMessage{typ: '+', string: "OK"}, nil)
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if err := m.Dial(); err != nil {
			t.Fatalf("unexpected dial error %v", err)
		}
		if err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"BLOCK"})).Error(); !errors.Is(err, context.DeadlineExceeded) {
			t.Errorf("unexpected error %v", err)
		}
		// the broken wire must not be reused: the next call gets a fresh one
		if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"BLOCK"})).ToString(); err != nil || val != "OK" {
			t.Errorf("unexpected response %v %v", err, val)
		}
		if !closed {
			t.Errorf("wire not closed")
		}
	})
	t.Run("multiple blocking", func(t *testing.T) {
		blocked := make(chan struct{})
		responses := make(chan RedisResult)
		m, checkClean := setupMux([]*mockWire{
			{
				// leave first wire for pipeline calls
			},
			{
				DoMultiFn: func(cmd ...Completed) *redisresults {
					blocked <- struct{}{}
					return &redisresults{s: []RedisResult{<-responses}}
				},
			},
			{
				DoMultiFn: func(cmd ...Completed) *redisresults {
					blocked <- struct{}{}
					return &redisresults{s: []RedisResult{<-responses}}
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if err := m.Dial(); err != nil {
			t.Fatalf("unexpected dial error %v", err)
		}
		wg := sync.WaitGroup{}
		wg.Add(2)
		for i := 0; i < 2; i++ {
			go func() {
				// Always release the WaitGroup, so that a failed assertion is
				// reported instead of hanging wg.Wait below.
				defer wg.Done()
				if val, err := m.DoMulti(
					context.Background(),
					cmds.NewReadOnlyCompleted([]string{"READONLY"}),
					cmds.NewBlockingCompleted([]string{"BLOCK"}),
				).s[0].ToString(); err != nil {
					t.Errorf("unexpected error %v", err)
				} else if val != "BLOCK_COMMANDS_RESPONSE" {
					t.Errorf("unexpected response %v", val)
				}
			}()
		}
		// both calls must be in flight at the same time, each on its own wire
		for i := 0; i < 2; i++ {
			<-blocked
		}
		for i := 0; i < 2; i++ {
			responses <- newResult(RedisMessage{typ: '+', string: "BLOCK_COMMANDS_RESPONSE"}, nil)
		}
		wg.Wait()
	})
	t.Run("multiple long pipeline", func(t *testing.T) {
		blocked := make(chan struct{})
		responses := make(chan RedisResult)
		m, checkClean := setupMux([]*mockWire{
			{
				// leave first wire for pipeline calls
			},
			{
				DoMultiFn: func(cmd ...Completed) *redisresults {
					blocked <- struct{}{}
					return &redisresults{s: []RedisResult{<-responses}}
				},
			},
			{
				DoMultiFn: func(cmd ...Completed) *redisresults {
					blocked <- struct{}{}
					return &redisresults{s: []RedisResult{<-responses}}
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if err := m.Dial(); err != nil {
			t.Fatalf("unexpected dial error %v", err)
		}
		wg := sync.WaitGroup{}
		wg.Add(2)
		for i := 0; i < 2; i++ {
			go func() {
				// Always release the WaitGroup, so that a failed assertion is
				// reported instead of hanging wg.Wait below.
				defer wg.Done()
				// a pipeline of DefaultBlockingPipeline non-blocking commands
				pipeline := make(Commands, DefaultBlockingPipeline)
				for j := range pipeline {
					pipeline[j] = cmds.NewCompleted([]string{"SET"})
				}
				if val, err := m.DoMulti(context.Background(), pipeline...).s[0].ToString(); err != nil {
					t.Errorf("unexpected error %v", err)
				} else if val != "BLOCK_COMMANDS_RESPONSE" {
					t.Errorf("unexpected response %v", val)
				}
			}()
		}
		// both pipelines must be in flight at the same time, each on its own wire
		for i := 0; i < 2; i++ {
			<-blocked
		}
		for i := 0; i < 2; i++ {
			responses <- newResult(RedisMessage{typ: '+', string: "BLOCK_COMMANDS_RESPONSE"}, nil)
		}
		wg.Wait()
	})
	t.Run("multi blocking no recycle the wire if err", func(t *testing.T) {
		closed := false
		m, checkClean := setupMux([]*mockWire{
			{
				// leave first wire for pipeline calls
			},
			{
				DoMultiFn: func(cmd ...Completed) *redisresults {
					return &redisresults{s: []RedisResult{newErrResult(context.DeadlineExceeded)}}
				},
				ErrorFn: func() error {
					return context.DeadlineExceeded
				},
				CloseFn: func() {
					closed = true
				},
			},
			{
				DoFn: func(cmd Completed) RedisResult {
					return newResult(RedisMessage{typ: '+', string: "OK"}, nil)
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if err := m.Dial(); err != nil {
			t.Fatalf("unexpected dial error %v", err)
		}
		if err := m.DoMulti(
			context.Background(),
			cmds.NewReadOnlyCompleted([]string{"READONLY"}),
			cmds.NewBlockingCompleted([]string{"BLOCK"}),
		).s[0].Error(); !errors.Is(err, context.DeadlineExceeded) {
			t.Errorf("unexpected error %v", err)
		}
		// the broken wire must not be reused: the next call gets a fresh one
		if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"BLOCK"})).ToString(); err != nil || val != "OK" {
			t.Errorf("unexpected response %v %v", err, val)
		}
		if !closed {
			t.Errorf("wire not closed")
		}
	})
}
// TestMuxRegisterCloseHook captures the on-close hook that the mux installs on
// its wire and invokes it by hand. An arbitrary error must make the mux move
// on to the next wire, whereas ErrClosing must leave the current wire in use.
//
//gocyclo:ignore
func TestMuxRegisterCloseHook(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())
	t.Run("trigger hook with unexpected error", func(t *testing.T) {
		var hook atomic.Value
		m, checkClean := setupMux([]*mockWire{
			{
				DoFn: func(cmd Completed) RedisResult {
					return newResult(RedisMessage{typ: '+', string: "PONG1"}, nil)
				},
				SetOnCloseHookFn: func(fn func(error)) {
					hook.Store(fn)
				},
			},
			{
				DoFn: func(cmd Completed) RedisResult {
					return newResult(RedisMessage{typ: '+', string: "PONG2"}, nil)
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if resp, err := m.Do(context.Background(), cmds.NewCompleted([]string{"PING"})).ToString(); err != nil || resp != "PONG1" {
			t.Fatalf("unexpected response %v %v", resp, err)
		}
		hook.Load().(func(error))(errors.New("any")) // invoke the hook, this should cause the first wire be discarded
		if resp, err := m.Do(context.Background(), cmds.NewCompleted([]string{"PING"})).ToString(); err != nil || resp != "PONG2" {
			t.Fatalf("unexpected response %v %v", resp, err)
		}
	})
	t.Run("not trigger hook with ErrClosing", func(t *testing.T) {
		var hook atomic.Value
		m, checkClean := setupMux([]*mockWire{
			{
				DoFn: func(cmd Completed) RedisResult {
					return newResult(RedisMessage{typ: '+', string: "PONG1"}, nil)
				},
				SetOnCloseHookFn: func(fn func(error)) {
					hook.Store(fn)
				},
			},
		})
		defer checkClean(t)
		defer m.Close()
		if resp, err := m.Do(context.Background(), cmds.NewCompleted([]string{"PING"})).ToString(); err != nil || resp != "PONG1" {
			t.Fatalf("unexpected response %v %v", resp, err)
		}
		hook.Load().(func(error))(ErrClosing) // invoke the hook with ErrClosing, this should NOT cause the first wire be discarded
		if resp, err := m.Do(context.Background(), cmds.NewCompleted([]string{"PING"})).ToString(); err != nil || resp != "PONG1" {
			t.Fatalf("unexpected response %v %v", resp, err)
		}
	})
}
// BenchmarkClientSideCaching measures Do and DoCache on a mux connected to a
// real server at 127.0.0.1:6379, with client-side caching sized by
// DefaultCacheBytes. Each sub-benchmark dials its own mux and closes it when
// it returns, so repeated invocations by the benchmark driver do not pile up
// open connections.
func BenchmarkClientSideCaching(b *testing.B) {
	// setup dials a fresh mux, configures parallelism and resets the timer so
	// that connection setup is excluded from the measurement.
	setup := func(b *testing.B) *mux {
		c := makeMux("127.0.0.1:6379", &ClientOption{CacheSizeEachConn: DefaultCacheBytes}, func(dst string, opt *ClientOption) (conn net.Conn, err error) {
			return net.Dial("tcp", dst)
		})
		if err := c.Dial(); err != nil {
			// Fail only this benchmark instead of crashing the whole test binary.
			b.Fatalf("unexpected dial error %v", err)
		}
		b.SetParallelism(100)
		b.ResetTimer()
		return c
	}
	b.Run("Do", func(b *testing.B) {
		m := setup(b)
		defer m.Close()
		cmd := cmds.NewCompleted([]string{"GET", "a"})
		b.RunParallel(func(pb *testing.PB) {
			for pb.Next() {
				m.Do(context.Background(), cmd)
			}
		})
	})
	b.Run("DoCache", func(b *testing.B) {
		m := setup(b)
		defer m.Close()
		cmd := Cacheable(cmds.NewCompleted([]string{"GET", "a"}))
		b.RunParallel(func(pb *testing.PB) {
			for pb.Next() {
				m.DoCache(context.Background(), cmd, time.Second*5)
			}
		})
	})
}
// mockWire is a test double for a wire. Every behavior is an optional function
// field; the method of the matching name calls the field when it is set and
// otherwise returns a zero value (or does nothing).
type mockWire struct {
// DoFn backs Do; the context passed to Do is not forwarded.
DoFn func(cmd Completed) RedisResult
// DoCacheFn backs DoCache; the context is not forwarded.
DoCacheFn func(cmd Cacheable, ttl time.Duration) RedisResult
// DoMultiFn backs DoMulti; the context is not forwarded.
DoMultiFn func(multi ...Completed) *redisresults
// DoMultiCacheFn backs DoMultiCache; the context is not forwarded.
DoMultiCacheFn func(multi ...CacheableTTL) *redisresults
// ReceiveFn backs Receive and receives all of its arguments.
ReceiveFn func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error
// DoStreamFn backs DoStream; the context is not forwarded.
DoStreamFn func(pool *pool, cmd Completed) RedisResultStream
// DoMultiStreamFn backs DoMultiStream; the context is not forwarded.
DoMultiStreamFn func(pool *pool, cmd ...Completed) MultiRedisResultStream
// InfoFn backs Info.
InfoFn func() map[string]RedisMessage
// AZFn backs AZ.
AZFn func() string
// VersionFn backs Version.
VersionFn func() int
// ErrorFn backs Error.
ErrorFn func() error
// CloseFn backs Close.
CloseFn func()
// CleanSubscriptionsFn backs CleanSubscriptions.
CleanSubscriptionsFn func()
// SetPubSubHooksFn backs SetPubSubHooks.
SetPubSubHooksFn func(hooks PubSubHooks) <-chan error
// SetOnCloseHookFn backs SetOnCloseHook.
SetOnCloseHookFn func(fn func(error))
}
// Do forwards cmd to DoFn when it is set; otherwise it returns a zero
// RedisResult. The context is ignored.
func (m *mockWire) Do(ctx context.Context, cmd Completed) RedisResult {
	if m.DoFn == nil {
		return RedisResult{}
	}
	return m.DoFn(cmd)
}
// DoCache forwards cmd and ttl to DoCacheFn when it is set; otherwise it
// returns a zero RedisResult. The context is ignored.
func (m *mockWire) DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) RedisResult {
	if m.DoCacheFn == nil {
		return RedisResult{}
	}
	return m.DoCacheFn(cmd, ttl)
}
// DoMultiCache forwards the commands to DoMultiCacheFn when it is set;
// otherwise it returns nil. The context is ignored.
func (m *mockWire) DoMultiCache(ctx context.Context, multi ...CacheableTTL) *redisresults {
	if m.DoMultiCacheFn == nil {
		return nil
	}
	return m.DoMultiCacheFn(multi...)
}
// DoMulti forwards the commands to DoMultiFn when it is set; otherwise it
// returns nil. The context is ignored.
func (m *mockWire) DoMulti(ctx context.Context, multi ...Completed) *redisresults {
	if m.DoMultiFn == nil {
		return nil
	}
	return m.DoMultiFn(multi...)
}
// Receive forwards all of its arguments to ReceiveFn when it is set;
// otherwise it returns nil.
func (m *mockWire) Receive(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
	if m.ReceiveFn == nil {
		return nil
	}
	return m.ReceiveFn(ctx, subscribe, fn)
}
// DoStream forwards pool and cmd to DoStreamFn when it is set; otherwise it
// returns a zero RedisResultStream. The context is ignored.
func (m *mockWire) DoStream(ctx context.Context, pool *pool, cmd Completed) RedisResultStream {
	if m.DoStreamFn == nil {
		return RedisResultStream{}
	}
	return m.DoStreamFn(pool, cmd)
}
// DoMultiStream forwards pool and the commands to DoMultiStreamFn when it is
// set; otherwise it returns a zero MultiRedisResultStream. The context is
// ignored.
func (m *mockWire) DoMultiStream(ctx context.Context, pool *pool, cmd ...Completed) MultiRedisResultStream {
	if m.DoMultiStreamFn == nil {
		return MultiRedisResultStream{}
	}
	return m.DoMultiStreamFn(pool, cmd...)
}
// CleanSubscriptions calls CleanSubscriptionsFn when it is set and is a no-op
// otherwise.
func (m *mockWire) CleanSubscriptions() {
	if m.CleanSubscriptionsFn == nil {
		return
	}
	m.CleanSubscriptionsFn()
}
// SetPubSubHooks forwards hooks to SetPubSubHooksFn when it is set; otherwise
// it returns a nil channel.
func (m *mockWire) SetPubSubHooks(hooks PubSubHooks) <-chan error {
	if m.SetPubSubHooksFn == nil {
		return nil
	}
	return m.SetPubSubHooksFn(hooks)
}
// SetOnCloseHook hands fn to SetOnCloseHookFn when it is set and is a no-op
// otherwise.
func (m *mockWire) SetOnCloseHook(fn func(error)) {
	if m.SetOnCloseHookFn == nil {
		return
	}
	m.SetOnCloseHookFn(fn)
}
// Info returns the result of InfoFn when it is set; otherwise it returns nil.
func (m *mockWire) Info() map[string]RedisMessage {
	if m.InfoFn == nil {
		return nil
	}
	return m.InfoFn()
}
// Version returns the result of VersionFn when it is set; otherwise it
// returns 0.
func (m *mockWire) Version() int {
	if m.VersionFn == nil {
		return 0
	}
	return m.VersionFn()
}
// AZ returns the result of AZFn when it is set; otherwise it returns the
// empty string.
func (m *mockWire) AZ() string {
	if m.AZFn == nil {
		return ""
	}
	return m.AZFn()
}
// Error reports the wire's error state. A nil receiver yields ErrClosing
// (typed-nil *mockWire values are passed to newMux by the test setup);
// otherwise the result of ErrorFn is returned when it is set, and nil when it
// is not.
func (m *mockWire) Error() error {
	switch {
	case m == nil:
		return ErrClosing
	case m.ErrorFn != nil:
		return m.ErrorFn()
	default:
		return nil
	}
}
// Close calls CloseFn when it is set. It is safe to call on a nil receiver,
// in which case it does nothing.
func (m *mockWire) Close() {
	if m != nil && m.CloseFn != nil {
		m.CloseFn()
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/mirrors/rueidis.git
[email protected]:mirrors/rueidis.git
mirrors
rueidis
rueidis
main

搜索帮助