5 Star 4 Fork 1

Gitee 极速下载/rueidis

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/rueian/rueidis
克隆/下载
rueidis_test.go 14.54 KB
一键复制 编辑 原始数据 按行查看 历史
Hyeonho Kim 提交于 2024-12-14 01:26 . feat: replica selector (#692)
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
package rueidis
import (
"bufio"
"context"
"crypto/ed25519"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"errors"
"fmt"
"math/big"
"net"
"os"
"strings"
"testing"
"time"
"github.com/onsi/gomega"
"github.com/onsi/gomega/gleak"
)
func SetupLeakDetection() (gomega.Gomega, []gleak.Goroutine) {
return gomega.NewGomega(func(message string, callerSkip ...int) {
panic(message)
}), gleak.Goroutines()
}
func ShouldNotLeaked(g gomega.Gomega, snapshot []gleak.Goroutine) {
g.Eventually(gleak.Goroutines).WithTimeout(time.Minute).ShouldNot(gleak.HaveLeaked(snapshot))
}
func TestMain(m *testing.M) {
g, snap := SetupLeakDetection()
code := m.Run()
ShouldNotLeaked(g, snap)
os.Exit(code)
}
func accept(t *testing.T, ln net.Listener) (*redisMock, error) {
conn, err := ln.Accept()
if err != nil {
t.Error(err)
return nil, err
}
mock := &redisMock{
t: t,
buf: bufio.NewReader(conn),
conn: conn,
}
mock.Expect("HELLO", "3").
Reply(RedisMessage{
typ: '%',
values: []RedisMessage{
{typ: '+', string: "proto"},
{typ: ':', integer: 3},
},
})
mock.Expect("CLIENT", "TRACKING", "ON", "OPTIN").
ReplyString("OK")
return mock, nil
}
func TestNewClusterClient(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer ln.Close()
done := make(chan struct{})
go func() {
mock, err := accept(t, ln)
if err != nil {
return
}
slots, _ := slotsResp.ToMessage()
mock.Expect("CLIENT", "SETINFO", "LIB-NAME", LibName).
ReplyError("UNKNOWN COMMAND")
mock.Expect("CLIENT", "SETINFO", "LIB-VER", LibVer).
ReplyError("UNKNOWN COMMAND")
mock.Expect("CLUSTER", "SLOTS").Reply(slots)
mock.Close()
close(done)
}()
_, port, _ := net.SplitHostPort(ln.Addr().String())
client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:" + port},
})
if err != nil {
t.Fatal(err)
}
defer client.Close()
if _, ok := client.(*clusterClient); !ok {
t.Fatal("client should be a clusterClient")
}
<-done
}
func TestNewClusterClientError(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
t.Run("cluster slots command error", func(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer ln.Close()
done := make(chan struct{})
go func() {
mock, err := accept(t, ln)
if err != nil {
return
}
mock.Expect("CLIENT", "SETINFO", "LIB-NAME", LibName).
ReplyError("UNKNOWN COMMAND")
mock.Expect("CLIENT", "SETINFO", "LIB-VER", LibVer).
ReplyError("UNKNOWN COMMAND")
mock.Expect("CLUSTER", "SLOTS").Reply(RedisMessage{typ: '-', string: "other error"})
mock.Expect("PING").ReplyString("OK")
mock.Close()
close(done)
}()
_, port, _ := net.SplitHostPort(ln.Addr().String())
client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:" + port},
})
if client != nil || err == nil {
t.Errorf("unexpected return %v %v", client, err)
}
<-done
})
t.Run("replica only and send to replicas option conflict", func(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer ln.Close()
_, port, _ := net.SplitHostPort(ln.Addr().String())
client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:" + port},
ReplicaOnly: true,
SendToReplicas: func(cmd Completed) bool {
return true
},
})
if client != nil || err == nil {
t.Errorf("unexpected return %v %v", client, err)
}
if !strings.Contains(err.Error(), ErrReplicaOnlyConflict.Error()) {
t.Errorf("unexpected error %v", err)
}
})
t.Run("replica only and replica selector option conflict", func(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer ln.Close()
_, port, _ := net.SplitHostPort(ln.Addr().String())
client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:" + port},
ReplicaOnly: true,
ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int {
return 0
},
})
if client != nil || err == nil {
t.Errorf("unexpected return %v %v", client, err)
}
if !strings.Contains(err.Error(), ErrReplicaOnlyConflictWithReplicaSelector.Error()) {
t.Errorf("unexpected error %v", err)
}
})
t.Run("send to replicas should be set when replica selector is set", func(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer ln.Close()
_, port, _ := net.SplitHostPort(ln.Addr().String())
client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:" + port},
ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int {
return 0
},
})
if client != nil || err == nil {
t.Errorf("unexpected return %v %v", client, err)
}
if !strings.Contains(err.Error(), ErrSendToReplicasNotSet.Error()) {
t.Errorf("unexpected error %v", err)
}
})
}
func TestFallBackSingleClient(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer ln.Close()
done := make(chan struct{})
go func() {
mock, err := accept(t, ln)
if err != nil {
return
}
mock.Expect("CLIENT", "SETINFO", "LIB-NAME", LibName).
ReplyError("UNKNOWN COMMAND")
mock.Expect("CLIENT", "SETINFO", "LIB-VER", LibVer).
ReplyError("UNKNOWN COMMAND")
mock.Expect("CLUSTER", "SLOTS").Reply(RedisMessage{typ: '-', string: "ERR This instance has cluster support disabled"})
mock.Expect("PING").ReplyString("OK")
mock.Close()
close(done)
}()
_, port, _ := net.SplitHostPort(ln.Addr().String())
client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:" + port},
})
if err != nil {
t.Fatal(err)
}
if _, ok := client.(*singleClient); !ok {
t.Fatal("client should be a singleClient")
}
client.Close()
<-done
}
func TestForceSingleClient(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer ln.Close()
done := make(chan struct{})
go func() {
mock, err := accept(t, ln)
if err != nil {
return
}
mock.Expect("CLIENT", "SETINFO", "LIB-NAME", LibName).
ReplyError("UNKNOWN COMMAND")
mock.Expect("CLIENT", "SETINFO", "LIB-VER", LibVer).
ReplyError("UNKNOWN COMMAND")
mock.Expect("PING").ReplyString("OK")
mock.Close()
close(done)
}()
_, port, _ := net.SplitHostPort(ln.Addr().String())
client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:" + port},
ForceSingleClient: true,
})
if err != nil {
t.Fatal(err)
}
if _, ok := client.(*singleClient); !ok {
t.Fatal("client should be a singleClient")
}
client.Close()
<-done
}
func TestTLSClient(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
pub, priv, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
t.Fatalf("Failed to generate private key: %v", err)
}
serialNumber, err := rand.Int(rand.Reader, new(big.Int).Lsh(big.NewInt(1), 128))
if err != nil {
t.Fatalf("Failed to generate serial number: %v", err)
}
template := x509.Certificate{
SerialNumber: serialNumber,
NotBefore: time.Now(),
NotAfter: time.Now().Add(time.Hour),
KeyUsage: x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
BasicConstraintsValid: true,
DNSNames: []string{"localhost"},
Subject: pkix.Name{Organization: []string{"Acme Co"}},
}
derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, pub, priv)
if err != nil {
t.Fatalf("Failed to create certificate: %v", err)
}
certPem := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: derBytes})
privBytes, err := x509.MarshalPKCS8PrivateKey(priv)
if err != nil {
t.Fatalf("Unable to marshal private key: %v", err)
}
privPem := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: privBytes})
cert, err := tls.X509KeyPair(certPem, privPem)
if err != nil {
t.Fatalf("Fail to load X509KeyPair: %v", err)
}
config := &tls.Config{
Certificates: []tls.Certificate{cert},
InsecureSkipVerify: true,
}
ln, err := tls.Listen("tcp", "127.0.0.1:0", config)
if err != nil {
t.Fatal(err)
}
defer ln.Close()
done := make(chan struct{})
go func() {
mock, err := accept(t, ln)
if err != nil {
return
}
mock.Expect("CLIENT", "SETINFO", "LIB-NAME", LibName).
ReplyError("UNKNOWN COMMAND")
mock.Expect("CLIENT", "SETINFO", "LIB-VER", LibVer).
ReplyError("UNKNOWN COMMAND")
mock.Expect("CLUSTER", "SLOTS").Reply(RedisMessage{typ: '-', string: "ERR This instance has cluster support disabled"})
mock.Expect("PING").ReplyString("OK")
mock.Close()
close(done)
}()
_, port, _ := net.SplitHostPort(ln.Addr().String())
client, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:" + port},
TLSConfig: config,
DisableTCPNoDelay: true,
})
if err != nil {
t.Fatal(err)
}
if _, ok := client.(*singleClient); !ok {
t.Fatal("client should be a singleClient")
}
client.Close()
<-done
}
func TestNewClientMaxMultiplex(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
_, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:6379"},
PipelineMultiplex: MaxPipelineMultiplex + 1,
})
if err != ErrWrongPipelineMultiplex {
t.Fatalf("unexpected error %v", err)
}
}
func TestSingleClientMultiplex(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
option := ClientOption{}
if v := singleClientMultiplex(option.PipelineMultiplex); v != 2 {
t.Fatalf("unexpected value %v", v)
}
option.PipelineMultiplex = -1
if v := singleClientMultiplex(option.PipelineMultiplex); v != 0 {
t.Fatalf("unexpected value %v", v)
}
}
func TestCustomDialFnIsCalled(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
isFnCalled := false
option := ClientOption{
InitAddress: []string{"127.0.0.1:0"},
DialFn: func(s string, dialer *net.Dialer, config *tls.Config) (conn net.Conn, err error) {
isFnCalled = true
return nil, errors.New("dial error")
},
}
_, err := NewClient(option)
if !isFnCalled {
t.Fatalf("excepted ClientOption.DialFn to be called")
}
if err == nil {
t.Fatalf("expected dial error")
}
}
func ExampleIsRedisNil() {
client, err := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
if err != nil {
panic(err)
}
defer client.Close()
_, err = client.Do(context.Background(), client.B().Get().Key("not_exists").Build()).ToString()
if err != nil && IsRedisNil(err) {
fmt.Printf("it is a nil response")
}
}
func ExampleClient_do() {
client, err := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
if err != nil {
panic(err)
}
defer client.Close()
ctx := context.Background()
client.Do(ctx, client.B().Set().Key("k").Value("1").Build()).Error()
client.Do(ctx, client.B().Get().Key("k").Build()).ToString()
client.Do(ctx, client.B().Get().Key("k").Build()).AsInt64()
client.Do(ctx, client.B().Hmget().Key("h").Field("a", "b").Build()).ToArray()
client.Do(ctx, client.B().Scard().Key("s").Build()).ToInt64()
client.Do(ctx, client.B().Smembers().Key("s").Build()).AsStrSlice()
}
func ExampleClient_doCache() {
client, err := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
if err != nil {
panic(err)
}
defer client.Close()
ctx := context.Background()
client.DoCache(ctx, client.B().Get().Key("k").Cache(), time.Minute).ToString()
client.DoCache(ctx, client.B().Get().Key("k").Cache(), time.Minute).AsInt64()
client.DoCache(ctx, client.B().Hmget().Key("h").Field("a", "b").Cache(), time.Minute).ToArray()
client.DoCache(ctx, client.B().Scard().Key("s").Cache(), time.Minute).ToInt64()
client.DoCache(ctx, client.B().Smembers().Key("s").Cache(), time.Minute).AsStrSlice()
}
func ExampleClient_scan() {
client, err := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
if err != nil {
panic(err)
}
defer client.Close()
for _, c := range client.Nodes() { // loop over all your redis nodes
var scan ScanEntry
for more := true; more; more = scan.Cursor != 0 {
if scan, err = c.Do(context.Background(), c.B().Scan().Cursor(scan.Cursor).Build()).AsScanEntry(); err != nil {
panic(err)
}
fmt.Println(scan.Elements)
}
}
}
func ExampleClient_dedicatedCAS() {
client, err := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
if err != nil {
panic(err)
}
defer client.Close()
ctx := context.Background()
client.Dedicated(func(client DedicatedClient) error {
// watch keys first
if err := client.Do(ctx, client.B().Watch().Key("k1", "k2").Build()).Error(); err != nil {
return err
}
// perform read here
values, err := client.Do(ctx, client.B().Mget().Key("k1", "k2").Build()).ToArray()
if err != nil {
return err
}
v1, _ := values[0].ToString()
v2, _ := values[1].ToString()
// perform write with MULTI EXEC
for _, resp := range client.DoMulti(
ctx,
client.B().Multi().Build(),
client.B().Set().Key("k1").Value(v1+"1").Build(),
client.B().Set().Key("k2").Value(v2+"2").Build(),
client.B().Exec().Build(),
) {
if err := resp.Error(); err != nil {
return err
}
}
return nil
})
}
func ExampleClient_dedicateCAS() {
client, err := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
if err != nil {
panic(err)
}
defer client.Close()
c, cancel := client.Dedicate()
defer cancel()
ctx := context.Background()
// watch keys first
if err := c.Do(ctx, c.B().Watch().Key("k1", "k2").Build()).Error(); err != nil {
panic(err)
}
// perform read here
values, err := c.Do(ctx, c.B().Mget().Key("k1", "k2").Build()).ToArray()
if err != nil {
panic(err)
}
v1, _ := values[0].ToString()
v2, _ := values[1].ToString()
// perform write with MULTI EXEC
for _, resp := range c.DoMulti(
ctx,
c.B().Multi().Build(),
c.B().Set().Key("k1").Value(v1+"1").Build(),
c.B().Set().Key("k2").Value(v2+"2").Build(),
c.B().Exec().Build(),
) {
if err := resp.Error(); err != nil {
panic(err)
}
}
}
func ExampleNewClient_cluster() {
client, _ := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
ShuffleInit: true,
})
defer client.Close()
}
func ExampleNewClient_single() {
client, _ := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:6379"},
})
defer client.Close()
}
func ExampleNewClient_sentinel() {
client, _ := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:26379", "127.0.0.1:26380", "127.0.0.1:26381"},
Sentinel: SentinelOption{
MasterSet: "my_master",
},
})
defer client.Close()
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/mirrors/rueidis.git
[email protected]:mirrors/rueidis.git
mirrors
rueidis
rueidis
main

搜索帮助