1 Star 0 Fork 1

greatesoft/ants

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
ants_test.go 22.54 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848
// MIT License
// Copyright (c) 2018 Andy Pan
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package ants
import (
"log"
"os"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// Binary size units: each constant is 1024 times the previous one.
// Only KiB and MiB are declared; the larger units are kept below as
// commented-out reference values.
const (
_ = 1 << (10 * iota)
KiB // 1024
MiB // 1048576
// GiB // 1073741824
// TiB // 1099511627776 (exceeds the range of int32)
// PiB // 1125899906842624
// EiB // 1152921504606846976
// ZiB // 1180591620717411303424 (exceeds the range of int64)
// YiB // 1208925819614629174706176
)
// Shared sizing parameters for the tests in this file.
const (
// Param is the argument passed to demoPoolFunc / Invoke by several tests.
Param = 100
// AntsSize is used as the pool capacity in the "wait to get worker" tests and
// as the number of submitted tasks in the "get worker from cache" tests.
AntsSize = 1000
// TestSize is the pool capacity used by the cache and code-coverage tests.
TestSize = 10000
// n is the number of tasks submitted by the high-volume tests.
n = 100000
)
// curMem holds the most recent memory figure, in MiB, that a test derived
// from runtime.MemStats.TotalAlloc and logged.
var curMem uint64
// TestAntsPoolWaitToGetWorker submits n tasks to a Pool whose capacity is only
// AntsSize, which is meant to exercise the path where Submit has to wait for a
// worker. It then logs the number of running workers and the amount of memory
// allocated while the test ran.
func TestAntsPoolWaitToGetWorker(t *testing.T) {
	// MemStats.TotalAlloc is cumulative for the whole process, so take a
	// baseline first and report only what this test allocated.
	var before, after runtime.MemStats
	runtime.ReadMemStats(&before)

	p, err := NewPool(AntsSize)
	if err != nil {
		t.Fatalf("create pool failed: %v", err)
	}
	defer p.Release()

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		_ = p.Submit(func() {
			demoPoolFunc(Param)
			wg.Done()
		})
	}
	wg.Wait()
	t.Logf("pool, running workers number:%d", p.Running())

	runtime.ReadMemStats(&after)
	curMem = (after.TotalAlloc - before.TotalAlloc) / MiB
	t.Logf("memory usage:%d MB", curMem)
}
// TestAntsPoolWaitToGetWorkerPreMalloc is the WithPreAlloc(true) variant of
// TestAntsPoolWaitToGetWorker: n tasks are submitted to a Pool of capacity
// AntsSize, then the running-worker count and the memory allocated during the
// test are logged.
func TestAntsPoolWaitToGetWorkerPreMalloc(t *testing.T) {
	// MemStats.TotalAlloc is cumulative for the whole process, so take a
	// baseline first and report only what this test allocated.
	var before, after runtime.MemStats
	runtime.ReadMemStats(&before)

	p, err := NewPool(AntsSize, WithPreAlloc(true))
	if err != nil {
		t.Fatalf("create pool failed: %v", err)
	}
	defer p.Release()

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		_ = p.Submit(func() {
			demoPoolFunc(Param)
			wg.Done()
		})
	}
	wg.Wait()
	t.Logf("pool, running workers number:%d", p.Running())

	runtime.ReadMemStats(&after)
	curMem = (after.TotalAlloc - before.TotalAlloc) / MiB
	t.Logf("memory usage:%d MB", curMem)
}
// TestAntsPoolWithFuncWaitToGetWorker invokes a PoolWithFunc of capacity
// AntsSize n times, which is meant to exercise the path where Invoke has to
// wait for a worker. It then logs the number of running workers and the amount
// of memory allocated while the test ran.
func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) {
	// MemStats.TotalAlloc is cumulative for the whole process, so take a
	// baseline first and report only what this test allocated.
	var before, after runtime.MemStats
	runtime.ReadMemStats(&before)

	var wg sync.WaitGroup
	p, err := NewPoolWithFunc(AntsSize, func(i interface{}) {
		demoPoolFunc(i)
		wg.Done()
	})
	if err != nil {
		t.Fatalf("create pool with func failed: %v", err)
	}
	defer p.Release()

	for i := 0; i < n; i++ {
		wg.Add(1)
		_ = p.Invoke(Param)
	}
	wg.Wait()
	t.Logf("pool with func, running workers number:%d", p.Running())

	runtime.ReadMemStats(&after)
	curMem = (after.TotalAlloc - before.TotalAlloc) / MiB
	t.Logf("memory usage:%d MB", curMem)
}
// TestAntsPoolWithFuncWaitToGetWorkerPreMalloc is the WithPreAlloc(true)
// variant of TestAntsPoolWithFuncWaitToGetWorker: the pool function is invoked
// n times on a PoolWithFunc of capacity AntsSize, then the running-worker count
// and the memory allocated during the test are logged.
func TestAntsPoolWithFuncWaitToGetWorkerPreMalloc(t *testing.T) {
	// MemStats.TotalAlloc is cumulative for the whole process, so take a
	// baseline first and report only what this test allocated.
	var before, after runtime.MemStats
	runtime.ReadMemStats(&before)

	var wg sync.WaitGroup
	p, err := NewPoolWithFunc(AntsSize, func(i interface{}) {
		demoPoolFunc(i)
		wg.Done()
	}, WithPreAlloc(true))
	if err != nil {
		t.Fatalf("create pool with func failed: %v", err)
	}
	defer p.Release()

	for i := 0; i < n; i++ {
		wg.Add(1)
		_ = p.Invoke(Param)
	}
	wg.Wait()
	t.Logf("pool with func, running workers number:%d", p.Running())

	runtime.ReadMemStats(&after)
	curMem = (after.TotalAlloc - before.TotalAlloc) / MiB
	t.Logf("memory usage:%d MB", curMem)
}
// TestAntsPoolGetWorkerFromCache submits AntsSize tasks to a Pool of capacity
// TestSize, sleeps for two clean intervals, and then submits one more task.
// The late submission is intended to obtain its worker from the worker cache
// (sync.Pool) — NOTE(review): that path is not observable from this test; it
// only logs the running-worker count and the memory allocated during the test.
func TestAntsPoolGetWorkerFromCache(t *testing.T) {
	// MemStats.TotalAlloc is cumulative for the whole process, so take a
	// baseline first and report only what this test allocated.
	var before, after runtime.MemStats
	runtime.ReadMemStats(&before)

	p, err := NewPool(TestSize)
	if err != nil {
		t.Fatalf("create pool failed: %v", err)
	}
	defer p.Release()

	for i := 0; i < AntsSize; i++ {
		_ = p.Submit(demoFunc)
	}
	time.Sleep(2 * DefaultCleanIntervalTime)
	_ = p.Submit(demoFunc)
	t.Logf("pool, running workers number:%d", p.Running())

	runtime.ReadMemStats(&after)
	curMem = (after.TotalAlloc - before.TotalAlloc) / MiB
	t.Logf("memory usage:%d MB", curMem)
}
// TestAntsPoolWithFuncGetWorkerFromCache invokes a PoolWithFunc of capacity
// TestSize AntsSize times, sleeps for two clean intervals, and then invokes it
// once more. The late invocation is intended to obtain its worker from the
// worker cache (sync.Pool) — NOTE(review): that path is not observable from
// this test; it only logs the running-worker count and the memory allocated
// during the test.
func TestAntsPoolWithFuncGetWorkerFromCache(t *testing.T) {
	// MemStats.TotalAlloc is cumulative for the whole process, so take a
	// baseline first and report only what this test allocated.
	var before, after runtime.MemStats
	runtime.ReadMemStats(&before)

	dur := 10
	p, err := NewPoolWithFunc(TestSize, demoPoolFunc)
	if err != nil {
		t.Fatalf("create pool with func failed: %v", err)
	}
	defer p.Release()

	for i := 0; i < AntsSize; i++ {
		_ = p.Invoke(dur)
	}
	time.Sleep(2 * DefaultCleanIntervalTime)
	_ = p.Invoke(dur)
	t.Logf("pool with func, running workers number:%d", p.Running())

	runtime.ReadMemStats(&after)
	curMem = (after.TotalAlloc - before.TotalAlloc) / MiB
	t.Logf("memory usage:%d MB", curMem)
}
// TestAntsPoolWithFuncGetWorkerFromCachePreMalloc is the WithPreAlloc(true)
// variant of TestAntsPoolWithFuncGetWorkerFromCache: AntsSize invocations, a
// pause of two clean intervals, one more invocation, then the running-worker
// count and the memory allocated during the test are logged.
func TestAntsPoolWithFuncGetWorkerFromCachePreMalloc(t *testing.T) {
	// MemStats.TotalAlloc is cumulative for the whole process, so take a
	// baseline first and report only what this test allocated.
	var before, after runtime.MemStats
	runtime.ReadMemStats(&before)

	dur := 10
	p, err := NewPoolWithFunc(TestSize, demoPoolFunc, WithPreAlloc(true))
	if err != nil {
		t.Fatalf("create pool with func failed: %v", err)
	}
	defer p.Release()

	for i := 0; i < AntsSize; i++ {
		_ = p.Invoke(dur)
	}
	time.Sleep(2 * DefaultCleanIntervalTime)
	_ = p.Invoke(dur)
	t.Logf("pool with func, running workers number:%d", p.Running())

	runtime.ReadMemStats(&after)
	curMem = (after.TotalAlloc - before.TotalAlloc) / MiB
	t.Logf("memory usage:%d MB", curMem)
}
//-------------------------------------------------------------------------------------------
// Contrast between goroutines without a pool and goroutines with ants pool.
//-------------------------------------------------------------------------------------------
// TestNoPool is the baseline for the pool comparison: it runs demoFunc in n
// plain goroutines (no pool), waits for all of them, and logs the amount of
// memory allocated while the test ran.
func TestNoPool(t *testing.T) {
	// MemStats.TotalAlloc is cumulative for the whole process, so take a
	// baseline first and report only what this test allocated.
	var before, after runtime.MemStats
	runtime.ReadMemStats(&before)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			demoFunc()
			wg.Done()
		}()
	}
	wg.Wait()

	runtime.ReadMemStats(&after)
	curMem = (after.TotalAlloc - before.TotalAlloc) / MiB
	t.Logf("memory usage:%d MB", curMem)
}
// TestAntsPool is the pooled counterpart of TestNoPool: it submits n demoFunc
// tasks through the package-level Submit, waits for all of them, and logs the
// default pool's capacity, running and free worker counts, and the amount of
// memory allocated while the test ran. The default pool is released on exit.
func TestAntsPool(t *testing.T) {
	defer Release()

	// MemStats.TotalAlloc is cumulative for the whole process, so take a
	// baseline first and report only what this test allocated.
	var before, after runtime.MemStats
	runtime.ReadMemStats(&before)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		_ = Submit(func() {
			demoFunc()
			wg.Done()
		})
	}
	wg.Wait()

	t.Logf("pool, capacity:%d", Cap())
	t.Logf("pool, running workers number:%d", Running())
	t.Logf("pool, free workers number:%d", Free())

	runtime.ReadMemStats(&after)
	curMem = (after.TotalAlloc - before.TotalAlloc) / MiB
	t.Logf("memory usage:%d MB", curMem)
}
//-------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------
// TestPanicHandler checks that a panic raised inside a task is delivered to the
// handler installed with WithPanicHandler, for both Pool and PoolWithFunc, and
// that Running() reports zero workers once the handler has finished.
func TestPanicHandler(t *testing.T) {
	var (
		handled int64          // number of panics seen by the handlers
		done    sync.WaitGroup // signalled by each handler invocation
	)

	pool, err := NewPool(10, WithPanicHandler(func(v interface{}) {
		defer done.Done()
		atomic.AddInt64(&handled, 1)
		t.Logf("catch panic with PanicHandler: %v", v)
	}))
	assert.NoErrorf(t, err, "create new pool failed: %v", err)
	defer pool.Release()

	done.Add(1)
	_ = pool.Submit(func() { panic("Oops!") })
	done.Wait()

	got := atomic.LoadInt64(&handled)
	assert.EqualValuesf(t, 1, got, "panic handler didn't work, panicCounter: %d", got)
	assert.EqualValues(t, 0, pool.Running(), "pool should be empty after panic")

	// Same scenario for PoolWithFunc: the pool function panics with its argument.
	rethrow := func(v interface{}) { panic(v) }
	poolWithFunc, err := NewPoolWithFunc(10, rethrow, WithPanicHandler(func(_ interface{}) {
		defer done.Done()
		atomic.AddInt64(&handled, 1)
	}))
	assert.NoErrorf(t, err, "create new pool with func failed: %v", err)
	defer poolWithFunc.Release()

	done.Add(1)
	_ = poolWithFunc.Invoke("Oops!")
	done.Wait()

	got = atomic.LoadInt64(&handled)
	assert.EqualValuesf(t, 2, got, "panic handler didn't work, panicCounter: %d", got)
	assert.EqualValues(t, 0, poolWithFunc.Running(), "pool should be empty after panic")
}
// TestPanicHandlerPreMalloc is the WithPreAlloc(true) variant of
// TestPanicHandler: a panic raised inside a task must reach the handler
// installed with WithPanicHandler, for both Pool and PoolWithFunc, and
// Running() must report zero workers once the handler has finished.
func TestPanicHandlerPreMalloc(t *testing.T) {
	var panicCounter int64
	var wg sync.WaitGroup

	p0, err := NewPool(10, WithPreAlloc(true), WithPanicHandler(func(p interface{}) {
		defer wg.Done()
		atomic.AddInt64(&panicCounter, 1)
		t.Logf("catch panic with PanicHandler: %v", p)
	}))
	assert.NoErrorf(t, err, "create new pool failed: %v", err)
	defer p0.Release()

	wg.Add(1)
	_ = p0.Submit(func() {
		panic("Oops!")
	})
	wg.Wait()
	c := atomic.LoadInt64(&panicCounter)
	assert.EqualValuesf(t, 1, c, "panic handler didn't work, panicCounter: %d", c)
	assert.EqualValues(t, 0, p0.Running(), "pool should be empty after panic")

	// The PoolWithFunc half must be pre-allocated as well; without
	// WithPreAlloc(true) it merely repeats what TestPanicHandler already covers.
	p1, err := NewPoolWithFunc(10, func(p interface{}) { panic(p) }, WithPreAlloc(true),
		WithPanicHandler(func(p interface{}) {
			defer wg.Done()
			atomic.AddInt64(&panicCounter, 1)
		}))
	assert.NoErrorf(t, err, "create new pool with func failed: %v", err)
	defer p1.Release()

	wg.Add(1)
	_ = p1.Invoke("Oops!")
	wg.Wait()
	c = atomic.LoadInt64(&panicCounter)
	assert.EqualValuesf(t, 2, c, "panic handler didn't work, panicCounter: %d", c)
	assert.EqualValues(t, 0, p1.Running(), "pool should be empty after panic")
}
// TestPoolPanicWithoutHandler submits a panicking task to a Pool and to a
// PoolWithFunc that have no panic handler configured. The test makes no
// assertion about the panic itself; it only requires that both pools can be
// created and that the test process is not brought down.
func TestPoolPanicWithoutHandler(t *testing.T) {
	pool, err := NewPool(10)
	assert.NoErrorf(t, err, "create new pool failed: %v", err)
	defer pool.Release()
	_ = pool.Submit(func() { panic("Oops!") })

	// The pool function re-panics with whatever argument it is invoked with.
	rethrow := func(v interface{}) { panic(v) }
	poolWithFunc, err := NewPoolWithFunc(10, rethrow)
	assert.NoErrorf(t, err, "create new pool with func failed: %v", err)
	defer poolWithFunc.Release()
	_ = poolWithFunc.Invoke("Oops!")
}
// TestPoolPanicWithoutHandlerPreMalloc is the WithPreAlloc(true) variant of
// TestPoolPanicWithoutHandler: a panicking task is submitted to a Pool and to a
// PoolWithFunc that have no panic handler configured. No assertion is made
// about the panic itself.
func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) {
	p0, err := NewPool(10, WithPreAlloc(true))
	assert.NoErrorf(t, err, "create new pool failed: %v", err)
	defer p0.Release()
	_ = p0.Submit(func() {
		panic("Oops!")
	})

	// The PoolWithFunc half must be pre-allocated as well; without
	// WithPreAlloc(true) it merely repeats TestPoolPanicWithoutHandler.
	p1, err := NewPoolWithFunc(10, func(p interface{}) {
		panic(p)
	}, WithPreAlloc(true))
	assert.NoErrorf(t, err, "create new pool with func failed: %v", err)
	defer p1.Release()
	_ = p1.Invoke("Oops!")
}
// TestPurge runs a single task on a Pool and on a PoolWithFunc, waits three
// clean intervals, and asserts that each pool then reports zero running
// workers, i.e. that idle workers have been purged.
func TestPurge(t *testing.T) {
	p, err := NewPool(10)
	assert.NoErrorf(t, err, "create TimingPool failed: %v", err)
	defer p.Release()
	_ = p.Submit(demoFunc)
	time.Sleep(3 * DefaultCleanIntervalTime)
	assert.EqualValues(t, 0, p.Running(), "all p should be purged")

	p1, err := NewPoolWithFunc(10, demoPoolFunc)
	assert.NoErrorf(t, err, "create TimingPoolWithFunc failed: %v", err)
	defer p1.Release()
	_ = p1.Invoke(1)
	time.Sleep(3 * DefaultCleanIntervalTime)
	// Check p1 here: asserting on p again would leave PoolWithFunc untested.
	assert.EqualValues(t, 0, p1.Running(), "all p should be purged")
}
// TestPurgePreMalloc is the WithPreAlloc(true) variant of TestPurge: a single
// task is run on a Pool and on a PoolWithFunc, and after three clean intervals
// each pool must report zero running workers.
func TestPurgePreMalloc(t *testing.T) {
	p, err := NewPool(10, WithPreAlloc(true))
	assert.NoErrorf(t, err, "create TimingPool failed: %v", err)
	defer p.Release()
	_ = p.Submit(demoFunc)
	time.Sleep(3 * DefaultCleanIntervalTime)
	assert.EqualValues(t, 0, p.Running(), "all p should be purged")

	// The PoolWithFunc half must be pre-allocated as well; without
	// WithPreAlloc(true) it merely repeats what TestPurge already covers.
	p1, err := NewPoolWithFunc(10, demoPoolFunc, WithPreAlloc(true))
	assert.NoErrorf(t, err, "create TimingPoolWithFunc failed: %v", err)
	defer p1.Release()
	_ = p1.Invoke(1)
	time.Sleep(3 * DefaultCleanIntervalTime)
	// Check p1 here: asserting on p again would leave PoolWithFunc untested.
	assert.EqualValues(t, 0, p1.Running(), "all p should be purged")
}
// TestNonblockingSubmit verifies WithNonblocking(true) on a Pool: submissions
// succeed while capacity remains, an extra submission on a full pool fails
// with ErrPoolOverload, and submitting succeeds again after one task finishes.
func TestNonblockingSubmit(t *testing.T) {
	const capacity = 10
	pool, err := NewPool(capacity, WithNonblocking(true))
	assert.NoErrorf(t, err, "create TimingPool failed: %v", err)
	defer pool.Release()

	// Occupy all workers but one with long-running tasks.
	for busy := 0; busy < capacity-1; busy++ {
		assert.NoError(t, pool.Submit(longRunningFunc), "nonblocking submit when pool is not full shouldn't return error")
	}

	var (
		unblock  = make(chan struct{}) // closed by the test to let the last task finish
		finished = make(chan struct{}) // closed by the last task just before it returns
	)
	lastTask := func() {
		<-unblock
		close(finished)
	}

	// lastTask takes the final free worker; after this call the pool is full.
	assert.NoError(t, pool.Submit(lastTask), "nonblocking submit when pool is not full shouldn't return error")
	assert.EqualError(t, pool.Submit(demoFunc), ErrPoolOverload.Error(),
		"nonblocking submit when pool is full should get an ErrPoolOverload")

	// Let lastTask finish so that a worker becomes available again.
	close(unblock)
	<-finished
	assert.NoError(t, pool.Submit(demoFunc), "nonblocking submit when pool is not full shouldn't return error")
}
// TestMaxBlockingSubmit verifies WithMaxBlockingTasks(1) on a Pool: once the
// pool is full, one submitter may block waiting for a worker, a second
// concurrent submitter is rejected with ErrPoolOverload, and the blocked
// submitter completes without error after a worker is freed.
func TestMaxBlockingSubmit(t *testing.T) {
	const capacity = 10
	pool, err := NewPool(capacity, WithMaxBlockingTasks(1))
	assert.NoErrorf(t, err, "create TimingPool failed: %v", err)
	defer pool.Release()

	// Occupy all workers but one with long-running tasks.
	for busy := 0; busy < capacity-1; busy++ {
		assert.NoError(t, pool.Submit(longRunningFunc), "submit when pool is not full shouldn't return error")
	}

	// This task takes the final free worker and holds it until unblock is closed.
	unblock := make(chan struct{})
	holdWorker := func() { <-unblock }
	assert.NoError(t, pool.Submit(holdWorker), "submit when pool is not full shouldn't return error")

	var (
		waiter    sync.WaitGroup
		submitErr = make(chan error, 1)
	)
	waiter.Add(1)
	go func() {
		defer waiter.Done()
		// Expected to block: this is the single permitted blocking submitter.
		if e := pool.Submit(demoFunc); e != nil {
			submitErr <- e
		}
	}()
	// Give the goroutine above time to reach the blocking Submit.
	time.Sleep(1 * time.Second)

	// The blocking quota is used up, so this submission must be rejected.
	assert.EqualError(t, pool.Submit(demoFunc), ErrPoolOverload.Error(),
		"blocking submit when pool reach max blocking submit should return ErrPoolOverload")

	// Free a worker so that the blocked submission can go through.
	close(unblock)
	waiter.Wait()

	select {
	case <-submitErr:
		t.Fatalf("blocking submit when pool is full should not return error")
	default:
	}
}
// TestNonblockingSubmitWithFunc verifies WithNonblocking(true) on a
// PoolWithFunc: invocations succeed until the pool is full, one more is
// rejected with ErrPoolOverload, and invoking succeeds again once the running
// tasks have been released.
func TestNonblockingSubmitWithFunc(t *testing.T) {
	const capacity = 10
	var inFlight sync.WaitGroup

	pool, err := NewPoolWithFunc(capacity, func(arg interface{}) {
		longRunningPoolFunc(arg)
		inFlight.Done()
	}, WithNonblocking(true))
	assert.NoError(t, err, "create TimingPool failed: %v", err)
	defer pool.Release()

	unblock := make(chan struct{})
	inFlight.Add(capacity)

	// Fill every worker slot; each task receives the unblock channel as its argument.
	for used := 0; used < capacity; used++ {
		assert.NoError(t, pool.Invoke(unblock), "nonblocking submit when pool is not full shouldn't return error")
	}

	// The pool is full now, so one more invocation must be rejected.
	assert.EqualError(t, pool.Invoke(nil), ErrPoolOverload.Error(),
		"nonblocking submit when pool is full should get an ErrPoolOverload")

	// Release the running tasks to get available workers back.
	close(unblock)
	inFlight.Wait()
	assert.NoError(t, pool.Invoke(nil), "nonblocking submit when pool is not full shouldn't return error")
}
// TestMaxBlockingSubmitWithFunc verifies WithMaxBlockingTasks(1) on a
// PoolWithFunc: once the pool is full, one invoker may block waiting for a
// worker, a second concurrent invoker is rejected with ErrPoolOverload, and
// the blocked invoker completes without error after a worker is freed.
func TestMaxBlockingSubmitWithFunc(t *testing.T) {
	poolSize := 10
	p, err := NewPoolWithFunc(poolSize, longRunningPoolFunc, WithMaxBlockingTasks(1))
	assert.NoErrorf(t, err, "create TimingPool failed: %v", err)
	defer p.Release()

	// Occupy all workers but one.
	for i := 0; i < poolSize-1; i++ {
		assert.NoError(t, p.Invoke(Param), "submit when pool is not full shouldn't return error")
	}

	// This invocation takes the final free worker; after it the pool is full.
	ch := make(chan struct{})
	assert.NoError(t, p.Invoke(ch), "submit when pool is not full shouldn't return error")

	var wg sync.WaitGroup
	wg.Add(1)
	errCh := make(chan error, 1)
	go func() {
		defer wg.Done()
		// Expected to block: this is the single permitted blocking invoker.
		if err := p.Invoke(Param); err != nil {
			errCh <- err
		}
	}()
	// Give the goroutine above time to reach the blocking Invoke.
	time.Sleep(1 * time.Second)

	// The blocking quota is used up, so this invocation must be rejected.
	// Report the error actually returned by Invoke, not the (nil) creation error.
	overloadErr := p.Invoke(Param)
	assert.EqualErrorf(t, overloadErr, ErrPoolOverload.Error(),
		"blocking submit when pool reach max blocking submit should return ErrPoolOverload: %v", overloadErr)

	// Interrupt one task so that the blocked invocation can go through.
	close(ch)
	wg.Wait()

	select {
	case <-errCh:
		t.Fatalf("blocking submit when pool is full should not return error")
	default:
	}
}
// TestRebootDefaultPool exercises Release and Reboot on the package-level
// default pool: a task runs after Reboot, Submit fails with ErrPoolClosed
// after Release, and Submit works again after a second Reboot.
func TestRebootDefaultPool(t *testing.T) {
	defer Release()

	Reboot() // make sure the default pool is open, whatever earlier tests did
	var pending sync.WaitGroup

	pending.Add(1)
	_ = Submit(func() {
		demoFunc()
		pending.Done()
	})
	pending.Wait()

	Release()
	assert.EqualError(t, Submit(nil), ErrPoolClosed.Error(), "pool should be closed")

	Reboot()
	pending.Add(1)
	assert.NoError(t, Submit(func() { pending.Done() }), "pool should be rebooted")
	pending.Wait()
}
// TestRebootNewPool exercises Release and Reboot on a freshly created Pool and
// PoolWithFunc: each runs a task, rejects work with ErrPoolClosed after
// Release, and accepts work again after Reboot.
func TestRebootNewPool(t *testing.T) {
	var pending sync.WaitGroup

	// --- Pool ---
	pool, err := NewPool(10)
	assert.NoErrorf(t, err, "create Pool failed: %v", err)
	defer pool.Release()

	pending.Add(1)
	_ = pool.Submit(func() {
		demoFunc()
		pending.Done()
	})
	pending.Wait()

	pool.Release()
	assert.EqualError(t, pool.Submit(nil), ErrPoolClosed.Error(), "pool should be closed")

	pool.Reboot()
	pending.Add(1)
	assert.NoError(t, pool.Submit(func() { pending.Done() }), "pool should be rebooted")
	pending.Wait()

	// --- PoolWithFunc ---
	poolWithFunc, err := NewPoolWithFunc(10, func(arg interface{}) {
		demoPoolFunc(arg)
		pending.Done()
	})
	assert.NoErrorf(t, err, "create TimingPoolWithFunc failed: %v", err)
	defer poolWithFunc.Release()

	pending.Add(1)
	_ = poolWithFunc.Invoke(1)
	pending.Wait()

	poolWithFunc.Release()
	assert.EqualError(t, poolWithFunc.Invoke(nil), ErrPoolClosed.Error(), "pool should be closed")

	poolWithFunc.Reboot()
	pending.Add(1)
	assert.NoError(t, poolWithFunc.Invoke(1), "pool should be rebooted")
	pending.Wait()
}
// TestInfinitePool checks a Pool created with capacity -1 (unlimited): a task
// submitted from inside another task gets its own worker, Free() reports -1,
// Tune does not change the capacity, and combining an unlimited capacity with
// WithPreAlloc(true) is rejected with ErrInvalidPreAllocSize.
func TestInfinitePool(t *testing.T) {
	c := make(chan struct{})
	p, err := NewPool(-1)
	if err != nil {
		t.Fatalf("create infinite pool failed: %v", err)
	}
	// Release the pool on exit so it does not outlive the test.
	defer p.Release()

	_ = p.Submit(func() {
		_ = p.Submit(func() {
			<-c
		})
	})
	// Unbuffered send: returns only once the inner task has received from c.
	c <- struct{}{}
	if n := p.Running(); n != 2 {
		t.Errorf("expect 2 workers running, but got %d", n)
	}
	if n := p.Free(); n != -1 {
		t.Errorf("expect -1 of free workers by unlimited pool, but got %d", n)
	}

	// Tuning an unlimited pool is expected to leave its capacity at -1.
	p.Tune(10)
	if capacity := p.Cap(); capacity != -1 {
		t.Fatalf("expect capacity: -1 but got %d", capacity)
	}

	_, err = NewPool(-1, WithPreAlloc(true))
	if err != ErrInvalidPreAllocSize {
		t.Errorf("expect ErrInvalidPreAllocSize but got %v", err)
	}
}
// TestInfinitePoolWithFunc checks a PoolWithFunc created with capacity -1
// (unlimited): two invocations run on two workers, Free() reports -1, Tune
// does not change the capacity, and combining an unlimited capacity with
// WithPreAlloc(true) is rejected with ErrInvalidPreAllocSize.
func TestInfinitePoolWithFunc(t *testing.T) {
	c := make(chan struct{})
	p, err := NewPoolWithFunc(-1, func(i interface{}) {
		demoPoolFunc(i)
		<-c
	})
	if err != nil {
		t.Fatalf("create infinite pool with func failed: %v", err)
	}
	// Release the pool on exit so it does not outlive the test.
	defer p.Release()

	_ = p.Invoke(10)
	_ = p.Invoke(10)
	// Two unbuffered sends: each returns only once one of the tasks has received.
	c <- struct{}{}
	c <- struct{}{}
	if n := p.Running(); n != 2 {
		t.Errorf("expect 2 workers running, but got %d", n)
	}
	if n := p.Free(); n != -1 {
		t.Errorf("expect -1 of free workers by unlimited pool, but got %d", n)
	}

	// Tuning an unlimited pool is expected to leave its capacity at -1.
	p.Tune(10)
	if capacity := p.Cap(); capacity != -1 {
		t.Fatalf("expect capacity: -1 but got %d", capacity)
	}

	_, err = NewPoolWithFunc(-1, demoPoolFunc, WithPreAlloc(true))
	if err != ErrInvalidPreAllocSize {
		t.Errorf("expect ErrInvalidPreAllocSize but got %v", err)
	}
}
// TestReleaseWhenRunningPool releases a single-worker Pool while two
// goroutines are still submitting one-second tasks to it, and then waits for
// both submitters to return. The test passes if neither submitter stays
// blocked after Release.
func TestReleaseWhenRunningPool(t *testing.T) {
	p, err := NewPool(1)
	if err != nil {
		t.Fatalf("create pool failed: %v", err)
	}

	var wg sync.WaitGroup
	// submitRange submits one task per index in [from, to) and signals wg when
	// every Submit call has returned.
	submitRange := func(name string, from, to int) {
		t.Log("start " + name)
		// Deferred calls run last-in-first-out, so "stop" is logged BEFORE
		// wg.Done(). Logging after Done could happen once the test function
		// has already returned, which makes the testing package panic.
		defer wg.Done()
		defer t.Log("stop " + name)
		for i := from; i < to; i++ {
			j := i
			_ = p.Submit(func() {
				t.Log("do task", j)
				time.Sleep(1 * time.Second)
			})
		}
	}

	wg.Add(2)
	go submitRange("aaa", 0, 30)
	go submitRange("bbb", 100, 130)

	// Let a few tasks run, then release the pool while submitters are active.
	time.Sleep(3 * time.Second)
	p.Release()
	t.Log("wait for all goroutines to exit...")
	wg.Wait()
}
// TestReleaseWhenRunningPoolWithFunc releases a single-worker PoolWithFunc
// while two goroutines are still invoking it with one-second tasks, and then
// waits for both invokers to return. The test passes if neither invoker stays
// blocked after Release.
func TestReleaseWhenRunningPoolWithFunc(t *testing.T) {
	p, err := NewPoolWithFunc(1, func(i interface{}) {
		t.Log("do task", i)
		time.Sleep(1 * time.Second)
	})
	if err != nil {
		t.Fatalf("create pool with func failed: %v", err)
	}

	var wg sync.WaitGroup
	// invokeRange invokes the pool once per index in [from, to) and signals wg
	// when every Invoke call has returned.
	invokeRange := func(name string, from, to int) {
		t.Log("start " + name)
		// Deferred calls run last-in-first-out, so "stop" is logged BEFORE
		// wg.Done(). Logging after Done could happen once the test function
		// has already returned, which makes the testing package panic.
		defer wg.Done()
		defer t.Log("stop " + name)
		for i := from; i < to; i++ {
			_ = p.Invoke(i)
		}
	}

	wg.Add(2)
	go invokeRange("aaa", 0, 30)
	go invokeRange("bbb", 100, 130)

	// Let a few tasks run, then release the pool while invokers are active.
	time.Sleep(3 * time.Second)
	p.Release()
	t.Log("wait for all goroutines to exit...")
	wg.Wait()
}
// TestRestCodeCoverage drives code paths that the other tests do not reach:
// pool creation with a negative expiry duration, creation from an Options
// struct, a custom logger, Tune with both an unchanged and a smaller capacity,
// and submitting to a pool that has already been released. It asserts
// nothing; results are only logged.
func TestRestCodeCoverage(t *testing.T) {
	// Negative expiry durations; whatever error comes back is only logged.
	_, err := NewPool(-1, WithExpiryDuration(-1))
	t.Log(err)
	_, err = NewPool(1, WithExpiryDuration(-1))
	t.Log(err)
	_, err = NewPoolWithFunc(-1, demoPoolFunc, WithExpiryDuration(-1))
	t.Log(err)
	_, err = NewPoolWithFunc(1, demoPoolFunc, WithExpiryDuration(-1))
	t.Log(err)

	// Creation through a fully populated Options value.
	options := Options{}
	options.ExpiryDuration = time.Duration(10) * time.Second
	options.Nonblocking = true
	options.PreAlloc = true
	poolOpts, _ := NewPool(1, WithOptions(options))
	// Release this pool on exit so it does not outlive the test.
	defer poolOpts.Release()
	t.Logf("Pool with options, capacity: %d", poolOpts.Cap())

	// For each pool below, the deferred Submit/Invoke is registered BEFORE the
	// deferred Release. Defers run last-in-first-out, so the pool is released
	// first and the final Submit/Invoke then hits an already closed pool.
	p0, _ := NewPool(TestSize, WithLogger(log.New(os.Stderr, "", log.LstdFlags)))
	defer func() {
		_ = p0.Submit(demoFunc)
	}()
	defer p0.Release()
	for i := 0; i < n; i++ {
		_ = p0.Submit(demoFunc)
	}
	t.Logf("pool, capacity:%d", p0.Cap())
	t.Logf("pool, running workers number:%d", p0.Running())
	t.Logf("pool, free workers number:%d", p0.Free())
	p0.Tune(TestSize)      // same capacity as before
	p0.Tune(TestSize / 10) // shrink
	t.Logf("pool, after tuning capacity, capacity:%d, running:%d", p0.Cap(), p0.Running())

	pprem, _ := NewPool(TestSize, WithPreAlloc(true))
	defer func() {
		_ = pprem.Submit(demoFunc)
	}()
	defer pprem.Release()
	for i := 0; i < n; i++ {
		_ = pprem.Submit(demoFunc)
	}
	t.Logf("pre-malloc pool, capacity:%d", pprem.Cap())
	t.Logf("pre-malloc pool, running workers number:%d", pprem.Running())
	t.Logf("pre-malloc pool, free workers number:%d", pprem.Free())
	pprem.Tune(TestSize)      // same capacity as before
	pprem.Tune(TestSize / 10) // shrink
	t.Logf("pre-malloc pool, after tuning capacity, capacity:%d, running:%d", pprem.Cap(), pprem.Running())

	p, _ := NewPoolWithFunc(TestSize, demoPoolFunc)
	defer func() {
		_ = p.Invoke(Param)
	}()
	defer p.Release()
	for i := 0; i < n; i++ {
		_ = p.Invoke(Param)
	}
	time.Sleep(DefaultCleanIntervalTime)
	t.Logf("pool with func, capacity:%d", p.Cap())
	t.Logf("pool with func, running workers number:%d", p.Running())
	t.Logf("pool with func, free workers number:%d", p.Free())
	p.Tune(TestSize)      // same capacity as before
	p.Tune(TestSize / 10) // shrink
	t.Logf("pool with func, after tuning capacity, capacity:%d, running:%d", p.Cap(), p.Running())

	ppremWithFunc, _ := NewPoolWithFunc(TestSize, demoPoolFunc, WithPreAlloc(true))
	defer func() {
		_ = ppremWithFunc.Invoke(Param)
	}()
	defer ppremWithFunc.Release()
	for i := 0; i < n; i++ {
		_ = ppremWithFunc.Invoke(Param)
	}
	time.Sleep(DefaultCleanIntervalTime)
	t.Logf("pre-malloc pool with func, capacity:%d", ppremWithFunc.Cap())
	t.Logf("pre-malloc pool with func, running workers number:%d", ppremWithFunc.Running())
	t.Logf("pre-malloc pool with func, free workers number:%d", ppremWithFunc.Free())
	ppremWithFunc.Tune(TestSize)      // same capacity as before
	ppremWithFunc.Tune(TestSize / 10) // shrink
	t.Logf("pre-malloc pool with func, after tuning capacity, capacity:%d, running:%d", ppremWithFunc.Cap(),
		ppremWithFunc.Running())
}
// TestPoolTuneScaleUp checks that raising the capacity with Tune lets more
// tasks run: first by one slot, then by several slots while submitters are
// already waiting. The same scenario is run for Pool and for PoolWithFunc.
func TestPoolTuneScaleUp(t *testing.T) {
	gate := make(chan struct{})
	// Every task simply parks on gate until the test feeds or closes it.
	park := func() { <-gate }

	pool, _ := NewPool(2)
	for k := 0; k < 2; k++ {
		_ = pool.Submit(park)
	}
	if running := pool.Running(); running != 2 {
		t.Errorf("expect 2 workers running, but got %d", running)
	}

	// Scale up by one slot.
	pool.Tune(3)
	_ = pool.Submit(park)
	if running := pool.Running(); running != 3 {
		t.Errorf("expect 3 workers running, but got %d", running)
	}

	// Scale up by several slots while five submitters are in flight.
	var submitters sync.WaitGroup
	submitters.Add(5)
	for k := 0; k < 5; k++ {
		go func() {
			defer submitters.Done()
			_ = pool.Submit(park)
		}()
	}
	pool.Tune(8)
	submitters.Wait()
	if running := pool.Running(); running != 8 {
		t.Errorf("expect 8 workers running, but got %d", running)
	}

	// Let the eight parked tasks go, then shut the pool down.
	for k := 0; k < 8; k++ {
		gate <- struct{}{}
	}
	pool.Release()

	// Same scenario for PoolWithFunc.
	poolWithFunc, _ := NewPoolWithFunc(2, func(_ interface{}) {
		<-gate
	})
	for k := 0; k < 2; k++ {
		_ = poolWithFunc.Invoke(1)
	}
	if running := poolWithFunc.Running(); running != 2 {
		t.Errorf("expect 2 workers running, but got %d", running)
	}

	// Scale up by one slot.
	poolWithFunc.Tune(3)
	_ = poolWithFunc.Invoke(1)
	if running := poolWithFunc.Running(); running != 3 {
		t.Errorf("expect 3 workers running, but got %d", running)
	}

	// Scale up by several slots while five invokers are in flight.
	var invokers sync.WaitGroup
	invokers.Add(5)
	for k := 0; k < 5; k++ {
		go func() {
			defer invokers.Done()
			_ = poolWithFunc.Invoke(1)
		}()
	}
	poolWithFunc.Tune(8)
	invokers.Wait()
	if running := poolWithFunc.Running(); running != 8 {
		t.Errorf("expect 8 workers running, but got %d", running)
	}

	for k := 0; k < 8; k++ {
		gate <- struct{}{}
	}
	close(gate)
	poolWithFunc.Release()
}
// TestReleaseTimeout starts five one-second tasks on a Pool and on a
// PoolWithFunc and expects ReleaseTimeout with a two-second budget to return
// without error for both.
func TestReleaseTimeout(t *testing.T) {
	pool, _ := NewPool(10)
	oneSecondTask := func() { time.Sleep(time.Second) }
	for k := 0; k < 5; k++ {
		_ = pool.Submit(oneSecondTask)
	}
	assert.NotZero(t, pool.Running())
	assert.NoError(t, pool.ReleaseTimeout(2*time.Second))

	// The pool function sleeps for the duration passed to Invoke.
	poolWithFunc, _ := NewPoolWithFunc(10, func(arg interface{}) {
		time.Sleep(arg.(time.Duration))
	})
	for k := 0; k < 5; k++ {
		_ = poolWithFunc.Invoke(time.Second)
	}
	assert.NotZero(t, poolWithFunc.Running())
	assert.NoError(t, poolWithFunc.ReleaseTimeout(2*time.Second))
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/greatesoft/ants.git
[email protected]:greatesoft/ants.git
greatesoft
ants
ants
master

搜索帮助