2 Star 19 Fork 4

不稳定丶大神/lucky

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
broadcast.go 3.34 KB
一键复制 编辑 原始数据 按行查看 历史
火星上的乞丐 提交于 2021-02-20 18:09 . re-named packages
package lucky
import (
"errors"
"github.com/google/uuid"
"github.com/helloh2o/lucky/log"
"runtime/debug"
"sync/atomic"
"time"
)
// BroadcastNode 广播转发节点
type BroadcastNode struct {
// 节点ID
NodeId string
// 网络连接
Connections map[interface{}]IConnection
// 当前连接数量
clientSize int64
// message channel
onMessage chan interface{}
recentMessages []interface{}
// AddConn
addConnChan chan IConnection
delConnChan chan string
closeFlag int64
}
// errFoo node closed error
var errFoo = errors.New("node closed")
// NewBroadcastNode return a new BroadcastNode
func NewBroadcastNode() *BroadcastNode {
return &BroadcastNode{
Connections: make(map[interface{}]IConnection),
NodeId: uuid.New().String(),
onMessage: make(chan interface{}),
addConnChan: make(chan IConnection),
delConnChan: make(chan string),
}
}
// Serve the node
func (bNode *BroadcastNode) Serve() {
go func() {
defer func() {
for _, conn := range bNode.Connections {
conn.SetNode(nil)
}
}()
for {
// 优先管理连接
select {
// add conn
case ic := <-bNode.addConnChan:
bNode.Connections[ic.GetUuid()] = ic
bNode.clientSize++
// conn leave
case key := <-bNode.delConnChan:
delete(bNode.Connections, key)
bNode.clientSize--
default:
select {
case pkg := <-bNode.onMessage:
if pkg == nil {
log.Release("============= BroadcastNode %s, stop serve =============", bNode.NodeId)
// stop Serve
return
}
bNode.recentMessages = append(bNode.recentMessages, pkg)
// cache recent 100
recentSize := len(bNode.recentMessages)
if recentSize > 100 {
bNode.recentMessages = bNode.recentMessages[recentSize-100:]
}
bNode.broadcast(pkg)
default:
time.Sleep(time.Millisecond * 50)
}
}
}
}()
}
func (bNode *BroadcastNode) broadcast(msg interface{}) {
defer func() {
if r := recover(); r != nil {
log.Error("write frame error %v, stack %s", r, string(debug.Stack()))
}
}()
if bNode.clientSize == 0 {
return
}
log.Debug("amount %d, broadcast msg %+v", bNode.clientSize, msg)
for _, conn := range bNode.Connections {
conn.WriteMsg(msg)
}
log.Debug(" ======= broadcast ok ======= ")
}
// OnRawMessage bytes
func (bNode *BroadcastNode) OnRawMessage([]byte) error { return nil }
// OnProtocolMessage interface
func (bNode *BroadcastNode) OnProtocolMessage(msg interface{}) error {
if bNode.available() {
bNode.onMessage <- msg
}
return errFoo
}
// GetAllMessage return chan []interface{}
func (bNode *BroadcastNode) GetAllMessage() chan []interface{} {
data := make(chan []interface{}, 1)
data <- bNode.recentMessages
return data
}
// AddConn by conn
func (bNode *BroadcastNode) AddConn(conn IConnection) error {
if bNode.available() {
bNode.addConnChan <- conn
return nil
}
return errFoo
}
// DelConn by key
func (bNode *BroadcastNode) DelConn(key string) error {
if bNode.available() {
bNode.delConnChan <- key
return nil
}
return errFoo
}
// Complete sync
func (bNode *BroadcastNode) Complete() error {
return nil
}
// Destroy the node
func (bNode *BroadcastNode) Destroy() error {
if bNode.available() {
atomic.AddInt64(&bNode.closeFlag, 1)
go func() {
bNode.onMessage <- nil
}()
}
return errFoo
}
func (bNode *BroadcastNode) available() bool {
return atomic.LoadInt64(&bNode.closeFlag) == 0
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/helloh2o/lucky.git
[email protected]:helloh2o/lucky.git
helloh2o
lucky
lucky
master

搜索帮助