代码拉取完成,页面将自动刷新
package GPool
import (
"log"
"sync"
"sync/atomic"
"time"
)
var (
CLOSED int32 = 13
WORKING int32 = 12
)
var defaultSize int32 = 5
type finder struct {
W *worker
next *finder
}
type Pool struct {
globalQueue chan *task
cap int32
running int32 // finder的数量
workGroup sync.WaitGroup // 执行任务是否完成
taskGroup sync.WaitGroup // 任务添加是否完成
workerHead *finder
mux sync.Mutex
isClose int32
workerCache sync.Pool // 存储复用
recoveryTime time.Duration // worker超时回收
}
func NewPool(capacity int32) *Pool {
if capacity <= 0 {
capacity = defaultSize
}
return &Pool {
globalQueue: make(chan *task, capacity * 2),
cap: capacity,
workGroup: sync.WaitGroup{},
workerHead: &finder{
W: nil,
next: nil,
},
mux: sync.Mutex{},
isClose: WORKING,
workerCache: sync.Pool{},
recoveryTime: 300 * time.Millisecond,
}
}
func isPoolClosed(p *Pool) bool {
if atomic.LoadInt32(&p.isClose) > WORKING {
log.Println("--- Pool is closed, add task failed ---")
return true
}
return false
}
func (p *Pool) AddTask(target func(...interface{}), args []interface{}) {
if isPoolClosed(p) {
return
}
if target != nil {
p.taskGroup.Add(1)
newTask := &task{
workFunc: target,
args: args,
}
p.globalQueue <- newTask
p.taskGroup.Done()
}
}
func (p *Pool) addFinder(w *worker) {
p.mux.Lock()
defer p.mux.Unlock()
p.running++
newFinder := &finder{
W: w,
next: nil,
}
if atomic.LoadInt32(&p.running) == 0 {
p.workerHead.next = newFinder
return
}
newFinder.next = p.workerHead.next
p.workerHead.next = newFinder
}
func (p *Pool) getWorker() *worker {
if isPoolClosed(p) {
return nil
}
for {
for h := p.workerHead.next; h != nil; h = h.next {
if h.W.status == FREE {
h.W.mux.Lock()
h.W.status = RUNNING
h.W.heartTime = time.Now() // 更新心跳时间
h.W.mux.Unlock()
return h.W
}
}
var newWorker *worker
if atomic.LoadInt32(&p.running) < p.cap {
if one := p.workerCache.Get(); one != nil {
newWorker = one.(*worker)
} else {
newWorker = &worker{
status: RUNNING,
work: nil,
mux: sync.Mutex{},
heartTime: time.Now(),
}
}
p.addFinder(newWorker)
return newWorker
}
}
}
func (p *Pool) recoverWorker() {
for {
if isPoolClosed(p) {
return
}
currentTime := time.Now()
p.mux.Lock()
for you := p.workerHead; you != nil; you = you.next {
cur := you.next
if cur == nil {
break
}
if currentTime.Sub(cur.W.heartTime) >= p.recoveryTime {
you.next = cur.next
cur.next = nil
p.workerCache.Put(cur.W)
}
}
p.mux.Unlock()
time.Sleep(500 * time.Millisecond)
}
}
func (p *Pool) closeGQ() {
p.taskGroup.Wait()
close(p.globalQueue)
}
func (p *Pool) Close() {
atomic.AddInt32(&p.isClose, 1)
}
func (p *Pool) Wait() {
p.workGroup.Wait()
}
func (p *Pool) run(everTask *task) {
oneWorker := p.getWorker()
defer func(w *worker) {
if err := recover(); err != nil {
log.Println(err)
}
p.workGroup.Done()
w.mux.Lock()
w.status = FREE
w.mux.Unlock()
}(oneWorker)
oneWorker.work = everTask
oneWorker.run()
}
func (p *Pool) Run() {
go p.closeGQ()
go p.recoverWorker()
for {
everTask, ok := <-p.globalQueue
if !ok {
break
}
p.workGroup.Add(1)
oneWorker := p.getWorker()
go func() {
defer func(w *worker) {
if err := recover(); err != nil {
log.Println(err)
}
p.workGroup.Done()
w.mux.Lock()
w.status = FREE
w.mux.Unlock()
}(oneWorker)
oneWorker.work = everTask
oneWorker.run()
}()
}
p.workGroup.Wait()
p.Close()
}
func (p *Pool) Run2() {
go p.closeGQ()
go p.recoverWorker()
for {
everTask, ok := <-p.globalQueue
if !ok {
break
}
p.workGroup.Add(1)
go p.run(everTask)
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。