9 Star 60 Fork 22

Gitee 极速下载/gnet

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/panjf2000/gnet
克隆/下载
gnet_test.go 58.30 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899
package gnet
import (
"bufio"
"bytes"
"context"
crand "crypto/rand"
"encoding/binary"
"errors"
"io"
"math"
"math/rand"
"net"
"path/filepath"
"runtime"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
bbPool "github.com/panjf2000/gnet/v2/pkg/pool/bytebuffer"
goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
)
// Sizes used by the tests in this file.
var (
	// streamLen is 1 MiB; OnTraffic passes it to SetReadBuffer/SetWriteBuffer.
	streamLen = 1 << 20
	// datagramLen is 1 KiB.
	// NOTE(review): presumably the UDP payload size used by the test clients —
	// the consuming code is outside this view; confirm.
	datagramLen = 1 << 10
)
// testConf describes one scenario handed to runServer.
//
// NOTE: TestServer constructs values of this type with positional (unkeyed)
// literals, so the order and types of the fields below must not change.
type testConf struct {
// et is passed to WithEdgeTriggeredIO by runServer.
et bool
// etChunk is passed to WithEdgeTriggeredIOChunk by runServer.
etChunk int
// reuseport: presumably toggles the engine's reuse-port option; the code
// consuming it is outside this view — confirm.
reuseport bool
// multicore is copied into testServer.multicore by runServer.
multicore bool
// async is copied into testServer.async by runServer and selects the
// asynchronous echo path in OnTraffic.
async bool
// writev is copied into testServer.writev by runServer and selects the
// vectored write calls in OnTraffic.
writev bool
// clients is copied into testServer.nclients: the number of clients OnTick
// starts for every listening address.
clients int
// lb is the load-balancing strategy of the scenario; presumably forwarded as
// an engine option — the consuming code is outside this view, confirm.
lb LoadBalancing
}
func TestServer(t *testing.T) {
// start an engine
// connect 10 clients
// each client will pipe random data for 1-3 seconds.
// the writes to the engine will be random sizes. 0KB - 1MB.
// the engine will echo back the data.
// waits for graceful connection closing.
t.Run("poll-LT", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991"}, &testConf{false, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9992"}, &testConf{false, 0, false, true, false, false, 10, LeastConnections})
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991"}, &testConf{false, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9992"}, &testConf{false, 0, false, true, true, false, 10, LeastConnections})
})
})
t.Run("tcp-async-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991"}, &testConf{false, 0, false, false, true, true, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9992"}, &testConf{false, 0, false, true, true, true, 10, LeastConnections})
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9991"}, &testConf{false, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9992"}, &testConf{false, 0, false, true, false, false, 10, LeastConnections})
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9991"}, &testConf{false, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9992"}, &testConf{false, 0, false, true, true, false, 10, LeastConnections})
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet1.sock"}, &testConf{false, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet2.sock"}, &testConf{false, 0, false, true, false, false, 10, SourceAddrHash})
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet1.sock"}, &testConf{false, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet2.sock"}, &testConf{false, 0, false, true, true, false, 10, SourceAddrHash})
})
})
t.Run("unix-async-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet1.sock"}, &testConf{false, 0, false, false, true, true, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet2.sock"}, &testConf{false, 0, false, true, true, true, 10, SourceAddrHash})
})
})
})
t.Run("poll-ET", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991"}, &testConf{true, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9992"}, &testConf{true, 0, false, true, false, false, 10, LeastConnections})
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991"}, &testConf{true, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9992"}, &testConf{true, 0, false, true, true, false, 10, LeastConnections})
})
})
t.Run("tcp-async-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991"}, &testConf{true, 0, false, false, true, true, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9992"}, &testConf{true, 0, false, true, true, true, 10, LeastConnections})
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9991"}, &testConf{true, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9992"}, &testConf{true, 0, false, true, false, false, 10, LeastConnections})
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9991"}, &testConf{true, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9992"}, &testConf{true, 0, false, true, true, false, 10, LeastConnections})
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet1.sock"}, &testConf{true, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet2.sock"}, &testConf{true, 0, false, true, false, false, 10, SourceAddrHash})
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet1.sock"}, &testConf{true, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet2.sock"}, &testConf{true, 0, false, true, true, false, 10, SourceAddrHash})
})
})
t.Run("unix-async-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet1.sock"}, &testConf{true, 0, false, false, true, true, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet2.sock"}, &testConf{true, 0, false, true, true, true, 10, SourceAddrHash})
})
})
})
t.Run("poll-ET-chunk", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991"}, &testConf{true, 1 << 18, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9992"}, &testConf{true, 1 << 19, false, true, false, false, 10, LeastConnections})
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991"}, &testConf{true, 1 << 18, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9992"}, &testConf{true, 1 << 19, false, true, true, false, 10, LeastConnections})
})
})
t.Run("tcp-async-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991"}, &testConf{true, 1 << 18, false, false, true, true, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9992"}, &testConf{true, 1 << 19, false, true, true, true, 10, LeastConnections})
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9991"}, &testConf{true, 1 << 18, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9992"}, &testConf{true, 1 << 19, false, true, false, false, 10, LeastConnections})
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9991"}, &testConf{true, 1 << 18, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9992"}, &testConf{true, 1 << 19, false, true, true, false, 10, LeastConnections})
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet1.sock"}, &testConf{true, 1 << 18, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet2.sock"}, &testConf{true, 1 << 19, false, true, false, false, 10, SourceAddrHash})
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet1.sock"}, &testConf{true, 1 << 18, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet2.sock"}, &testConf{true, 1 << 19, false, true, true, false, 10, SourceAddrHash})
})
})
t.Run("unix-async-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet1.sock"}, &testConf{true, 1 << 18, false, false, true, true, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet2.sock"}, &testConf{true, 1 << 19, false, true, true, true, 10, SourceAddrHash})
})
})
})
t.Run("poll-reuseport-LT", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991"}, &testConf{false, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9992"}, &testConf{false, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991"}, &testConf{false, 0, true, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9992"}, &testConf{false, 0, true, true, true, false, 10, LeastConnections})
})
})
t.Run("tcp-async-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991"}, &testConf{false, 0, true, false, true, true, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9992"}, &testConf{false, 0, true, true, true, true, 10, LeastConnections})
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9991"}, &testConf{false, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9992"}, &testConf{false, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9991"}, &testConf{false, 0, true, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9992"}, &testConf{false, 0, true, true, true, false, 10, LeastConnections})
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet1.sock"}, &testConf{false, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet2.sock"}, &testConf{false, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet1.sock"}, &testConf{false, 0, true, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet2.sock"}, &testConf{false, 0, true, true, true, false, 10, LeastConnections})
})
})
t.Run("unix-async-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet1.sock"}, &testConf{false, 0, true, false, true, true, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet2.sock"}, &testConf{false, 0, true, true, true, true, 10, LeastConnections})
})
})
})
t.Run("poll-reuseport-ET", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991"}, &testConf{true, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9992"}, &testConf{true, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991"}, &testConf{true, 0, true, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9992"}, &testConf{true, 0, true, true, true, false, 10, LeastConnections})
})
})
t.Run("tcp-async-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991"}, &testConf{true, 0, true, false, true, true, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9992"}, &testConf{true, 0, true, true, true, true, 10, LeastConnections})
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9991"}, &testConf{true, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9992"}, &testConf{true, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9991"}, &testConf{true, 0, true, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"udp://:9992"}, &testConf{true, 0, true, true, true, false, 10, LeastConnections})
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet1.sock"}, &testConf{true, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet2.sock"}, &testConf{true, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet1.sock"}, &testConf{true, 0, true, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet2.sock"}, &testConf{true, 0, true, true, true, false, 10, LeastConnections})
})
})
t.Run("unix-async-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet1.sock"}, &testConf{true, 0, true, false, true, true, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"unix://gnet2.sock"}, &testConf{true, 0, true, true, true, true, 10, LeastConnections})
})
})
})
t.Run("poll-multi-addrs-LT", func(t *testing.T) {
t.Run("sync", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991", "tcp://:9992", "udp://:9993", "udp://:9994", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{false, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9995", "tcp://:9996", "udp://:9997", "udp://:9998", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{false, 0, false, true, false, false, 10, LeastConnections})
})
})
t.Run("sync-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991", "tcp://:9992", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{false, 0, false, false, false, true, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9995", "tcp://:9996", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{false, 0, false, true, false, true, 10, LeastConnections})
})
})
t.Run("async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991", "tcp://:9992", "udp://:9993", "udp://:9994", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{false, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9995", "tcp://:9996", "udp://:9997", "udp://:9998", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{false, 0, false, true, true, false, 10, LeastConnections})
})
})
t.Run("async-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991", "tcp://:9992", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{false, 0, false, false, true, true, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9995", "tcp://:9996", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{false, 0, false, true, true, true, 10, LeastConnections})
})
})
})
t.Run("poll-multi-addrs-reuseport-LT", func(t *testing.T) {
t.Run("sync", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991", "tcp://:9992", "udp://:9993", "udp://:9994", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{false, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9995", "tcp://:9996", "udp://:9997", "udp://:9998", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{false, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("sync-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991", "tcp://:9992", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{false, 0, true, false, false, true, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9995", "tcp://:9996", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{false, 0, true, true, false, true, 10, LeastConnections})
})
})
t.Run("async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991", "tcp://:9992", "udp://:9993", "udp://:9994", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{false, 0, true, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9995", "tcp://:9996", "udp://:9997", "udp://:9998", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{false, 0, true, true, true, false, 10, LeastConnections})
})
})
t.Run("async-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991", "tcp://:9992", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{false, 0, true, false, true, true, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9995", "tcp://:9996", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{false, 0, true, true, true, true, 10, LeastConnections})
})
})
})
t.Run("poll-multi-addrs-ET", func(t *testing.T) {
t.Run("sync", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991", "tcp://:9992", "udp://:9993", "udp://:9994", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{true, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9995", "tcp://:9996", "udp://:9997", "udp://:9998", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{true, 0, false, true, false, false, 10, LeastConnections})
})
})
t.Run("sync-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991", "tcp://:9992", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{true, 0, false, false, false, true, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9995", "tcp://:9996", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{true, 0, false, true, false, true, 10, LeastConnections})
})
})
t.Run("async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991", "tcp://:9992", "udp://:9993", "udp://:9994", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{true, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9995", "tcp://:9996", "udp://:9997", "udp://:9998", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{true, 0, false, true, true, false, 10, LeastConnections})
})
})
t.Run("async-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991", "tcp://:9992", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{true, 0, false, false, true, true, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9995", "tcp://:9996", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{true, 0, false, true, true, true, 10, LeastConnections})
})
})
})
t.Run("poll-multi-addrs-reuseport-ET", func(t *testing.T) {
t.Run("sync", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991", "tcp://:9992", "udp://:9993", "udp://:9994", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{true, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9995", "tcp://:9996", "udp://:9997", "udp://:9998", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{true, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("sync-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991", "tcp://:9992", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{true, 0, true, false, false, true, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9995", "tcp://:9996", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{true, 0, true, true, false, true, 10, LeastConnections})
})
})
t.Run("async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991", "tcp://:9992", "udp://:9993", "udp://:9994", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{true, 0, true, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9995", "tcp://:9996", "udp://:9997", "udp://:9998", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{true, 0, true, true, true, false, 10, LeastConnections})
})
})
t.Run("async-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9991", "tcp://:9992", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{true, 0, true, false, true, true, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runServer(t, []string{"tcp://:9995", "tcp://:9996", "unix://gnet1.sock", "unix://gnet2.sock"}, &testConf{true, 0, true, true, true, true, 10, LeastConnections})
})
})
})
}
// testServer is the event handler used by runServer: it echoes inbound data
// back to the peer, spawns its own clients from OnTick and shuts the engine
// down once every client is done.
type testServer struct {
// Embedded engine handler type.
// NOTE(review): presumably supplies default implementations for callbacks
// that are not overridden here — confirm against its declaration.
*BuiltinEventEngine
// tester is the *testing.T all assertions are reported against.
tester *testing.T
// eng is the engine received in OnBoot; used for Dup and CountConnections.
eng Engine
// addrs are the "proto://addr" strings the engine listens on.
addrs []string
// multicore is forwarded to startClient and, when false, enables the extra
// coverage-only calls in OnTraffic.
multicore bool
// async selects the worker-pool based AsyncWrite/AsyncWritev echo path.
async bool
// writev selects vectored writes (Writev / AsyncWritev) over plain writes.
writev bool
// nclients is the number of clients started per listening address.
nclients int
// started is flipped 0 -> 1 (atomically) by the first OnTick so the clients
// are launched exactly once.
started int32
// connected counts OnOpen calls (atomic).
connected int32
// disconnected counts OnClose calls (atomic).
disconnected int32
// clientActive is the number of client goroutines still running (atomic).
clientActive int32
// workerPool runs the asynchronous echo jobs; released in OnTick on shutdown.
workerPool *goPool.Pool
}
// OnBoot stores the engine and checks Engine.Dup: with more than one listening
// address the call is expected to fail with ErrUnsupportedOp, otherwise it
// must yield a descriptor greater than 2, which is closed again right away.
// The returned action is always the zero value.
func (s *testServer) OnBoot(eng Engine) (action Action) {
	s.eng = eng

	dupFd, dupErr := s.eng.Dup()
	if len(s.addrs) > 1 {
		assert.ErrorIsf(s.tester, dupErr, errorx.ErrUnsupportedOp, "dup error")
		return action
	}

	require.NoErrorf(s.tester, dupErr, "dup error")
	assert.Greaterf(s.tester, dupFd, 2, "expected fd: > 2, but got: %d", dupFd)
	assert.NoErrorf(s.tester, SysClose(dupFd), "close fd error")
	return action
}
// OnOpen tags the connection with itself as context (verified again in
// OnClose), counts it as connected, checks that both endpoint addresses are
// set and greets the peer with "sweetness\r\n".
func (s *testServer) OnOpen(c Conn) (out []byte, action Action) {
	c.SetContext(c)
	atomic.AddInt32(&s.connected, 1)

	local := c.LocalAddr()
	require.NotNil(s.tester, local, "nil local addr")
	remote := c.RemoteAddr()
	require.NotNil(s.tester, remote, "nil remote addr")

	return []byte("sweetness\r\n"), action
}
// OnShutdown repeats the Engine.Dup check on the engine stored by OnBoot:
// ErrUnsupportedOp is expected when listening on several addresses, a valid
// descriptor (> 2, closed immediately) otherwise.
func (s *testServer) OnShutdown(_ Engine) {
	fd, err := s.eng.Dup()

	if multiListener := len(s.addrs) > 1; multiListener {
		assert.ErrorIsf(s.tester, err, errorx.ErrUnsupportedOp, "dup error")
		return
	}

	require.NoErrorf(s.tester, err, "dup error")
	assert.Greaterf(s.tester, fd, 2, "expected fd: > 2, but got: %d", fd)
	assert.NoErrorf(s.tester, SysClose(fd), "close fd error")
}
// OnClose logs a non-nil close error at debug level, verifies the context set
// in OnOpen is still the connection itself and counts the connection as
// disconnected. The returned action is always the zero value.
func (s *testServer) OnClose(c Conn, err error) (action Action) {
	if err != nil {
		logging.Debugf("error occurred on closed, %v\n", err)
	}

	ctx := c.Context()
	require.Equal(s.tester, ctx, c, "invalid context")

	atomic.AddInt32(&s.disconnected, 1)
	return action
}
// OnTraffic echoes the received data back to the peer.
//
// In async mode the data is handed to a pooled buffer through c.WriteTo and
// written back from the worker pool with AsyncWrite/AsyncWritev (tcp, unix) or
// AsyncWrite (udp). Otherwise the data is taken with c.Next(-1) and written
// back synchronously with Write/Writev; single-loop runs additionally exercise
// a handful of Conn methods purely for coverage.
// The returned action is always the zero value.
func (s *testServer) OnTraffic(c Conn) (action Action) {
	if s.async {
		pooled := bbPool.Get()
		_, _ = c.WriteTo(pooled)

		switch c.LocalAddr().Network() {
		case "tcp", "unix":
			// just for test
			_ = c.InboundBuffered()
			_ = c.OutboundBuffered()
			_, _ = c.Discard(1)

			_ = s.workerPool.Submit(func() {
				if !s.writev {
					_ = c.AsyncWrite(pooled.Bytes(), func(c Conn, err error) error {
						if c.RemoteAddr() != nil {
							logging.Debugf("conn=%s done write: %v", c.RemoteAddr().String(), err)
						}
						// The buffer goes back to the pool once the write completed.
						bbPool.Put(pooled)
						return nil
					})
					return
				}

				// Split the payload in two halves to exercise the vectored write.
				half := pooled.Len() / 2
				parts := [][]byte{pooled.B[:half], pooled.B[half:]}
				_ = c.AsyncWritev(parts, func(c Conn, err error) error {
					if c.RemoteAddr() != nil {
						logging.Debugf("conn=%s done writev: %v", c.RemoteAddr().String(), err)
					}
					// The buffer goes back to the pool once the write completed.
					bbPool.Put(pooled)
					return nil
				})
			})
		case "udp":
			_ = s.workerPool.Submit(func() {
				_ = c.AsyncWrite(pooled.Bytes(), nil)
			})
		}
		return action
	}

	data, _ := c.Next(-1)
	if s.writev {
		half := len(data) / 2
		_, _ = c.Writev([][]byte{data[:half], data[half:]})
	} else {
		_, _ = c.Write(data)
	}

	if s.multicore {
		return action
	}

	// Only for code coverage of testing.
	assert.NoErrorf(s.tester, c.Flush(), "flush error")
	_ = c.Fd()
	dupFd, dupErr := c.Dup()
	require.NoErrorf(s.tester, dupErr, "dup error")
	assert.Greaterf(s.tester, dupFd, 2, "expected fd: > 2, but got: %d", dupFd)
	assert.NoErrorf(s.tester, SysClose(dupFd), "close error")

	network := c.LocalAddr().Network()

	// TODO(panjf2000): somehow these two system calls will fail with Unix Domain Socket,
	//  returning "invalid argument" error on macOS in Github actions intermittently,
	//  try to figure it out.
	// Hence the results are only asserted outside of the unix-on-darwin case.
	readBufErr := c.SetReadBuffer(streamLen)
	writeBufErr := c.SetWriteBuffer(streamLen)
	if network != "unix" || runtime.GOOS != "darwin" {
		assert.NoErrorf(s.tester, readBufErr, "set read buffer error")
		assert.NoErrorf(s.tester, writeBufErr, "set write buffer error")
	}

	if network == "tcp" {
		assert.NoErrorf(s.tester, c.SetLinger(1), "set linger error")
		assert.NoErrorf(s.tester, c.SetNoDelay(false), "set no delay error")
		assert.NoErrorf(s.tester, c.SetKeepAlivePeriod(time.Minute), "set keep alive period error")
	}
	return action
}
// OnTick returns a 100ms delay on every call.
//
// On the first call (guarded by a CAS on s.started) it launches s.nclients
// client goroutines for every configured address. On each call, once no
// client is active, it counts the non-UDP addresses and requests Shutdown
// when the disconnected counter equals both the expected number of stream
// connections and the connected counter; the worker pool is released and the
// engine is required to report zero connections.
func (s *testServer) OnTick() (delay time.Duration, action Action) {
	delay = 100 * time.Millisecond

	if atomic.CompareAndSwapInt32(&s.started, 0, 1) {
		for _, protoAddr := range s.addrs {
			proto, addr, err := parseProtoAddr(protoAddr)
			assert.NoError(s.tester, err)
			for i := 0; i < s.nclients; i++ {
				atomic.AddInt32(&s.clientActive, 1)
				go func() {
					// Deferred so the counter is decremented even when
					// startClient leaves the goroutine early (a failing
					// require assertion ends it via runtime.Goexit);
					// otherwise the shutdown check below could never pass.
					defer atomic.AddInt32(&s.clientActive, -1)
					startClient(s.tester, proto, addr, s.multicore, s.async)
				}()
			}
		}
	}

	if atomic.LoadInt32(&s.clientActive) != 0 {
		return delay, action
	}

	// Only non-UDP addresses are counted towards the expected number of
	// disconnect events.
	var streamAddrs int
	for _, addr := range s.addrs {
		if !strings.HasPrefix(addr, "udp") {
			streamAddrs++
		}
	}
	streamConns := s.nclients * streamAddrs

	disconnected := atomic.LoadInt32(&s.disconnected)
	if int(disconnected) == streamConns && disconnected == atomic.LoadInt32(&s.connected) {
		action = Shutdown
		s.workerPool.Release()
		require.EqualValues(s.tester, 0, s.eng.CountConnections())
	}
	return delay, action
}
// runServer builds a testServer from conf and runs it until the engine
// returns, asserting that no error is reported. A single address is served
// with Run, several addresses with Rotate. The two paths deliberately keep
// their own option lists: they pass different values to WithTCPNoDelay.
func runServer(t *testing.T, addrs []string, conf *testConf) {
	ts := &testServer{
		tester:     t,
		addrs:      addrs,
		multicore:  conf.multicore,
		async:      conf.async,
		writev:     conf.writev,
		nclients:   conf.clients,
		workerPool: goPool.Default(),
	}

	var err error
	if multiple := len(addrs) > 1; !multiple {
		err = Run(ts,
			addrs[0],
			WithEdgeTriggeredIO(conf.et),
			WithEdgeTriggeredIOChunk(conf.etChunk),
			WithLockOSThread(conf.async),
			WithMulticore(conf.multicore),
			WithReusePort(conf.reuseport),
			WithTicker(true),
			WithTCPKeepAlive(time.Minute),
			WithTCPNoDelay(TCPDelay),
			WithLoadBalancing(conf.lb))
	} else {
		err = Rotate(ts,
			addrs,
			WithEdgeTriggeredIO(conf.et),
			WithEdgeTriggeredIOChunk(conf.etChunk),
			WithLockOSThread(conf.async),
			WithMulticore(conf.multicore),
			WithReusePort(conf.reuseport),
			WithTicker(true),
			WithTCPKeepAlive(time.Minute),
			WithTCPNoDelay(TCPNoDelay),
			WithLoadBalancing(conf.lb))
	}

	assert.NoError(t, err)
}
// startClient dials addr over network and, for a random duration between
// 0.5s and 1.5s, repeatedly sends random payloads and reads back a response
// of the same length. Non-UDP clients first read the "sweetness\r\n" greeting
// line. Payloads are streamLen bytes, cut down to datagramLen for UDP. The
// response content is compared with the request only when async is false.
func startClient(t *testing.T, network, addr string, multicore, async bool) {
	conn, err := net.Dial(network, addr)
	require.NoError(t, err)
	defer conn.Close()

	reader := bufio.NewReader(conn)
	isUDP := network == "udp"
	if !isUDP {
		greeting, err := reader.ReadBytes('\n')
		require.NoError(t, err)
		require.Equal(t, string(greeting), "sweetness\r\n", "bad header")
	}

	duration := time.Duration((rand.Float64()*2+1)*float64(time.Second)) / 2
	logging.Debugf("test duration: %v", duration)

	for began := time.Now(); time.Since(began) < duration; {
		reqData := make([]byte, streamLen)
		if isUDP {
			reqData = reqData[:datagramLen]
		}
		_, err = crand.Read(reqData)
		require.NoError(t, err)

		_, err = conn.Write(reqData)
		require.NoError(t, err)

		respData := make([]byte, len(reqData))
		_, err = io.ReadFull(reader, respData)
		require.NoError(t, err)

		if async {
			continue
		}
		require.Equalf(
			t,
			reqData,
			respData,
			"response mismatch with protocol:%s, multi-core:%t, length of bytes: %d vs %d",
			network,
			multicore,
			len(reqData),
			len(respData),
		)
	}
}
// TestDefaultGnetServer invokes the BuiltinEventEngine callbacks with zero or
// nil arguments; it makes no assertions and only checks that the calls return.
func TestDefaultGnetServer(*testing.T) {
	var engine BuiltinEventEngine

	engine.OnBoot(Engine{})
	engine.OnOpen(nil)
	engine.OnClose(nil, nil)
	engine.OnTraffic(nil)
	engine.OnTick()
}
// testBadAddrServer is the event handler used by TestBadAddresses.
type testBadAddrServer struct {
	*BuiltinEventEngine
}

// OnBoot always asks for Shutdown.
func (s *testBadAddrServer) OnBoot(_ Engine) (action Action) {
	action = Shutdown
	return action
}
// TestBadAddresses checks the error Run returns for malformed or unsupported
// protocol addresses.
func TestBadAddresses(t *testing.T) {
	events := new(testBadAddrServer)

	cases := []struct {
		protoAddr string
		want      error
	}{
		{protoAddr: "tulip://howdy", want: errorx.ErrUnsupportedProtocol},
		{protoAddr: "howdy", want: errorx.ErrInvalidNetworkAddress},
		{protoAddr: "tcp://", want: errorx.ErrInvalidNetworkAddress},
	}
	for _, tc := range cases {
		err := Run(events, tc.protoAddr)
		assert.ErrorIs(t, err, tc.want)
	}
}
// TestTick runs the ticker test on TCP port 9989.
func TestTick(t *testing.T) {
	const (
		network = "tcp"
		addr    = ":9989"
	)
	testTick(network, addr, t)
}
// testTickServer counts ticks and shuts the engine down after a fixed number.
type testTickServer struct {
	*BuiltinEventEngine
	count int // number of ticks counted so far
}

// OnTick asks for the next tick in 10ms. Once 25 ticks have been counted it
// requests Shutdown instead of counting further.
func (t *testTickServer) OnTick() (delay time.Duration, action Action) {
	const interval = 10 * time.Millisecond

	if t.count == 25 {
		return interval, Shutdown
	}
	t.count++
	return interval, action
}
// testTick runs a testTickServer with the ticker enabled and measures how
// long Run takes to return. The server ticks every 10ms and shuts down after
// 25 counted ticks, so the run is expected to last at least 250ms and no more
// than one second; a duration outside that window is logged, not failed.
func testTick(network, addr string, t *testing.T) {
	events := &testTickServer{}
	start := time.Now()
	opts := Options{Ticker: true}
	err := Run(events, network+"://"+addr, WithOptions(opts))
	assert.NoError(t, err)
	dur := time.Since(start)
	// The lower bound must be a product: `250&time.Millisecond` is a bitwise
	// AND that evaluates to 64ns and makes the check meaningless.
	if dur < 250*time.Millisecond || dur > time.Second {
		t.Logf("bad ticker timing: %v", dur)
	}
}
// TestWakeConn runs the wake-connection test on TCP port 9990.
func TestWakeConn(t *testing.T) {
	const (
		network = "tcp"
		addr    = ":9990"
	)
	testWakeConn(t, network, addr)
}
// testWakeConnServer is the event handler used by testWakeConn.
type testWakeConnServer struct {
	*BuiltinEventEngine
	tester  *testing.T
	network string
	addr    string
	conn    chan Conn // connections passed from OnOpen to OnTick
	c       Conn      // connection most recently received in OnTick
	wake    bool      // true once the client goroutine has been launched
}

// OnOpen hands the new connection over on t.conn.
func (t *testWakeConnServer) OnOpen(c Conn) (out []byte, action Action) {
	t.conn <- c
	return out, action
}

// OnClose requests Shutdown as soon as any connection closes.
func (t *testWakeConnServer) OnClose(Conn, error) (action Action) {
	return Shutdown
}

// OnTraffic writes a fixed message to the connection and returns the action
// value -1.
func (t *testWakeConnServer) OnTraffic(c Conn) (action Action) {
	_, _ = c.Write([]byte("Waking up."))
	return -1
}
// OnTick always asks for the next tick in 100ms. The first call starts a
// client goroutine that dials the server and reads from the connection. Later
// calls receive a connection from t.conn and call Wake on it.
func (t *testWakeConnServer) OnTick() (delay time.Duration, action Action) {
	delay = 100 * time.Millisecond

	if t.wake {
		t.c = <-t.conn
		_ = t.c.Wake(func(c Conn, err error) error {
			logging.Debugf("conn=%s done wake: %v", c.RemoteAddr().String(), err)
			return nil
		})
		return delay, action
	}

	t.wake = true
	go func() {
		client, err := net.Dial(t.network, t.addr)
		require.NoError(t.tester, err)
		defer client.Close()

		reply := make([]byte, 10)
		_, err = client.Read(reply)
		require.NoError(t.tester, err)
	}()
	return delay, action
}
// testWakeConn runs a testWakeConnServer with a zap example logger, the
// ticker enabled, 2*NumCPU event loops and small socket/buffer sizes, and
// asserts that Run returns without error. The default logger and flusher
// captured on entry are restored through t.Cleanup.
func testWakeConn(t *testing.T, network, addr string) {
	prevLogger, prevFlusher := logging.GetDefaultLogger(), logging.GetDefaultFlusher()
	t.Cleanup(func() {
		logging.SetDefaultLoggerAndFlusher(prevLogger, prevFlusher) // restore
	})

	svr := &testWakeConnServer{
		tester:  t,
		network: network,
		addr:    addr,
		conn:    make(chan Conn, 1),
	}
	logger := zap.NewExample()
	protoAddr := network + "://" + addr

	err := Run(svr, protoAddr,
		WithTicker(true),
		WithNumEventLoop(2*runtime.NumCPU()),
		WithLogger(logger.Sugar()),
		WithSocketRecvBuffer(4*1024),
		WithSocketSendBuffer(4*1024),
		WithReadBufferCap(2000),
		WithWriteBufferCap(2000))
	assert.NoError(t, err)

	_ = logger.Sync()
}
// TestShutdown runs the shutdown test on TCP port 9991.
func TestShutdown(t *testing.T) {
	const (
		network = "tcp"
		addr    = ":9991"
	)
	testShutdown(t, network, addr)
}
// testShutdownServer is the event handler used by testShutdown.
type testShutdownServer struct {
	*BuiltinEventEngine
	tester  *testing.T
	eng     Engine
	network string
	addr    string
	count   int   // number of OnTick calls so far
	clients int32 // open connections; updated atomically in OnOpen/OnClose
	N       int   // number of clients to start
}

// OnBoot stores the engine for later use.
func (t *testShutdownServer) OnBoot(eng Engine) (action Action) {
	t.eng = eng
	return action
}

// OnOpen increments the client counter and requires the new value to match
// the engine's connection count.
func (t *testShutdownServer) OnOpen(Conn) (out []byte, action Action) {
	opened := atomic.AddInt32(&t.clients, 1)
	require.EqualValues(t.tester, opened, t.eng.CountConnections())
	return out, action
}

// OnClose decrements the client counter.
func (t *testShutdownServer) OnClose(Conn, error) (action Action) {
	atomic.AddInt32(&t.clients, -1)
	return action
}
// OnTick asks for the next tick in 50ms (time.Second / 20). The first call
// starts t.N client goroutines, each of which dials the server and expects
// its read to fail. Later calls request Shutdown once the client counter
// equals t.N.
func (t *testShutdownServer) OnTick() (delay time.Duration, action Action) {
	switch {
	case t.count == 0:
		// start clients
		for started := 0; started < t.N; started++ {
			go func() {
				client, err := net.Dial(t.network, t.addr)
				require.NoError(t.tester, err)
				defer client.Close()

				_, err = client.Read([]byte{0})
				require.Error(t.tester, err)
			}()
		}
	case int(atomic.LoadInt32(&t.clients)) == t.N:
		action = Shutdown
	}

	t.count++
	return time.Second / 20, action
}
// testShutdown runs a testShutdownServer with 100 clients, logging at warn
// level to a file in the test's temp dir, and requires that the client
// counter is back to zero once Run returns. The default logger and flusher
// captured on entry are restored through t.Cleanup.
func testShutdown(t *testing.T, network, addr string) {
	prevLogger, prevFlusher := logging.GetDefaultLogger(), logging.GetDefaultFlusher()
	t.Cleanup(func() {
		logging.SetDefaultLoggerAndFlusher(prevLogger, prevFlusher) // restore
	})

	events := &testShutdownServer{
		tester:  t,
		network: network,
		addr:    addr,
		N:       100,
	}
	logPath := filepath.Join(t.TempDir(), "gnet-test-shutdown.log")
	protoAddr := network + "://" + addr

	err := Run(events, protoAddr,
		WithLogPath(logPath),
		WithLogLevel(logging.WarnLevel),
		WithTicker(true),
		WithReadBufferCap(512),
		WithWriteBufferCap(512))
	assert.NoError(t, err)

	require.Equal(t, 0, int(events.clients), "did not close all clients")
}
// TestCloseActionError runs the close-action test on TCP port 9992.
func TestCloseActionError(t *testing.T) {
	const (
		network = "tcp"
		addr    = ":9992"
	)
	testCloseActionError(t, network, addr)
}
// testCloseActionErrorServer is the event handler used by testCloseActionError.
type testCloseActionErrorServer struct {
	*BuiltinEventEngine
	tester  *testing.T
	network string
	addr    string
	action  bool // true once the client goroutine has been launched
}

// OnClose requests Shutdown as soon as any connection closes.
func (t *testCloseActionErrorServer) OnClose(Conn, error) (action Action) {
	return Shutdown
}
// OnTraffic reads everything currently buffered on the connection, echoes it
// back, asserts that both byte counts match, and returns Close.
func (t *testCloseActionErrorServer) OnTraffic(c Conn) (action Action) {
	n := c.InboundBuffered()
	payload := make([]byte, n)

	m, err := c.Read(payload)
	assert.NoError(t.tester, err)
	assert.EqualValuesf(t.tester, n, m, "read %d bytes, expected %d", m, n)

	n, err = c.Write(payload)
	assert.NoError(t.tester, err)
	assert.EqualValuesf(t.tester, m, n, "wrote %d bytes, expected %d", n, m)

	return Close
}
// OnTick always asks for the next tick in 100ms. The first call starts a
// client goroutine that dials the server, sends "Hello World!" and requires
// the following read to succeed.
func (t *testCloseActionErrorServer) OnTick() (delay time.Duration, action Action) {
	delay = 100 * time.Millisecond
	if t.action {
		return delay, action
	}

	t.action = true
	go func() {
		client, err := net.Dial(t.network, t.addr)
		require.NoError(t.tester, err)
		defer client.Close()

		data := []byte("Hello World!")
		_, _ = client.Write(data)
		_, err = client.Read(data)
		require.NoError(t.tester, err)
	}()
	return delay, action
}
// testCloseActionError runs a testCloseActionErrorServer with the ticker
// enabled and asserts that Run returns without error.
func testCloseActionError(t *testing.T, network, addr string) {
	events := &testCloseActionErrorServer{
		tester:  t,
		network: network,
		addr:    addr,
	}
	protoAddr := network + "://" + addr
	assert.NoError(t, Run(events, protoAddr, WithTicker(true)))
}
// TestShutdownActionError runs the shutdown-action test on TCP port 9993.
func TestShutdownActionError(t *testing.T) {
	const (
		network = "tcp"
		addr    = ":9993"
	)
	testShutdownActionError(t, network, addr)
}
// testShutdownActionErrorServer is the event handler used by
// testShutdownActionError.
type testShutdownActionErrorServer struct {
	*BuiltinEventEngine
	tester  *testing.T
	network string
	addr    string
	action  bool // true once the client goroutine has been launched
}

// OnTraffic echoes the buffered inbound data (Peek, Write, Discard) and
// returns Shutdown.
func (t *testShutdownActionErrorServer) OnTraffic(c Conn) (action Action) {
	inbound, _ := c.Peek(-1)
	_, _ = c.Write(inbound)
	_, _ = c.Discard(-1)
	return Shutdown
}
// OnTick always asks for the next tick in 100ms. The first call starts a
// client goroutine that dials the server, sends "Hello World!" and requires
// the following read to succeed.
func (t *testShutdownActionErrorServer) OnTick() (delay time.Duration, action Action) {
	delay = 100 * time.Millisecond
	if t.action {
		return delay, action
	}

	t.action = true
	go func() {
		client, err := net.Dial(t.network, t.addr)
		require.NoError(t.tester, err)
		defer client.Close()

		data := []byte("Hello World!")
		_, _ = client.Write(data)
		_, err = client.Read(data)
		require.NoError(t.tester, err)
	}()
	return delay, action
}
// testShutdownActionError runs a testShutdownActionErrorServer with the
// ticker enabled and asserts that Run returns without error.
func testShutdownActionError(t *testing.T, network, addr string) {
	events := &testShutdownActionErrorServer{
		tester:  t,
		network: network,
		addr:    addr,
	}
	protoAddr := network + "://" + addr
	assert.NoError(t, Run(events, protoAddr, WithTicker(true)))
}
// TestCloseActionOnOpen runs the close-on-open test on TCP port 9994.
func TestCloseActionOnOpen(t *testing.T) {
	const (
		network = "tcp"
		addr    = ":9994"
	)
	testCloseActionOnOpen(t, network, addr)
}
// testCloseActionOnOpenServer is the event handler used by
// testCloseActionOnOpen.
type testCloseActionOnOpenServer struct {
	*BuiltinEventEngine
	tester  *testing.T
	network string
	addr    string
	action  bool // true once the client goroutine has been launched
}

// OnOpen returns Close for every new connection, with no outbound data.
func (t *testCloseActionOnOpenServer) OnOpen(Conn) (out []byte, action Action) {
	return out, Close
}

// OnClose requests Shutdown as soon as any connection closes.
func (t *testCloseActionOnOpenServer) OnClose(Conn, error) (action Action) {
	return Shutdown
}
// OnTick always asks for the next tick in 100ms. The first call starts a
// client goroutine that dials the server and closes the connection again.
func (t *testCloseActionOnOpenServer) OnTick() (delay time.Duration, action Action) {
	delay = 100 * time.Millisecond
	if t.action {
		return delay, action
	}

	t.action = true
	go func() {
		client, err := net.Dial(t.network, t.addr)
		require.NoError(t.tester, err)
		defer client.Close()
	}()
	return delay, action
}
// testCloseActionOnOpen runs a testCloseActionOnOpenServer with the ticker
// enabled and asserts that Run returns without error.
func testCloseActionOnOpen(t *testing.T, network, addr string) {
	events := &testCloseActionOnOpenServer{
		tester:  t,
		network: network,
		addr:    addr,
	}
	protoAddr := network + "://" + addr
	assert.NoError(t, Run(events, protoAddr, WithTicker(true)))
}
// TestShutdownActionOnOpen runs the shutdown-on-open test on TCP port 9995.
func TestShutdownActionOnOpen(t *testing.T) {
	const (
		network = "tcp"
		addr    = ":9995"
	)
	testShutdownActionOnOpen(t, network, addr)
}
// testShutdownActionOnOpenServer is the event handler used by
// testShutdownActionOnOpen.
type testShutdownActionOnOpenServer struct {
	*BuiltinEventEngine
	tester  *testing.T
	network string
	addr    string
	action  bool   // true once the client goroutine has been launched
	eng     Engine // engine captured in OnShutdown
}

// OnOpen returns Shutdown for the first connection that opens, with no
// outbound data.
func (t *testShutdownActionOnOpenServer) OnOpen(Conn) (out []byte, action Action) {
	return out, Shutdown
}
// OnShutdown stores the engine, calls Dup on it and verifies that the call
// succeeds, returns a descriptor greater than 2, and that the descriptor can
// be closed.
//
// The error is checked before the descriptor is inspected, matching the other
// Dup checks in this file: the descriptor carries no meaning when Dup fails.
// The debug line is emitted first so it is not lost when require aborts.
func (t *testShutdownActionOnOpenServer) OnShutdown(e Engine) {
	t.eng = e
	fd, err := t.eng.Dup()
	logging.Debugf("dup fd: %d with error: %v\n", fd, err)
	require.NoErrorf(t.tester, err, "dup error")
	assert.Greaterf(t.tester, fd, 2, "expected fd: > 2, but got: %d", fd)
	assert.NoErrorf(t.tester, SysClose(fd), "close error")
}
// OnTick always asks for the next tick in 100ms. The first call starts a
// client goroutine that dials the server and closes the connection again.
func (t *testShutdownActionOnOpenServer) OnTick() (delay time.Duration, action Action) {
	delay = 100 * time.Millisecond
	if t.action {
		return delay, action
	}

	t.action = true
	go func() {
		client, err := net.Dial(t.network, t.addr)
		require.NoError(t.tester, err)
		defer client.Close()
	}()
	return delay, action
}
// testShutdownActionOnOpen runs a testShutdownActionOnOpenServer with the
// ticker enabled, asserts that Run returns without error, and then checks
// that calling Dup on the captured engine fails with
// errorx.ErrEngineInShutdown.
func testShutdownActionOnOpen(t *testing.T, network, addr string) {
	events := &testShutdownActionOnOpenServer{
		tester:  t,
		network: network,
		addr:    addr,
	}
	protoAddr := network + "://" + addr

	runErr := Run(events, protoAddr, WithTicker(true))
	assert.NoError(t, runErr)

	_, dupErr := events.eng.Dup()
	assert.ErrorIsf(t, dupErr, errorx.ErrEngineInShutdown, "expected error: %v, but got: %v",
		errorx.ErrEngineInShutdown, dupErr)
}
// TestUDPShutdown runs the UDP shutdown test on udp4 port 9000.
func TestUDPShutdown(t *testing.T) {
	const (
		network = "udp4"
		addr    = ":9000"
	)
	testUDPShutdown(t, network, addr)
}
// testUDPShutdownServer is the event handler used by testUDPShutdown.
type testUDPShutdownServer struct {
	*BuiltinEventEngine
	tester  *testing.T
	network string
	addr    string
	tick    bool // true once the client goroutine has been launched
}

// OnTraffic echoes the buffered inbound data (Peek, Write, Discard) and
// returns Shutdown.
func (t *testUDPShutdownServer) OnTraffic(c Conn) (action Action) {
	inbound, _ := c.Peek(-1)
	_, _ = c.Write(inbound)
	_, _ = c.Discard(-1)
	return Shutdown
}
// OnTick always asks for the next tick in 100ms. The first call starts a
// client goroutine that dials the server, sends "Hello World!" and requires
// both the write and the following read to succeed.
func (t *testUDPShutdownServer) OnTick() (delay time.Duration, action Action) {
	delay = 100 * time.Millisecond
	if t.tick {
		return delay, action
	}

	t.tick = true
	go func() {
		client, err := net.Dial(t.network, t.addr)
		require.NoError(t.tester, err)
		defer client.Close()

		data := []byte("Hello World!")
		_, err = client.Write(data)
		require.NoError(t.tester, err)
		_, err = client.Read(data)
		require.NoError(t.tester, err)
	}()
	return delay, action
}
// testUDPShutdown runs a testUDPShutdownServer with the ticker enabled and
// asserts that Run returns without error.
func testUDPShutdown(t *testing.T, network, addr string) {
	svr := &testUDPShutdownServer{
		tester:  t,
		network: network,
		addr:    addr,
	}
	protoAddr := network + "://" + addr
	assert.NoError(t, Run(svr, protoAddr, WithTicker(true)))
}
// TestCloseConnection runs the close-connection test on TCP port 9996.
func TestCloseConnection(t *testing.T) {
	const (
		network = "tcp"
		addr    = ":9996"
	)
	testCloseConnection(t, network, addr)
}
// testCloseConnectionServer is the event handler used by testCloseConnection.
type testCloseConnectionServer struct {
	*BuiltinEventEngine
	tester  *testing.T
	network string
	addr    string
	action  bool // true once the client goroutine has been launched
}

// OnClose requests Shutdown as soon as any connection closes.
func (t *testCloseConnectionServer) OnClose(Conn, error) (action Action) {
	return Shutdown
}
// OnTraffic echoes the buffered inbound data and then starts a goroutine
// that, after one second, closes the connection with a callback. The callback
// asserts that the error it receives is errorx.ErrEngineShutdown.
func (t *testCloseConnectionServer) OnTraffic(c Conn) (action Action) {
	inbound, _ := c.Peek(-1)
	_, _ = c.Write(inbound)
	_, _ = c.Discard(-1)

	closeLater := func() {
		time.Sleep(time.Second)
		_ = c.CloseWithCallback(func(_ Conn, err error) error {
			assert.ErrorIsf(t.tester, err, errorx.ErrEngineShutdown, "should be engine shutdown error")
			return nil
		})
	}
	go closeLater()

	return action
}
// OnTick always asks for the next tick in 100ms. The first call starts a
// client goroutine that dials the server, sends "Hello World!", requires the
// first read to succeed and the second read to fail.
func (t *testCloseConnectionServer) OnTick() (delay time.Duration, action Action) {
	delay = 100 * time.Millisecond
	if t.action {
		return delay, action
	}

	t.action = true
	go func() {
		client, err := net.Dial(t.network, t.addr)
		require.NoError(t.tester, err)
		defer client.Close()

		data := []byte("Hello World!")
		_, _ = client.Write(data)
		_, err = client.Read(data)
		require.NoError(t.tester, err)

		// waiting the engine shutdown.
		_, err = client.Read(data)
		require.Error(t.tester, err)
	}()
	return delay, action
}
// testCloseConnection runs a testCloseConnectionServer with the ticker
// enabled and asserts that Run returns without error.
func testCloseConnection(t *testing.T, network, addr string) {
	events := &testCloseConnectionServer{
		tester:  t,
		network: network,
		addr:    addr,
	}
	protoAddr := network + "://" + addr
	assert.NoError(t, Run(events, protoAddr, WithTicker(true)))
}
func TestServerOptionsCheck(t *testing.T) {
err := Run(&BuiltinEventEngine{}, "tcp://:3500", WithNumEventLoop(10001), WithLockOSThread(true))
assert.EqualError(t, err, errorx.ErrTooManyEventLoopThreads.Error(), "error returned with LockOSThread option")
}
// TestStopServer runs the engine-stop test on TCP port 9997.
func TestStopServer(t *testing.T) {
	const (
		network = "tcp"
		addr    = ":9997"
	)
	testStop(t, network, addr)
}
// testStopServer is the event handler used by testStop.
type testStopServer struct {
	*BuiltinEventEngine
	tester    *testing.T
	network   string
	addr      string
	protoAddr string
	eng       Engine // engine captured in OnBoot
	action    bool   // true once the client goroutine has been launched
}

// OnBoot stores the engine so that it can be stopped later.
func (t *testStopServer) OnBoot(eng Engine) (action Action) {
	t.eng = eng
	return action
}

// OnClose only logs that a connection is closing.
func (t *testStopServer) OnClose(Conn, error) (action Action) {
	logging.Debugf("closing connection...")
	return action
}

// OnTraffic echoes the buffered inbound data (Peek, Write, Discard).
func (t *testStopServer) OnTraffic(c Conn) (action Action) {
	inbound, _ := c.Peek(-1)
	_, _ = c.Write(inbound)
	_, _ = c.Discard(-1)
	return action
}
// OnTick always asks for the next tick in 100ms. The first call starts a
// client goroutine that dials the server, exchanges "Hello World!", then
// stops the engine from a nested goroutine (3s timeout) and requires the
// next read on the connection to fail.
func (t *testStopServer) OnTick() (delay time.Duration, action Action) {
	delay = time.Millisecond * 100
	if t.action {
		return
	}
	t.action = true
	go func() {
		conn, err := net.Dial(t.network, t.addr)
		require.NoError(t.tester, err)
		defer conn.Close()
		data := []byte("Hello World!")
		_, _ = conn.Write(data)
		_, err = conn.Read(data)
		require.NoError(t.tester, err)
		go func() {
			ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
			defer cancel()
			// The format needs a verb for the Stop result; without one the
			// value is rendered as a %!(EXTRA ...) artifact.
			logging.Debugf("stop engine... %v", t.eng.Stop(ctx))
		}()
		// waiting the engine shutdown.
		_, err = conn.Read(data)
		require.Error(t.tester, err)
	}()
	return
}
// testStop runs a testStopServer with the ticker enabled and asserts that Run
// returns without error.
func testStop(t *testing.T, network, addr string) {
	events := &testStopServer{
		tester:    t,
		network:   network,
		addr:      addr,
		protoAddr: network + "://" + addr,
	}
	assert.NoError(t, Run(events, events.protoAddr, WithTicker(true)))
}
// TestEngineStop runs the two-engine stop test on TCP port 9998.
func TestEngineStop(t *testing.T) {
	const (
		network = "tcp"
		addr    = ":9998"
	)
	testEngineStop(t, network, addr)
}
// testStopEngine is the event handler used by testEngineStop.
type testStopEngine struct {
	*BuiltinEventEngine
	tester      *testing.T
	network     string
	addr        string
	protoAddr   string
	eng         Engine // engine captured in OnBoot
	stopIter    int64  // decremented per client exchange; the engine is stopped once it is <= 0
	name        string // appended to the client payload
	exchngCount int64  // number of OnTraffic calls, updated atomically
}

// OnBoot stores the engine so that it can be stopped later.
func (t *testStopEngine) OnBoot(eng Engine) (action Action) {
	t.eng = eng
	return action
}

// OnClose only logs that a connection is closing.
func (t *testStopEngine) OnClose(Conn, error) (action Action) {
	logging.Debugf("closing connection...")
	return action
}

// OnTraffic echoes the buffered inbound data and counts the exchange.
func (t *testStopEngine) OnTraffic(c Conn) (action Action) {
	inbound, _ := c.Peek(-1)
	_, _ = c.Write(inbound)
	_, _ = c.Discard(-1)
	atomic.AddInt64(&t.exchngCount, 1)
	return action
}
// OnTick asks for the next tick in 100ms and, on every call, starts a client
// goroutine that dials the server and exchanges "Hello World! <name>". When
// the stopIter counter read after the exchange is <= 0 the goroutine stops
// the engine (3s timeout) and requires the next read to fail. The counter is
// decremented at the end of every exchange.
func (t *testStopEngine) OnTick() (delay time.Duration, action Action) {
	delay = time.Millisecond * 100
	go func() {
		conn, err := net.Dial(t.network, t.addr)
		require.NoError(t.tester, err)
		defer conn.Close()
		data := []byte("Hello World! " + t.name)
		_, _ = conn.Write(data)
		_, err = conn.Read(data)
		require.NoError(t.tester, err)
		iter := atomic.LoadInt64(&t.stopIter)
		if iter <= 0 {
			ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
			defer cancel()
			// The format needs a verb for the Stop result; without one the
			// value is rendered as a %!(EXTRA ...) artifact.
			logging.Debugf("stop engine... %v", t.eng.Stop(ctx))
			// waiting the engine shutdown.
			_, err = conn.Read(data)
			require.Error(t.tester, err)
		}
		atomic.AddInt64(&t.stopIter, -1)
	}()
	return
}
// testEngineStop starts two engines on the same address (reuse-addr and
// reuse-port enabled), the second one 150ms after the first, and waits for
// both Run calls to return without error. It then requires that each handler
// processed at least one exchange, that the combined count equals 2+1+5+1,
// and that stopping the first engine again reports
// errorx.ErrEngineInShutdown.
func testEngineStop(t *testing.T, network, addr string) {
	protoAddr := network + "://" + addr
	events1 := &testStopEngine{tester: t, network: network, addr: addr, protoAddr: protoAddr, name: "1", stopIter: 2}
	events2 := &testStopEngine{tester: t, network: network, addr: addr, protoAddr: protoAddr, name: "2", stopIter: 5}

	// launch runs the handler's engine in the background and returns the
	// channel that receives Run's result.
	launch := func(events *testStopEngine) chan error {
		result := make(chan error, 1)
		go func() {
			result <- Run(events, events.protoAddr, WithTicker(true), WithReuseAddr(true), WithReusePort(true))
		}()
		return result
	}

	result1 := launch(events1)
	// ensure the first handler processes before starting the next since the delay per tick is 100ms
	time.Sleep(150 * time.Millisecond)
	result2 := launch(events2)

	assert.NoError(t, <-result1)
	assert.NoError(t, <-result2)

	// make sure that each handler processed at least 1
	require.Greater(t, events1.exchngCount, int64(0))
	require.Greater(t, events2.exchngCount, int64(0))
	require.Equal(t, int64(2+1+5+1), events1.exchngCount+events2.exchngCount)

	// stop an already stopped engine
	require.Equal(t, errorx.ErrEngineInShutdown, events1.eng.Stop(context.Background()))
}
// TestClosedWakeUp checks that waking up a connection that the server has
// already closed does not panic. It runs a testClosedWakeUpServer on TCP port
// 9999 and asserts that Run returns without error.
func TestClosedWakeUp(t *testing.T) {
	events := &testClosedWakeUpServer{
		BuiltinEventEngine: &BuiltinEventEngine{},
		tester:             t,
		network:            "tcp",
		addr:               ":9999",
		protoAddr:          "tcp://:9999",
		wakeup:             make(chan struct{}),
		serverClosed:       make(chan struct{}),
		clientClosed:       make(chan struct{}),
	}

	assert.NoError(t, Run(events, events.protoAddr))
}
// testClosedWakeUpServer is the event handler used by TestClosedWakeUp. Its
// three channels are used purely as close-once signals between the client
// goroutine and the event handlers.
type testClosedWakeUpServer struct {
	*BuiltinEventEngine
	tester    *testing.T
	network   string
	addr      string
	protoAddr string

	wakeup       chan struct{} // closed by OnTraffic
	serverClosed chan struct{} // closed by OnClose
	clientClosed chan struct{} // closed by the client goroutine after its second write
}
// OnBoot starts the client goroutine and returns None. The client dials the
// server, writes "hello", waits for the wakeup signal, writes "hello again",
// closes clientClosed, waits for serverClosed and finally stops the engine.
func (s *testClosedWakeUpServer) OnBoot(eng Engine) (action Action) {
	go func() {
		c, err := net.Dial(s.network, s.addr)
		require.NoError(s.tester, err)
		_, err = c.Write([]byte("hello"))
		require.NoError(s.tester, err)
		<-s.wakeup
		_, err = c.Write([]byte("hello again"))
		require.NoError(s.tester, err)
		close(s.clientClosed)
		<-s.serverClosed
		// The format needs a verb for the Stop result; without one the value
		// is rendered as a %!(EXTRA ...) artifact.
		logging.Debugf("stop engine... %v", eng.Stop(context.TODO()))
	}()
	return None
}
// OnTraffic closes the wakeup channel if it is still open, fires Wake and
// Close on the connection from two separate goroutines, waits until the
// client has signalled clientClosed, and then writes "answer" to the
// connection.
func (s *testClosedWakeUpServer) OnTraffic(c Conn) Action {
	assert.NotNil(s.tester, c.RemoteAddr())

	// Close wakeup at most once: a receive only succeeds here after the
	// channel has been closed.
	signalled := false
	select {
	case <-s.wakeup:
		signalled = true
	default:
	}
	if !signalled {
		close(s.wakeup)
	}

	go func() {
		require.NoError(s.tester, c.Wake(nil))
	}()
	go func() {
		require.NoError(s.tester, c.Close())
	}()

	<-s.clientClosed

	_, _ = c.Write([]byte("answer"))
	return None
}
// OnClose closes the serverClosed channel if it is still open.
func (s *testClosedWakeUpServer) OnClose(Conn, error) (action Action) {
	// A receive only succeeds here after the channel has been closed, so the
	// close below happens at most once.
	alreadyClosed := false
	select {
	case <-s.serverClosed:
		alreadyClosed = true
	default:
	}
	if !alreadyClosed {
		close(s.serverClosed)
	}
	return action
}
// testMultiInstLoggerRaceServer is the event handler used by
// TestMultiInstLoggerRace.
type testMultiInstLoggerRaceServer struct {
	*BuiltinEventEngine
}

// OnBoot always asks for Shutdown.
func (s *testMultiInstLoggerRaceServer) OnBoot(_ Engine) (action Action) {
	action = Shutdown
	return action
}
// TestMultiInstLoggerRace calls Run concurrently from two goroutines, each
// with its own zap development logger and an unsupported protocol address,
// and asserts that the group reports errorx.ErrUnsupportedProtocol. The
// default logger and flusher captured on entry are restored through
// t.Cleanup.
func TestMultiInstLoggerRace(t *testing.T) {
	prevLogger, prevFlusher := logging.GetDefaultLogger(), logging.GetDefaultFlusher()
	t.Cleanup(func() {
		logging.SetDefaultLoggerAndFlusher(prevLogger, prevFlusher) // restore
	})

	var g errgroup.Group
	for instance := 0; instance < 2; instance++ {
		// Declared inside the loop body so every goroutine gets its own pair.
		logger, _ := zap.NewDevelopment()
		events := new(testMultiInstLoggerRaceServer)
		g.Go(func() error {
			return Run(events, "tulip://howdy", WithLogger(logger.Sugar()))
		})
	}

	assert.ErrorIs(t, g.Wait(), errorx.ErrUnsupportedProtocol)
}
// testDisconnectedAsyncWriteServer is the event handler used by
// TestDisconnectedAsyncWrite.
type testDisconnectedAsyncWriteServer struct {
	BuiltinEventEngine
	tester        *testing.T
	addr          string
	writev        bool        // use AsyncWritev instead of AsyncWrite
	clientStarted bool        // true once the client goroutine has been launched
	exit          atomic.Bool // set when an async write callback receives an error
}
// OnTraffic calls c.Next(0), then starts a goroutine that issues an async
// write ("hello", or two "hello" slices in writev mode) every 100ms. The
// goroutine ends when the exit flag is set or when submitting a write returns
// an error. The write callback ignores nil errors; any other error must be
// net.ErrClosed and sets the exit flag.
func (t *testDisconnectedAsyncWriteServer) OnTraffic(c Conn) Action {
	_, err := c.Next(0)
	require.NoErrorf(t.tester, err, "c.Next error: %v", err)

	// Shared by both write flavours.
	onWritten := func(_ Conn, err error) error {
		if err == nil {
			return nil
		}
		require.ErrorIsf(t.tester, err, net.ErrClosed, "expected error: %v, but got: %v", net.ErrClosed, err)
		t.exit.Store(true)
		return nil
	}

	go func() {
		// An explicit ticker (instead of time.Tick) so it is stopped when the
		// goroutine ends.
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()

		for range ticker.C {
			if t.exit.Load() {
				return
			}
			// Goroutine-local error: the outer err belongs to OnTraffic.
			var writeErr error
			if t.writev {
				writeErr = c.AsyncWritev([][]byte{[]byte("hello"), []byte("hello")}, onWritten)
			} else {
				writeErr = c.AsyncWrite([]byte("hello"), onWritten)
			}
			if writeErr != nil {
				return
			}
		}
	}()
	return None
}
// OnTick asks for the next tick in 500ms. It requests Shutdown once the exit
// flag is set. Otherwise, on the first call, it starts a client goroutine
// that dials the server over TCP, writes "hello" and closes the connection.
func (t *testDisconnectedAsyncWriteServer) OnTick() (delay time.Duration, action Action) {
	delay = 500 * time.Millisecond

	switch {
	case t.exit.Load():
		action = Shutdown
	case !t.clientStarted:
		t.clientStarted = true
		go func() {
			client, err := net.Dial("tcp", t.addr)
			require.NoError(t.tester, err)
			_, err = client.Write([]byte("hello"))
			require.NoError(t.tester, err)
			require.NoError(t.tester, client.Close())
		}()
	}
	return delay, action
}
// TestDisconnectedAsyncWrite runs the disconnected-async-write scenario
// twice, once with AsyncWrite (port 10000) and once with AsyncWritev (port
// 10001), asserting that Run returns without error in both cases.
func TestDisconnectedAsyncWrite(t *testing.T) {
	cases := []struct {
		name      string
		addr      string
		protoAddr string
		writev    bool
	}{
		{name: "async-write", addr: ":10000", protoAddr: "tcp://:10000", writev: false},
		{name: "async-writev", addr: ":10001", protoAddr: "tcp://:10001", writev: true},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			events := &testDisconnectedAsyncWriteServer{tester: t, addr: tc.addr, writev: tc.writev}
			err := Run(events, tc.protoAddr, WithTicker(true))
			assert.NoError(t, err)
		})
	}
}
// errIncompletePacket is returned by the test codec when the available bytes
// do not yet contain a complete packet.
var errIncompletePacket = errors.New("incomplete packet")

// simServer is the event handler used by runSimServer.
type simServer struct {
	BuiltinEventEngine
	tester *testing.T
	eng    Engine // engine captured in OnBoot

	network   string
	addr      string
	multicore bool

	nclients   int // number of clients started by OnTick
	packetSize int // passed to runSimClient
	batchWrite int // passed to runSimClient as its batch size
	batchRead  int // maximum packets decoded per OnTraffic call

	// Counters below are accessed atomically.
	started      int32
	connected    int32
	disconnected int32
}

// OnBoot stores the engine.
func (s *simServer) OnBoot(eng Engine) (action Action) {
	s.eng = eng
	return action
}
// OnOpen attaches a fresh testCodec to the connection as its context, counts
// the connection, requires both of its addresses to be non-nil, and returns
// the "sweetness\r\n" greeting as outbound data.
func (s *simServer) OnOpen(c Conn) (out []byte, action Action) {
	c.SetContext(&testCodec{})
	atomic.AddInt32(&s.connected, 1)

	greeting := []byte("sweetness\r\n")
	require.NotNil(s.tester, c.LocalAddr(), "nil local addr")
	require.NotNil(s.tester, c.RemoteAddr(), "nil remote addr")

	return greeting, action
}
// OnClose logs a non-nil close error, counts the disconnect, and requests
// Shutdown once the connected and disconnected counters are equal and the
// disconnected counter has reached the number of clients.
func (s *simServer) OnClose(_ Conn, err error) (action Action) {
	if err != nil {
		logging.Debugf("error occurred on closed, %v\n", err)
	}
	atomic.AddInt32(&s.disconnected, 1)

	if atomic.LoadInt32(&s.connected) != atomic.LoadInt32(&s.disconnected) {
		return action
	}
	if atomic.LoadInt32(&s.disconnected) != int32(s.nclients) {
		return action
	}
	return Shutdown
}
// OnTraffic decodes up to s.batchRead packets from the connection with the
// codec stored in its context, re-encodes each payload and writes the results
// back (Writev for several packets, Write for one). Decoding stops at the
// first incomplete packet; any other decode error is logged and closes the
// connection. When a full batch was read and inbound data is still buffered,
// the connection is woken up manually.
func (s *simServer) OnTraffic(c Conn) (action Action) {
	codec := c.Context().(*testCodec)

	var packets [][]byte
	for len(packets) < s.batchRead {
		payload, err := codec.Decode(c)
		if errors.Is(err, errIncompletePacket) {
			break
		}
		if err != nil {
			logging.Errorf("invalid packet: %v", err)
			return Close
		}
		encoded, _ := codec.Encode(payload)
		packets = append(packets, encoded)
	}

	switch n := len(packets); {
	case n > 1:
		_, _ = c.Writev(packets)
	case n == 1:
		_, _ = c.Write(packets[0])
	}

	if len(packets) == s.batchRead && c.InboundBuffered() > 0 {
		// wake up the connection manually to avoid missing the leftover data
		wakeErr := c.Wake(nil)
		assert.NoError(s.tester, wakeErr)
	}
	return action
}
// OnTick asks for the next tick in 100ms. On the first call (guarded by a CAS
// on s.started) it launches s.nclients simulated clients, each in its own
// goroutine.
func (s *simServer) OnTick() (delay time.Duration, action Action) {
	firstTick := atomic.CompareAndSwapInt32(&s.started, 0, 1)
	if firstTick {
		for remaining := s.nclients; remaining > 0; remaining-- {
			go func() {
				runSimClient(s.tester, s.network, s.addr, s.packetSize, s.batchWrite)
			}()
		}
	}
	return 100 * time.Millisecond, action
}
// Constants of the test wire protocol.
const (
	magicNumber     = 1314 // value stored in the magic field
	magicNumberSize = 2    // width of the magic field in bytes
	bodySize        = 4    // width of the body-length field in bytes
)

// magicNumberBytes is the big-endian encoding of magicNumber; it is filled in
// by init.
var magicNumberBytes = make([]byte, magicNumberSize)

func init() {
	binary.BigEndian.PutUint16(magicNumberBytes, uint16(magicNumber))
}

// testCodec encodes and decodes packets of the test protocol.
//
// Protocol format:
//
// * 0           2                       6
// * +-----------+-----------------------+
// * |   magic   |       body len        |
// * +-----------+-----------+-----------+
// * |                                   |
// * +                                   +
// * |           body bytes              |
// * +                                   +
// * |            ... ...                |
// * +-----------------------------------+.
type testCodec struct{}

// Encode wraps buf into a packet: the magic number, the big-endian 32-bit
// length of buf, then a copy of buf. The returned error is always nil.
func (codec testCodec) Encode(buf []byte) ([]byte, error) {
	const headerLen = magicNumberSize + bodySize

	packet := make([]byte, headerLen+len(buf))
	written := copy(packet, magicNumberBytes)
	binary.BigEndian.PutUint32(packet[written:headerLen], uint32(len(buf)))
	copy(packet[headerLen:], buf)

	return packet, nil
}
// Decode extracts one packet from the connection's inbound data. It peeks the
// header, validates the magic number, peeks the whole packet, copies the body
// out and discards the packet from the connection. io.ErrShortBuffer from
// either Peek is reported as errIncompletePacket; a wrong magic number yields
// an "invalid magic number" error.
func (codec testCodec) Decode(c Conn) ([]byte, error) {
	const headerLen = magicNumberSize + bodySize

	// peek maps a short-buffer condition to errIncompletePacket.
	peek := func(n int) ([]byte, error) {
		data, err := c.Peek(n)
		if err != nil && errors.Is(err, io.ErrShortBuffer) {
			err = errIncompletePacket
		}
		return data, err
	}

	header, err := peek(headerLen)
	if err != nil {
		return nil, err
	}
	if !bytes.Equal(magicNumberBytes, header[:magicNumberSize]) {
		return nil, errors.New("invalid magic number")
	}

	bodyLen := binary.BigEndian.Uint32(header[magicNumberSize:headerLen])
	packetLen := headerLen + int(bodyLen)

	packet, err := peek(packetLen)
	if err != nil {
		return nil, err
	}

	// Copy the body before discarding the packet from the connection.
	body := make([]byte, bodyLen)
	copy(body, packet[headerLen:packetLen])
	_, _ = c.Discard(packetLen)

	return body, nil
}
// Unpack returns the body of the first packet in buf as a sub-slice of buf
// (no copy). It reports errIncompletePacket when buf is shorter than the
// header or than the full packet, and an "invalid magic number" error when
// the magic field does not match.
func (codec testCodec) Unpack(buf []byte) ([]byte, error) {
	const headerLen = magicNumberSize + bodySize

	switch {
	case len(buf) < headerLen:
		return nil, errIncompletePacket
	case !bytes.Equal(magicNumberBytes, buf[:magicNumberSize]):
		return nil, errors.New("invalid magic number")
	}

	bodyLen := binary.BigEndian.Uint32(buf[magicNumberSize:headerLen])
	packetLen := headerLen + int(bodyLen)
	if len(buf) < packetLen {
		return nil, errIncompletePacket
	}

	return buf[headerLen:packetLen], nil
}
// TestSimServer runs the simulated-client scenario as a series of subtests
// with growing packet sizes and shrinking write batches. Every subtest uses
// 10 clients and its own port; a batchRead of -1 is passed for the
// edge-triggered cases.
func TestSimServer(t *testing.T) {
	const nclients = 10

	cases := []struct {
		name       string
		addr       string
		et         bool
		packetSize int
		batchWrite int
		batchRead  int
	}{
		{"packet-size=64,batch=200", ":7200", true, 64, 200, -1},
		{"packet-size=128,batch=100", ":7201", false, 128, 100, 10},
		{"packet-size=256,batch=50", ":7202", true, 256, 50, -1},
		{"packet-size=512,batch=30", ":7203", false, 512, 30, 3},
		{"packet-size=1024,batch=20", ":7204", true, 1024, 20, -1},
		{"packet-size=64*1024,batch=10", ":7205", false, 64 * 1024, 10, 1},
		{"packet-size=128*1024,batch=5", ":7206", true, 128 * 1024, 5, -1},
		{"packet-size=512*1024,batch=3", ":7207", false, 512 * 1024, 3, 1},
		{"packet-size=1024*1024,batch=2", ":7208", true, 1024 * 1024, 2, -1},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			runSimServer(t, tc.addr, tc.et, nclients, tc.packetSize, tc.batchWrite, tc.batchRead)
		})
	}
}
// runSimServer runs a multicore TCP simServer on addr with the ticker and a
// one-minute TCP keep-alive enabled, and asserts that Run returns without
// error. et selects edge-triggered I/O. A negative batchRead is replaced by
// math.MaxInt32, i.e. an effectively unlimited read batch.
func runSimServer(t *testing.T, addr string, et bool, nclients, packetSize, batchWrite, batchRead int) {
	readBatch := batchRead
	if readBatch < 0 {
		readBatch = math.MaxInt32 // unlimited read batch
	}

	ts := &simServer{
		tester:     t,
		network:    "tcp",
		addr:       addr,
		multicore:  true,
		nclients:   nclients,
		packetSize: packetSize,
		batchWrite: batchWrite,
		batchRead:  readBatch,
	}

	protoAddr := ts.network + "://" + ts.addr
	err := Run(ts,
		protoAddr,
		WithEdgeTriggeredIO(et),
		WithMulticore(ts.multicore),
		WithTicker(true),
		WithTCPKeepAlive(time.Minute*1))
	assert.NoError(t, err)
}
// runSimClient dials addr, reads the "sweetness\r\n" greeting line and then
// keeps calling batchSendAndRecv until the test duration has elapsed. The
// duration is 2 to 5 seconds, chosen from the number of payload bytes per
// batch (packetSize * batch).
func runSimClient(t *testing.T, network, addr string, packetSize, batch int) {
	conn, err := net.Dial(network, addr)
	require.NoError(t, err)
	defer conn.Close()

	reader := bufio.NewReader(conn)
	greeting, err := reader.ReadBytes('\n')
	require.NoError(t, err)
	require.Equal(t, string(greeting), "sweetness\r\n", "bad header")

	// Larger batches get more time.
	duration := 5 * time.Second
	if packetBytes := packetSize * batch; packetBytes < 16*1024 {
		duration = 2 * time.Second
	} else if packetBytes < 32*1024 {
		duration = 3 * time.Second
	} else if packetBytes < 480*1024 {
		duration = 4 * time.Second
	}
	logging.Debugf("test duration: %v", duration)

	for began := time.Now(); time.Since(began) < duration; {
		batchSendAndRecv(t, conn, reader, packetSize, batch)
	}
}
// batchSendAndRecv builds batch packets of packetSize random bytes each,
// sends them to c in a single write, reads back the same number of bytes from
// rd, and requires every unpacked response body to equal the matching
// request.
func batchSendAndRecv(t *testing.T, c net.Conn, rd *bufio.Reader, packetSize, batch int) {
	var (
		codec     testCodec
		requests  [][]byte
		outbound  []byte
		packetLen int // encoded length of one packet
	)

	for sent := 0; sent < batch; sent++ {
		payload := make([]byte, packetSize)
		_, err := crand.Read(payload)
		require.NoError(t, err)
		requests = append(requests, payload)

		encoded, _ := codec.Encode(payload)
		packetLen = len(encoded)
		outbound = append(outbound, encoded...)
	}

	_, err := c.Write(outbound)
	require.NoError(t, err)

	inbound := make([]byte, batch*packetLen)
	_, err = io.ReadFull(rd, inbound)
	require.NoError(t, err)

	for round, want := range requests {
		offset := round * packetLen
		got, err := codec.Unpack(inbound[offset:])
		require.NoError(t, err)
		require.Equalf(t, want, got, "request and response mismatch, packet size: %d, batch: %d, round: %d",
			packetSize, batch, round)
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Shell
1
https://gitee.com/mirrors/gnet.git
[email protected]:mirrors/gnet.git
mirrors
gnet
gnet
dev

搜索帮助