1 Star 0 Fork 0

scgg/GPool

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
pool.go 3.70 KB
一键复制 编辑 原始数据 按行查看 历史
scgg 提交于 2021-12-22 17:24 . 负载生成器
package GPool
import (
"log"
"sync"
"sync/atomic"
"time"
)
// Lifecycle states stored in Pool.isClose. isPoolClosed treats any value
// greater than WORKING as closed.
var (
// CLOSED equals WORKING + 1, the value isClose reaches after one Close call.
CLOSED int32 = 13
// WORKING is the state NewPool assigns to a freshly created pool.
WORKING int32 = 12
)
// defaultSize is the capacity NewPool falls back to when given a value <= 0.
var defaultSize int32 = 5
// finder is a node of the singly linked list rooted at Pool.workerHead; each
// node added by addFinder references one worker.
type finder struct {
// W is the worker referenced by this node; it is nil on the sentinel head
// node created by NewPool.
W *worker
// next is the following node in the list, or nil at the tail.
next *finder
}
// Pool queues tasks submitted through AddTask and hands them to workers that
// are tracked in a linked list of finders.
type Pool struct {
// globalQueue is the buffered channel AddTask sends to and Run/Run2 receive from.
globalQueue chan *task
// cap is the limit getWorker compares running against before registering a new worker.
cap int32
running int32 // number of finders
workGroup sync.WaitGroup // tracks whether dispatched tasks have finished executing
taskGroup sync.WaitGroup // tracks whether in-flight AddTask calls have finished enqueuing
// workerHead is the sentinel head of the finder list; the first real node is workerHead.next.
workerHead *finder
// mux is held by addFinder and recoverWorker while they modify the finder list.
mux sync.Mutex
// isClose holds WORKING while the pool is open; Close increments it, and
// isPoolClosed reports closed for any value above WORKING.
isClose int32
workerCache sync.Pool // stores unlinked workers for reuse
recoveryTime time.Duration // heartTime age at which recoverWorker unlinks a worker
}
// NewPool returns an open Pool.
//
// capacity is the limit getWorker uses when deciding whether another worker
// may be registered; a value <= 0 is replaced by defaultSize. The task queue
// is buffered to twice the effective capacity, and workers become eligible for
// recovery 300ms after their last heartbeat.
func NewPool(capacity int32) *Pool {
	size := capacity
	if size <= 0 {
		size = defaultSize
	}

	// The WaitGroups, the mutex and the sync.Pool are usable as zero values,
	// so only the fields that need a non-zero start are listed.
	pool := &Pool{
		globalQueue:  make(chan *task, size*2),
		cap:          size,
		workerHead:   &finder{}, // sentinel head: no worker, no successor
		isClose:      WORKING,
		recoveryTime: 300 * time.Millisecond,
	}
	return pool
}
// isPoolClosed reports whether p has been closed, i.e. whether its isClose
// state has moved above WORKING. A log line is written every time the answer
// is true.
func isPoolClosed(p *Pool) bool {
	closed := atomic.LoadInt32(&p.isClose) > WORKING
	if closed {
		log.Println("--- Pool is closed, add task failed ---")
	}
	return closed
}
// AddTask enqueues target, to be invoked with args, on the pool's task queue.
//
// The call is a no-op when the pool is closed or target is nil. The send
// blocks while the queue buffer is full. taskGroup is held for the duration of
// the enqueue so closeGQ can wait for in-flight submissions.
func (p *Pool) AddTask(target func(...interface{}), args []interface{}) {
	// The closed check runs first so its log line is emitted even for a nil target.
	if isPoolClosed(p) || target == nil {
		return
	}

	p.taskGroup.Add(1)
	p.globalQueue <- &task{workFunc: target, args: args}
	p.taskGroup.Done()
}
// addFinder registers w with the pool by pushing a new finder node onto the
// front of the worker list and counting it in p.running.
//
// The list is modified under p.mux. The counter is updated atomically because
// getWorker reads it with atomic.LoadInt32 without holding p.mux; mixing that
// with a plain increment is a data race.
func (p *Pool) addFinder(w *worker) {
	p.mux.Lock()
	defer p.mux.Unlock()

	atomic.AddInt32(&p.running, 1)

	// Pushing at the head also covers the empty list: the new node's next is
	// then simply nil, so no special case is needed.
	p.workerHead.next = &finder{
		W:    w,
		next: p.workerHead.next,
	}
}
// getWorker returns a worker that has been marked RUNNING for the caller, or
// nil if the pool is already closed when the call starts.
//
// Each attempt first tries to claim a FREE worker from the finder list. If
// none is free and fewer than cap workers are registered, another worker is
// registered (reusing one from workerCache when possible). Otherwise the call
// pauses briefly and tries again.
func (p *Pool) getWorker() *worker {
	if isPoolClosed(p) {
		return nil
	}

	// Pause between attempts so a saturated pool does not spin at full speed
	// on p.mux.
	const retryDelay = 100 * time.Microsecond

	for {
		// recoverWorker unlinks nodes under p.mux, so the list has to be
		// walked under the same lock.
		p.mux.Lock()
		for node := p.workerHead.next; node != nil; node = node.next {
			candidate := node.W
			// Test and claim under the worker's own lock so that two
			// concurrent callers can never both see FREE and take it.
			// NOTE(review): assumes worker.run does not hold w.mux while the
			// task executes — confirm against the worker implementation.
			candidate.mux.Lock()
			if candidate.status == FREE {
				candidate.status = RUNNING
				candidate.heartTime = time.Now() // refresh the heartbeat
				candidate.mux.Unlock()
				p.mux.Unlock()
				return candidate
			}
			candidate.mux.Unlock()
		}
		p.mux.Unlock()

		if atomic.LoadInt32(&p.running) < p.cap {
			var fresh *worker
			if cached, ok := p.workerCache.Get().(*worker); ok && cached != nil {
				// recoverWorker evicts purely by heartbeat age, so a cached
				// worker may still be executing a task. Only reuse it when it
				// is FREE, and mark it RUNNING with a new heartbeat before it
				// becomes visible in the list again.
				cached.mux.Lock()
				if cached.status == FREE {
					cached.status = RUNNING
					cached.heartTime = time.Now()
					fresh = cached
				}
				cached.mux.Unlock()
			}
			if fresh == nil {
				fresh = &worker{
					status:    RUNNING,
					heartTime: time.Now(),
				}
			}
			// NOTE(review): the capacity check and the registration are not
			// one atomic step, so concurrent callers (Run2) can still
			// overshoot cap slightly.
			p.addFinder(fresh)
			return fresh
		}

		time.Sleep(retryDelay)
	}
}
// recoverWorker periodically unlinks workers whose heartbeat is at least
// recoveryTime old and stores them in workerCache for reuse. It sweeps every
// 500ms and returns once the pool is closed.
//
// Every unlinked node is also subtracted from p.running. Without that the
// counter only ever grows: once cap workers had been created and recovered,
// getWorker could neither find a worker in the list nor register a new one.
//
// NOTE(review): the sweep looks only at heartTime, not at the worker's status,
// so a worker that has been busy for longer than recoveryTime is unlinked as
// well — confirm whether busy workers should be skipped.
func (p *Pool) recoverWorker() {
	for {
		if isPoolClosed(p) {
			return
		}

		now := time.Now()
		p.mux.Lock()
		prev := p.workerHead
		for prev.next != nil {
			cur := prev.next
			if now.Sub(cur.W.heartTime) < p.recoveryTime {
				prev = cur
				continue
			}
			// Unlink cur but keep prev where it is, so the node that now
			// follows prev is examined too instead of being skipped.
			prev.next = cur.next
			cur.next = nil
			atomic.AddInt32(&p.running, -1)
			p.workerCache.Put(cur.W)
		}
		p.mux.Unlock()

		time.Sleep(500 * time.Millisecond)
	}
}
// closeGQ waits until no AddTask call is in the middle of enqueuing and then
// closes the task queue, which ends the receive loops in Run and Run2.
func (p *Pool) closeGQ() {
	pending := &p.taskGroup
	pending.Wait()

	close(p.globalQueue)
}
// Close marks the pool as closed by advancing its isClose state by one, which
// moves it above WORKING.
func (p *Pool) Close() {
	state := &p.isClose
	atomic.AddInt32(state, 1)
}
// Wait blocks until every task counted in workGroup has finished.
func (p *Pool) Wait() {
	inFlight := &p.workGroup
	inFlight.Wait()
}
// run executes everTask on a worker obtained from getWorker and releases the
// workGroup slot that the caller reserved with workGroup.Add(1).
//
// If the pool is closed, getWorker returns nil; the task is then dropped and
// the slot released instead of dereferencing the nil worker. A panic raised by
// the task is recovered and logged.
func (p *Pool) run(everTask *task) {
	oneWorker := p.getWorker()
	if oneWorker == nil {
		p.workGroup.Done()
		return
	}

	defer func() {
		if err := recover(); err != nil {
			log.Println(err)
		}
		// Hand the worker back before signalling completion, so that by the
		// time Wait returns the worker is already marked FREE.
		oneWorker.mux.Lock()
		oneWorker.status = FREE
		oneWorker.mux.Unlock()
		p.workGroup.Done()
	}()

	oneWorker.work = everTask
	oneWorker.run()
}
// Run starts the queue closer and the worker recovery loop, then dispatches
// tasks from the queue until it is closed. Each task runs in its own goroutine
// on a worker claimed by this dispatcher. When the queue is drained, Run waits
// for the dispatched tasks to finish and closes the pool.
//
// If the pool is closed while tasks are still queued, getWorker returns nil;
// those tasks are dropped rather than run on a nil worker. A panic raised by a
// task is recovered and logged.
func (p *Pool) Run() {
	go p.closeGQ()
	go p.recoverWorker()

	for {
		everTask, ok := <-p.globalQueue
		if !ok {
			break
		}

		p.workGroup.Add(1)
		oneWorker := p.getWorker()
		if oneWorker == nil {
			// No worker is available because the pool is closed: release the
			// slot just reserved and keep draining the queue.
			p.workGroup.Done()
			continue
		}

		go func(w *worker) {
			defer func() {
				if err := recover(); err != nil {
					log.Println(err)
				}
				// Hand the worker back before signalling completion, so that
				// by the time Wait returns the worker is already marked FREE.
				w.mux.Lock()
				w.status = FREE
				w.mux.Unlock()
				p.workGroup.Done()
			}()
			w.work = everTask
			w.run()
		}(oneWorker)
	}

	p.workGroup.Wait()
	p.Close()
}
// Run2 starts the queue closer and the worker recovery loop, then starts one
// goroutine per queued task until the queue is closed. Each goroutine obtains
// its own worker inside run. Unlike Run, it returns without waiting for the
// tasks and without closing the pool.
func (p *Pool) Run2() {
	go p.closeGQ()
	go p.recoverWorker()

	for queued := range p.globalQueue {
		p.workGroup.Add(1)
		go p.run(queued)
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/scgg/gpool.git
[email protected]:scgg/gpool.git
scgg
gpool
GPool
master

搜索帮助