6 Star 46 Fork 22

springrain/dm

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
y.go 4.35 KB
一键复制 编辑 原始数据 按行查看 历史
springrain 提交于 2023-02-03 12:01 . v1.8.11 来自 达梦8.1.2.192
/*
* Copyright (c) 2000-2018, 达梦数据库有限公司.
* All rights reserved.
*/
package dm
import (
"bytes"
"math/rand"
"sync"
"time"
"gitee.com/chunanyong/dm/util"
)
/**
* dm_svc.conf中配置的服务名对应的一组实例, 以及相关属性和状态信息
*
* 需求:
* 1. 连接均匀分布在各个节点上
* 2. loginMode,loginStatus匹配
* 3. 连接异常节点比较耗时,在DB列表中包含异常节点时异常连接尽量靠后,减少对建连接速度的影响
*
*
* DB 连接顺序:
* 1. well distribution,每次连接都从列表的下一个节点开始
* 2. 用DB sort值按从大到小排序,sort为一个四位数XXXX,个位--serverStatus,十位--serverMode,共 有三种模式,最优先的 *100, 次优先的*10
*/
type epGroup struct {
name string
epList []*ep
props *Properties
epStartPos int32 // wellDistribute 起始位置
lock sync.Mutex
}
func newEPGroup(name string, serverList []*ep) *epGroup {
g := new(epGroup)
g.name = name
g.epList = serverList
if serverList == nil || len(serverList) == 0 {
g.epStartPos = -1
} else {
// 保证进程间均衡,起始位置采用随机值
g.epStartPos = rand.Int31n(int32(len(serverList))) - 1
}
return g
}
func (g *epGroup) connect(connector *DmConnector) (*DmConnection, error) {
var dbSelector = g.getEPSelector(connector)
var ex error = nil
// 如果配置了loginMode的主、备等优先策略,而未找到最高优先级的节点时持续循环switchtimes次,如果最终还是没有找到最高优先级则选择次优先级的
// 如果只有一个节点,一轮即可决定是否连接;多个节点时保证switchTimes轮尝试,最后一轮决定用哪个节点(由于节点已经按照模式优先级排序,最后一轮理论上就是连第一个节点)
var cycleCount int32
if len(g.epList) == 1 {
cycleCount = 1
} else {
cycleCount = connector.switchTimes + 1
}
for i := int32(0); i < cycleCount; i++ {
// 循环了一遍,如果没有符合要求的, 重新排序, 再尝试连接
conn, err := g.traverseServerList(connector, dbSelector, i == 0, i == cycleCount-1)
if err != nil {
ex = err
time.Sleep(time.Duration(connector.switchInterval) * time.Millisecond)
continue
}
return conn, nil
}
return nil, ex
}
func (g *epGroup) getEPSelector(connector *DmConnector) *epSelector {
if connector.epSelector == TYPE_HEAD_FIRST {
return newEPSelector(g.epList)
} else {
serverCount := int32(len(g.epList))
sortEPs := make([]*ep, serverCount)
g.lock.Lock()
defer g.lock.Unlock()
g.epStartPos = (g.epStartPos + 1) % serverCount
for i := int32(0); i < serverCount; i++ {
sortEPs[i] = g.epList[(i+g.epStartPos)%serverCount]
}
return newEPSelector(sortEPs)
}
}
/**
* 从指定编号开始,遍历一遍服务名中的ip列表,只连接指定类型(主机或备机)的ip
* @param servers
* @param checkTime
*
* @exception
* DBError.ECJDBC_INVALID_SERVER_MODE 有站点的模式不匹配
* DBError.ECJDBC_COMMUNITION_ERROR 所有站点都连不上
*/
func (g *epGroup) traverseServerList(connector *DmConnector, epSelector *epSelector, first bool, last bool) (*DmConnection, error) {
epList := epSelector.sortDBList(first)
errorMsg := bytes.NewBufferString("")
var ex error = nil // 第一个错误
for _, server := range epList {
conn, err := server.connect(connector)
if err != nil {
if ex == nil {
ex = err
}
errorMsg.WriteString("[")
errorMsg.WriteString(server.String())
errorMsg.WriteString("]")
errorMsg.WriteString(err.Error())
errorMsg.WriteString(util.StringUtil.LineSeparator())
continue
}
valid, err := epSelector.checkServerMode(conn, last)
if err != nil {
if ex == nil {
ex = err
}
errorMsg.WriteString("[")
errorMsg.WriteString(server.String())
errorMsg.WriteString("]")
errorMsg.WriteString(err.Error())
errorMsg.WriteString(util.StringUtil.LineSeparator())
continue
}
if !valid {
conn.close()
err = ECGO_INVALID_SERVER_MODE.throw()
if ex == nil {
ex = err
}
errorMsg.WriteString("[")
errorMsg.WriteString(server.String())
errorMsg.WriteString("]")
errorMsg.WriteString(err.Error())
errorMsg.WriteString(util.StringUtil.LineSeparator())
continue
}
return conn, nil
}
if ex != nil {
return nil, ex
}
return nil, ECGO_COMMUNITION_ERROR.addDetail(errorMsg.String()).throw()
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/chunanyong/dm.git
[email protected]:chunanyong/dm.git
chunanyong
dm
dm
master

搜索帮助