diff --git a/backend/balancer.go b/backend/balancer.go
index 972aa0ebc734ffe4f69686e3847c0d01ae40edd5..f47811250faf1741ce4d401897567894f362fba8 100644
--- a/backend/balancer.go
+++ b/backend/balancer.go
@@ -122,23 +122,30 @@ func (n *Node) getNextSlaveByWeight() (*DB, error) {
     return db, nil
 }
 
-func (n *Node) workerSlaveLsn(ctx context.Context, lsnChan chan int, masterLsn uint64) {
+func (n *Node) workerSlaveLsn(ctx context.Context, lsnChan chan int, masterLsn uint64, nl map[string]uint64) {
+    var parseUint uint64
 LOOP:
     for {
         // save standby node index where LSN meets the conditions
         for i, db := range n.Slave {
-            val, flag := n.NodeLsn.Load(strings.Split(db.addr, ":")[0])
-            if flag {
-                parseUint := val.(uint64)
-                if golog.GlobalSysLogger.Level() <= golog.LevelDebug {
-                    golog.Debug("balancer", "GetNextSlave",
-                        fmt.Sprintf("slave node index: [%d], addr: [%s], 10hex lsn: [%d]", i, db.addr, parseUint), 0)
-                }
-                if parseUint >= masterLsn {
-                    lsnChan <- i
-                    break LOOP
+            if nl != nil && nl[strings.Split(db.addr, ":")[0]] != 0 {
+                parseUint = nl[strings.Split(db.addr, ":")[0]]
+            } else {
+                val, flag := n.NodeLsn.Load(strings.Split(db.addr, ":")[0])
+                if flag {
+                    parseUint = val.(uint64)
+                } else {
+                    continue
                 }
             }
+            if golog.GlobalSysLogger.Level() <= golog.LevelDebug {
+                golog.Debug("balancer", "GetNextSlave",
+                    fmt.Sprintf("slave node index: [%d], addr: [%s], 10hex lsn: [%d]", i, db.addr, parseUint), 0)
+            }
+            if parseUint >= masterLsn {
+                lsnChan <- i
+                break LOOP
+            }
         }
         select {
         case <-ctx.Done(): // wait for cancellation from the caller
@@ -148,23 +155,23 @@ LOOP:
     }
 }
 
-func (n *Node) GetNextSlave() (*DB, error) {
+func (n *Node) GetNextSlave(ctx context.Context) (*DB, error) {
     if n.Cfg.LoadBalanceMode == "random" {
         return n.getNextSlaveByRandom()
     } else if n.Cfg.LoadBalanceMode == "master" {
         return n.Master, nil
     } else if n.Cfg.LoadBalanceMode == "weight" {
         return n.getNextSlaveByWeight()
-    } else if n.Cfg.LoadBalanceMode == "lsn" {
+    } else if n.Cfg.LoadBalanceMode == config.Lsn {
         // Determine whether the LSN number meets the requirements
         // first get current db_table's LSN, if not exist will use node's LSN
         var masterLsn uint64
         // strong consistence
-        if n.Cfg.ReadConsistenceLevel == "strong" {
+        if n.Cfg.ReadConsistenceLevel == config.GlobalStrong {
             // get latest LSN from master node
             masterLsn = n.getMasterLsn(n.Cfg.Master)
             // session consistence
-        } else {
+        } else if n.Cfg.ReadConsistenceLevel == config.Strong {
             if golog.GlobalSysLogger.Level() <= golog.LevelDebug {
                 golog.Debug("balancer", "GetNextSlave",
                     fmt.Sprintf("Node Cache Key: [%s]", n.NodeCacheKey), 0)
@@ -186,59 +193,123 @@ func (n *Node) GetNextSlave() (*DB, error) {
             }
         }
         /*
-           // save standby node index where LSN meet the conditions
-           lsnSlice := make([]int, 0, 0)
-           for i, db := range n.Slave {
-              val, flag = n.NodeLsn.Load(strings.Split(db.addr, ":")[0])
-              if flag {
-                 parseUint := val.(uint64)
+            // save standby node index where LSN meet the conditions
+            lsnSlice := make([]int, 0, 0)
+            for i, db := range n.Slave {
+                val, flag = n.NodeLsn.Load(strings.Split(db.addr, ":")[0])
+                if flag {
+                    parseUint := val.(uint64)
+                    if golog.GlobalSysLogger.Level() <= golog.LevelDebug {
+                        golog.Debug("balancer", "GetNextSlave",
+                            fmt.Sprintf("slave node index: [%d], addr: [%s], 10hex lsn: [%d]", i, db.addr, parseUint), 0)
+                    }
+                    if parseUint >= masterLsn {
+                        lsnSlice = append(lsnSlice, i)
+                    }
+                }
+            }
+            // Stores the subscript of the last selected node
+            var bestNodeIndex int
+            // Record the maximum load score of the node
+            maxLoadScore := 0
+            // If the LSN of no standby node meets the conditions,
+            // an error will be reported and the request will fall to the master node.
+            if len(lsnSlice) <= 0 {
+                return nil, errors.ErrFormat("no slave node fix the lsn")
+            } else if len(lsnSlice) == 1 {
+                // If there is only one node, select this node without judging the load score
+                bestNodeIndex = lsnSlice[0]
+            } else {
+                // Compare the optimal load of the LSN qualified nodes in a round robin manner
+                // and serve as the node requesting the load
+                if config.Metrics {
+                    for _, index := range lsnSlice {
+                        if n.NodeLoad[index] >= maxLoadScore {
+                            bestNodeIndex = index
+                            maxLoadScore = n.NodeLoad[index]
+                        }
+                    }
+                } else {
+                    bestNodeIndex = randNum(len(lsnSlice))
+                }
+            }
+
+            // According to the LSN condition, the index satisfying the node is extracted,
+            // and then the appropriate node is selected in the judgment.
+            // TODO LSN --> Cold and hot data --> node load, will impl 'Cold and hot data' in the future
+            // TODO Risk point: double network card problem.
+            // The IP of internal communication in the cluster is inconsistent with the IP of external user connection.
+            // The problem may be caused by IP mismatch when taking LSN.
+            return n.Slave[bestNodeIndex], nil
+        */
+        } else if n.Cfg.ReadConsistenceLevel == config.Session {
+            if golog.GlobalSysLogger.Level() <= golog.LevelDebug {
+                golog.Debug("balancer", "GetNextSlave",
+                    fmt.Sprintf("Node Cache Key: [%s]", n.NodeCacheKey), 0)
+            }
+            nl, ok := ctx.Value(config.NodeLsn).(map[string]uint64)
+            if ok {
+                if nl[n.NodeCacheKey] != 0 {
+                    masterLsn = nl[n.NodeCacheKey]
+                    if golog.GlobalSysLogger.Level() <= golog.LevelDebug {
+                        golog.Debug("balancer", "GetNextSlave",
+                            fmt.Sprintf("current use table [%s] latest LSN, 10hex lsn: [%d]", n.NodeCacheKey, masterLsn), 0)
+                    }
+                } else if nl[strings.Split(n.Master.addr, ":")[0]] != 0 {
+                    masterLsn = nl[strings.Split(n.Master.addr, ":")[0]]
                     if golog.GlobalSysLogger.Level() <= golog.LevelDebug {
                         golog.Debug("balancer", "GetNextSlave",
-                            fmt.Sprintf("slave node index: [%d], addr: [%s], 10hex lsn: [%d]", i, db.addr, parseUint), 0)
+                            fmt.Sprintf("master node addr: [%s], 10hex lsn: [%d]", n.Master.addr, masterLsn), 0)
+                    }
+                } else {
+                    val, flag := n.NodeLsn.Load(strings.Split(n.Master.addr, ":")[0])
+                    if flag {
+                        masterLsn = val.(uint64)
                     }
-                    if parseUint >= masterLsn {
-                        lsnSlice = append(lsnSlice, i)
+                    // parse string LSN to uint64
+                    if golog.GlobalSysLogger.Level() <= golog.LevelDebug {
+                        golog.Debug("balancer", "GetNextSlave",
+                            fmt.Sprintf("master node addr: [%s], 10hex lsn: [%d]", n.Master.addr, masterLsn), 0)
                     }
                 }
-            }
-            // Stores the subscript of the last selected node
-            var bestNodeIndex int
-            // Record the maximum load score of the node
-            maxLoadScore := 0
-            // If the LSN of no standby node meets the conditions,
-            // an error will be reported and the request will fall to the master node.
-            if len(lsnSlice) <= 0 {
-                return nil, errors.ErrFormat("no slave node fix the lsn")
-            } else if len(lsnSlice) == 1 {
-                // If there is only one node, select this node without judging the load score
-                bestNodeIndex = lsnSlice[0]
-            } else {
-                // Compare the optimal load of the LSN qualified nodes in a round robin manner
-                // and serve as the node requesting the load
-                if config.Metrics {
-                    for _, index := range lsnSlice {
-                        if n.NodeLoad[index] >= maxLoadScore {
-                            bestNodeIndex = index
-                            maxLoadScore = n.NodeLoad[index]
+
+            lsnChan := make(chan int, 1)
+            ctx, cancel := context.WithCancel(context.Background())
+            go n.workerSlaveLsn(ctx, lsnChan, masterLsn, nl)
+            for {
+                select {
+                case i := <-lsnChan:
+                    return n.Slave[i], nil
+                // waiting for slave catch up master's LSN timeout
+                case <-time.After(time.Millisecond * time.Duration(n.Cfg.ConsistenceTimeout)):
+                    // cancel go routine workerSlaveLsn
+                    cancel()
+                    // 0 means exec request with master
+                    if n.Cfg.ConsistenceTimeoutAction == 0 {
+                        if golog.GlobalSysLogger.Level() <= golog.LevelDebug {
+                            golog.Debug("balancer", "GetNextSlave",
+                                fmt.Sprintf("waiting for slave catch up master's LSN timeout, do exec on master addr: [%s]", n.Master.addr), 0)
+                        }
+                        return n.Master, nil
+                    } else {
+                        // else will return an err
+                        return nil, errors.ErrWaitConsistTO
                     }
                 }
-            } else {
-                bestNodeIndex = randNum(len(lsnSlice))
+            }
+        } else {
+            val, _ := n.NodeLsn.Load(strings.Split(n.Master.addr, ":")[0])
+            masterLsn = val.(uint64)
+            // parse string LSN to uint64
+            if golog.GlobalSysLogger.Level() <= golog.LevelDebug {
+                golog.Debug("balancer", "GetNextSlave",
+                    fmt.Sprintf("master node addr: [%s], 10hex lsn: [%d]", n.Master.addr, masterLsn), 0)
             }
         }
-
-        // According to the LSN condition, the index satisfying the node is extracted,
-        // and then the appropriate node is selected in the judgment.
-        // TODO LSN --> Cold and hot data --> node load, will impl 'Cold and hot data' in the future
-        // TODO Risk point: double network card problem.
-        // The IP of internal communication in the cluster is inconsistent with the IP of external user connection.
-        // The problem may be caused by IP mismatch when taking LSN.
-        return n.Slave[bestNodeIndex], nil
-        */
         }
         lsnChan := make(chan int, 1)
         ctx, cancel := context.WithCancel(context.Background())
-        go n.workerSlaveLsn(ctx, lsnChan, masterLsn)
+        go n.workerSlaveLsn(ctx, lsnChan, masterLsn, nil)
         for {
             select {
             case i := <-lsnChan:
@@ -249,6 +320,10 @@ func (n *Node) GetNextSlave() (*DB, error) {
                 cancel()
                 // 0 means exec request with master
                 if n.Cfg.ConsistenceTimeoutAction == 0 {
+                    if golog.GlobalSysLogger.Level() <= golog.LevelDebug {
+                        golog.Debug("balancer", "GetNextSlave",
+                            fmt.Sprintf("waiting for slave catch up master's LSN timeout, do exec on master addr: [%s]", n.Master.addr), 0)
+                    }
                     return n.Master, nil
                 } else {
                     // else will return an err
@@ -264,7 +339,7 @@ func (n *Node) GetNextSlave() (*DB, error) {
             index = n.BestNodeIndexByMetric
         }
         return n.Slave[index], nil
-    } else if n.Cfg.LoadBalanceMode == "cache" {
+    } else if n.Cfg.LoadBalanceMode == config.Cache {
         // if use simple parse will use lex analysis, and can not get table name, so change to weight mode
         if config.SimpleParse {
             return n.getNextSlaveByWeight()
diff --git a/backend/node.go b/backend/node.go
index e9817a497f5fa851046869eeb42fb76f45d30df5..fde38b573937bc74a52f6c1fa2b6f77fc80032db 100644
--- a/backend/node.go
+++ b/backend/node.go
@@ -194,7 +194,7 @@ func (n *Node) GetMasterConnPg(dbname string, dbuser string) (*BackendConn, erro
 func (n *Node) GetSlaveConn() (*BackendConn, error) {
     n.Lock()
-    db, err := n.GetNextSlave()
+    db, err := n.GetNextSlave(nil)
     n.Unlock()
     if err != nil {
         return nil, err
@@ -210,9 +210,9 @@ func (n *Node) GetSlaveConn() (*BackendConn, error) {
     return db.GetConn()
 }
 
-func (n *Node) GetSlaveConnPg(dbname string, dbuser string, tablename string) (*BackendConn, error) {
+func (n *Node) GetSlaveConnPg(ctx context.Context, dbname string, dbuser string, tablename string) (*BackendConn, error) {
     n.NodeCacheKey = dbname + "_" + tablename
-    db, err := n.GetNextSlave()
+    db, err := n.GetNextSlave(ctx)
     if err != nil {
         return nil, err
     }
@@ -252,7 +252,7 @@ func (n *Node) checkMaster() {
     if config.Mysql != config.DbType {
         // get master and read-only nodes latest LSN in loop
         // get cached tables relationship with nodes in loop
-        if n.Cfg.LoadBalanceMode == "lsn" || n.Cfg.LoadBalanceMode == "cache" {
+        if n.Cfg.LoadBalanceMode == config.Lsn || n.Cfg.LoadBalanceMode == config.Cache {
             n.getLsnAndCacheMetadata(n.Master.checkConn, n.Cfg.Master)
         }
     }
@@ -506,114 +506,118 @@ func (n *Node) getLsnAndCacheMetadata(conn *Conn, masterStr string) {
     }
 
     // init read-only node's LSN
-    rows, er := conn.ConnPgx.Query(context.Background(), "select client_addr::text, replay_lsn from pg_stat_replication;")
-    if er != nil {
-        if golog.GlobalSysLogger.Level() <= golog.LevelError {
-            golog.Error("node", "ParseMaster", fmt.Sprintf("Query pg_stat_replication failed: %s", er.Error()), 0)
-        }
-    }
-    var addr string
-    var lsn string
-    for rows.Next() {
-        er = rows.Scan(&addr, &lsn)
+    if n.Cfg.LoadBalanceMode == config.Lsn {
+        rows, er := conn.ConnPgx.Query(context.Background(), "select client_addr::text, replay_lsn from pg_stat_replication;")
+        if er != nil {
             if golog.GlobalSysLogger.Level() <= golog.LevelError {
-                golog.Error("node", "ParseMaster", fmt.Sprintf("scan client_addr,replay_lsn err : %s", er.Error()), 0)
+                golog.Error("node", "ParseMaster", fmt.Sprintf("Query pg_stat_replication failed: %s", er.Error()), 0)
             }
-
-            break
         }
-        if val, err := pgLsnInInternal(lsn); err != nil {
-            if golog.GlobalSysLogger.Level() <= golog.LevelError {
-                golog.Error("node", "ParseMaster", fmt.Sprintf("slave node parse LSN 16hex to 10hex err : %s, addr: %s",
-                    err.Error(), addr), 0)
+        var addr string
+        var lsn string
+        for rows.Next() {
+            er = rows.Scan(&addr, &lsn)
+            if er != nil {
+                if golog.GlobalSysLogger.Level() <= golog.LevelError {
+                    golog.Error("node", "ParseMaster", fmt.Sprintf("scan client_addr,replay_lsn err : %s", er.Error()), 0)
+                }
+
+                break
             }
+            if val, err := pgLsnInInternal(lsn); err != nil {
+                if golog.GlobalSysLogger.Level() <= golog.LevelError {
+                    golog.Error("node", "ParseMaster", fmt.Sprintf("slave node parse LSN 16hex to 10hex err : %s, addr: %s",
+                        err.Error(), addr), 0)
+                }
 
-        } else {
-            n.NodeLsn.Store(strings.Split(addr, "/")[0], val)
-            if golog.GlobalSysLogger.Level() <= golog.LevelTrace {
-                golog.Trace("node", "ParseMaster",
-                    fmt.Sprintf("slave node parse LSN, addr: %s, lsn 16hex: %s, lsn 10hex: %d ", addr, lsn, val), 0)
+            } else {
+                n.NodeLsn.Store(strings.Split(addr, "/")[0], val)
+                if golog.GlobalSysLogger.Level() <= golog.LevelTrace {
+                    golog.Trace("node", "ParseMaster",
+                        fmt.Sprintf("slave node parse LSN, addr: %s, lsn 16hex: %s, lsn 10hex: %d ", addr, lsn, val), 0)
+                }
             }
         }
-    }
 
-    // init master node's LSN
-    er = conn.ConnPgx.QueryRow(context.Background(), "select pg_current_wal_lsn();").Scan(&lsn)
-    if er != nil {
-        golog.Error("node", "ParseMaster", "scan pg_current_wal_lsn err !"+er.Error(), 0)
-    } else {
-        if val, err := pgLsnInInternal(lsn); err != nil {
-            if golog.GlobalSysLogger.Level() <= golog.LevelError {
-                golog.Error("node", "ParseMaster", fmt.Sprintf("master node parse LSN 16hex to 10hex err : %s, addr: %s",
-                    err.Error(), masterStr), 0)
-            }
+        // init master node's LSN
+        er = conn.ConnPgx.QueryRow(context.Background(), "select pg_current_wal_lsn();").Scan(&lsn)
+        if er != nil {
+            golog.Error("node", "ParseMaster", "scan pg_current_wal_lsn err !"+er.Error(), 0)
         } else {
-            n.NodeLsn.Store(strings.Split(masterStr, ":")[0], val)
-            if golog.GlobalSysLogger.Level() <= golog.LevelTrace {
-                golog.Trace("node", "ParseMaster",
-                    fmt.Sprintf("master node parse LSN, addr: %s, lsn 16hex: %s , lsn 10hex: %d", masterStr, lsn, val), 0)
+            if val, err := pgLsnInInternal(lsn); err != nil {
+                if golog.GlobalSysLogger.Level() <= golog.LevelError {
+                    golog.Error("node", "ParseMaster", fmt.Sprintf("master node parse LSN 16hex to 10hex err : %s, addr: %s",
+                        err.Error(), masterStr), 0)
+                }
+            } else {
+                n.NodeLsn.Store(strings.Split(masterStr, ":")[0], val)
+                if golog.GlobalSysLogger.Level() <= golog.LevelTrace {
+                    golog.Trace("node", "ParseMaster",
+                        fmt.Sprintf("master node parse LSN, addr: %s, lsn 16hex: %s , lsn 10hex: %d", masterStr, lsn, val), 0)
+                }
             }
         }
     }
 
-    // init metadata about table which node cached
-    var he3MetaConninfo string
-    er = conn.ConnPgx.QueryRow(context.Background(), "show he3_meta_conninfo;").Scan(&he3MetaConninfo)
-    if er != nil {
-        if golog.GlobalSysLogger.Level() <= golog.LevelError {
-            golog.Error("node", "ParseMaster", fmt.Sprintf("Query he3_meta_conninfo failed: %s", er.Error()), 0)
-        }
-    } else {
-        vo, ero := parseHe3MetaConninfo(he3MetaConninfo)
-        if ero != nil {
+    if n.Cfg.LoadBalanceMode == config.Cache {
+        // init metadata about table which node cached
+        var he3MetaConninfo string
+        err = conn.ConnPgx.QueryRow(context.Background(), "show he3_meta_conninfo;").Scan(&he3MetaConninfo)
+        if err != nil {
             if golog.GlobalSysLogger.Level() <= golog.LevelError {
-                golog.Error("node", "ParseMaster", fmt.Sprintf("parse he3_meta_conninfo error: %s \n", he3MetaConninfo), 0)
+                golog.Error("node", "ParseMaster", fmt.Sprintf("Query he3_meta_conninfo failed: %s", err.Error()), 0)
             }
         } else {
-            urlExample := "postgres://" + vo.user + ":" + vo.password + "@" + vo.host + ":" + vo.port + "/postgres"
-            if golog.GlobalSysLogger.Level() <= golog.LevelInfo {
-                golog.Info("node", "ParseMaster", fmt.Sprintf("he3_meta_conninfo url: %s \n", urlExample), 0)
-            }
-            connMeta, metaErr := pgx.Connect(context.Background(), urlExample)
-            defer connMeta.Close(context.Background())
-            if metaErr != nil {
+            vo, ero := parseHe3MetaConninfo(he3MetaConninfo)
+            if ero != nil {
                 if golog.GlobalSysLogger.Level() <= golog.LevelError {
-                    golog.Error("node", "ParseMaster", fmt.Sprintf("Unable to connect to he3 meta database: %s \n", metaErr), 0)
+                    golog.Error("node", "ParseMaster", fmt.Sprintf("parse he3_meta_conninfo error: %s \n", he3MetaConninfo), 0)
                 }
             } else {
-                rows, er = connMeta.Query(context.Background(), "select datname, relname, clientaddr from pg_hot_data where action='precache';")
-                if er != nil {
+                urlExample := "postgres://" + vo.user + ":" + vo.password + "@" + vo.host + ":" + vo.port + "/postgres"
+                if golog.GlobalSysLogger.Level() <= golog.LevelInfo {
+                    golog.Info("node", "ParseMaster", fmt.Sprintf("he3_meta_conninfo url: %s \n", urlExample), 0)
+                }
+                connMeta, metaErr := pgx.Connect(context.Background(), urlExample)
+                defer connMeta.Close(context.Background())
+                if metaErr != nil {
                     if golog.GlobalSysLogger.Level() <= golog.LevelError {
-                        golog.Error("node", "ParseMaster", fmt.Sprintf("Query pg_hot_data failed: %s", er.Error()), 0)
+                        golog.Error("node", "ParseMaster", fmt.Sprintf("Unable to connect to he3 meta database: %s \n", metaErr), 0)
                     }
-                }
-                // Deal with `pg_hot_data` delete refresh, delete all data and restore
-                n.NodeCache.Range(func(key, value interface{}) bool {
-                    n.NodeCache.Delete(key)
-                    return true
-                })
-                var datname string
-                var relname string
-                var clientaddr string
-                for rows.Next() {
-                    er = rows.Scan(&datname, &relname, &clientaddr)
+                } else {
+                    rows, er := connMeta.Query(context.Background(), "select datname, relname, clientaddr from pg_hot_data where action='precache';")
                     if er != nil {
                         if golog.GlobalSysLogger.Level() <= golog.LevelError {
-                            golog.Error("node", "ParseMaster", fmt.Sprintf("scan datname, relname err : %s", er.Error()), 0)
+                            golog.Error("node", "ParseMaster", fmt.Sprintf("Query pg_hot_data failed: %s", er.Error()), 0)
                         }
-                        break
-                    }
-                    if golog.GlobalSysLogger.Level() <= golog.LevelTrace {
-                        golog.Trace("node", "ParseMaster", fmt.Sprintf("datname: %s, relname: %s, client_addr: %s", datname, relname, clientaddr), 0)
                     }
-                    val, flag := n.NodeCache.Load(datname + "_" + relname)
-                    var nodeCacheVal []string
-                    if !flag {
-                        nodeCacheVal = make([]string, 0)
-                    } else {
-                        nodeCacheVal = val.([]string)
+                    // Deal with `pg_hot_data` delete refresh, delete all data and restore
+                    n.NodeCache.Range(func(key, value interface{}) bool {
+                        n.NodeCache.Delete(key)
+                        return true
+                    })
+                    var datname string
+                    var relname string
+                    var clientaddr string
+                    for rows.Next() {
+                        er = rows.Scan(&datname, &relname, &clientaddr)
+                        if er != nil {
+                            if golog.GlobalSysLogger.Level() <= golog.LevelError {
+                                golog.Error("node", "ParseMaster", fmt.Sprintf("scan datname, relname err : %s", er.Error()), 0)
+                            }
+                            break
+                        }
+                        if golog.GlobalSysLogger.Level() <= golog.LevelTrace {
+                            golog.Trace("node", "ParseMaster", fmt.Sprintf("datname: %s, relname: %s, client_addr: %s", datname, relname, clientaddr), 0)
+                        }
+                        val, flag := n.NodeCache.Load(datname + "_" + relname)
+                        var nodeCacheVal []string
+                        if !flag {
+                            nodeCacheVal = make([]string, 0)
+                        } else {
+                            nodeCacheVal = val.([]string)
+                        }
+                        nodeCacheVal = append(nodeCacheVal, clientaddr)
+                        n.NodeCache.LoadOrStore(datname+"_"+relname, nodeCacheVal)
                     }
-                    nodeCacheVal = append(nodeCacheVal, clientaddr)
-                    n.NodeCache.LoadOrStore(datname+"_"+relname, nodeCacheVal)
                 }
             }
         }
     }
diff --git a/config/config.go b/config/config.go
index 534d2e16d52e74c58d3000842b71c57297afaaee..57441a3b3280935e14b5528b048e8443acae6d87 100644
--- a/config/config.go
+++ b/config/config.go
@@ -61,6 +61,12 @@ const (
     Mysql          = "mysql"
     PG             = "postgresql"
     DefaultHe3User = "he3proxy"
+    NodeLsn        = "nodeLsn"
+    Session        = "session"
+    GlobalStrong   = "global_strong"
+    Strong         = "strong"
+    Lsn            = "lsn"
+    Cache          = "cache"
 )
 
 type MirrorExecMsg struct {
diff --git a/doc/He3Proxy/configure_description.md b/doc/He3Proxy/configure_description.md
index 726fd2706bdd8d8a5593f29f7c3d03c97a9fe32b..f2c6234e2eb5150a48c1dd89d1b3389472301b1a 100644
--- a/doc/He3Proxy/configure_description.md
+++ b/doc/He3Proxy/configure_description.md
@@ -124,9 +124,10 @@ nodes:
     load_balance_mode: weight
     # interval for fetching LSN info via pg_stat_replication, used together with 'load_balance_mode: lsn', unit: ms
     lsn_cache_req_interval: 60
-    # read consistency level, supports strong, session; effective when 'load_balance_mode=lsn'
-    # strong: strong consistency
-    # session: session consistency
+    # read consistency level, supports global_strong, strong, session; effective when 'load_balance_mode=lsn'
+    # global_strong: global strong consistency
+    # strong: strong consistency when a single he3proxy instance is used
+    # session: session consistency, i.e. consistency for queries within one connection
     read_consistence_level: session
     # additional params for strong consistency, effective when 'read_consistence_level: strong'
     # consistence_timeout: timeout for waiting for standby nodes to catch up with the master's LSN, unit: ms
diff --git a/etc/he3proxy.yaml b/etc/he3proxy.yaml
index 2df5901038180697b2e5fb08e56625e780fbc6b1..b6a8cac7705c505dbbf4e10553bfcc8c244b233e 100644
--- a/etc/he3proxy.yaml
+++ b/etc/he3proxy.yaml
@@ -76,8 +76,9 @@ nodes:
     load_balance_mode: weight
     # request LSN and table cache node relationship metadata time interval, unit: ms
     lsn_cache_req_interval: 60
-    # Optional: strong, session, default session. Comes into effect when load_balance_mode=lsn.
-    # strong: global consistency
+    # Optional: global_strong, strong, session, default session. Comes into effect when load_balance_mode=lsn.
+    # global_strong: global consistency
+    # strong: single he3proxy consistency
     # session: session consistency
     read_consistence_level: session
     # param for strong consistence, consistence_timeout means waiting for slave to catch up with master's lsn. unit: ms
diff --git a/proxy/server/conn_pgsql.go b/proxy/server/conn_pgsql.go
index d463fb3f8633fa6dca51d914b38bfaab0aa9e9a4..ffca42b3df2258ac7acebba9128d4f52629ea49b 100644
--- a/proxy/server/conn_pgsql.go
+++ b/proxy/server/conn_pgsql.go
@@ -855,11 +855,29 @@ readloop:
             }
             // set LSN to node
             if addr != "" {
-                cc.nodes["node1"].NodeLsn.Store(strings.Split(addr, ":")[0], lsn.LSN)
+                if cc.nodes["node1"].Cfg.LoadBalanceMode == config.Lsn {
+                    if cc.nodes["node1"].Cfg.ReadConsistenceLevel == config.Session {
+                        nl, ok := ctx.Value(config.NodeLsn).(map[string]uint64)
+                        if ok {
+                            nl[strings.Split(addr, ":")[0]] = lsn.LSN
+                        }
+                    } else {
+                        cc.nodes["node1"].NodeLsn.Store(strings.Split(addr, ":")[0], lsn.LSN)
+                    }
+                }
             }
             // set LSN to db_table
             if cc.table != "" && cc.db != "" {
-                cc.nodes["node1"].NodeLsn.Store(cc.db+"_"+cc.table, lsn.LSN)
+                if cc.nodes["node1"].Cfg.LoadBalanceMode == config.Lsn {
+                    if cc.nodes["node1"].Cfg.ReadConsistenceLevel == config.Session {
+                        nl, ok := ctx.Value(config.NodeLsn).(map[string]uint64)
+                        if ok {
+                            nl[cc.db+"_"+cc.table] = lsn.LSN
+                        }
+                    } else {
+                        cc.nodes["node1"].NodeLsn.Store(cc.db+"_"+cc.table, lsn.LSN)
+                    }
+                }
             }
             continue
         case 'E':
diff --git a/proxy/server/conn_preshard.go b/proxy/server/conn_preshard.go
index ac7d69fb10d1cca518a91138c935b62ab7d687ea..dac72680f6d4e1a89331cf9dd45697f86160 100644
--- a/proxy/server/conn_preshard.go
+++ b/proxy/server/conn_preshard.go
@@ -228,7 +228,7 @@ func (c *ClientConn) preHandlePg(sql string, ctx context.Context) (*backend.Back
         return nil, nil
     }
     //get connection in DB
-    return c.getBackendConnPg(executeDB.ExecNode, executeDB.IsSlave)
+    return c.getBackendConnPg(ctx, executeDB.ExecNode, executeDB.IsSlave)
 }
 
 func (c *ClientConn) GetTransExecDB(tokens []string, sql string) (*ExecuteDB, error) {
diff --git a/proxy/server/conn_query.go b/proxy/server/conn_query.go
index 46d6440c8bc328126fe96f143cb5bfefa8149c73..d975ca4757eba2d1fb7065570d80b17c963a1cbc 100644
--- a/proxy/server/conn_query.go
+++ b/proxy/server/conn_query.go
@@ -15,6 +15,7 @@ package server
 
 import (
+    "context"
     "fmt"
     "runtime"
     "strings"
@@ -166,10 +167,10 @@ func (c *ClientConn) getBackendConn(n *backend.Node, fromSlave bool) (co *backen
     return
 }
 
-func (c *ClientConn) getBackendConnPg(n *backend.Node, fromSlave bool) (co *backend.BackendConn, err error) {
+func (c *ClientConn) getBackendConnPg(ctx context.Context, n *backend.Node, fromSlave bool) (co *backend.BackendConn, err error) {
     if !c.isInTrxPg {
         if fromSlave {
-            co, err = n.GetSlaveConnPg(c.db, c.user, c.table)
+            co, err = n.GetSlaveConnPg(ctx, c.db, c.user, c.table)
             // if the connection pool is full, return the error directly instead of retrying on the master node
             if err == errors.ErrMaxPoolIsFull || err == errors.ErrWaitConsistTO {
                 return co, err
diff --git a/proxy/server/server.go b/proxy/server/server.go
index 4dc239863175ef32911290c4f19d42af2ffe686b..80244d664ccff66172b88f90b5c96c7aecd65772 100644
--- a/proxy/server/server.go
+++ b/proxy/server/server.go
@@ -522,6 +522,10 @@ func (s *Server) onConn(c net.Conn) {
         }
     }
 
+    // save node-lsn relationship
+    nodeLsn := make(map[string]uint64)
+    ctx = context.WithValue(ctx, config.NodeLsn, nodeLsn)
+
     conn.RunPg(ctx)
     } //else {
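
Note on the "16hex to 10hex" log messages in backend/node.go: pgLsnInInternal itself is not part of this patch, but the conversion it performs is the standard PostgreSQL LSN encoding, where the text before the '/' is the high 32 bits and the text after it is the low 32 bits, both hexadecimal. The following standalone sketch uses illustrative names (parseLsn is not the project's function) and only shows that conversion:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseLsn converts a textual PostgreSQL LSN such as "0/16B3748" into its
// 64-bit numeric form so replay positions can be compared with >=.
func parseLsn(lsn string) (uint64, error) {
	parts := strings.Split(lsn, "/")
	if len(parts) != 2 {
		return 0, fmt.Errorf("invalid lsn %q", lsn)
	}
	hi, err := strconv.ParseUint(parts[0], 16, 32) // high 32 bits
	if err != nil {
		return 0, err
	}
	lo, err := strconv.ParseUint(parts[1], 16, 32) // low 32 bits
	if err != nil {
		return 0, err
	}
	return hi<<32 | lo, nil
}

func main() {
	v, _ := parseLsn("0/16B3748")
	fmt.Println(v) // 23803720
}
```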
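In lsn mode, GetNextSlave starts workerSlaveLsn as a goroutine, waits on a channel for the index of a replica whose replay LSN has caught up to masterLsn, and on ConsistenceTimeout either falls back to the master (ConsistenceTimeoutAction == 0) or returns errors.ErrWaitConsistTO. The sketch below reproduces only that shape with hypothetical names (pickReplica, replicaLsn, fallbackToMaster); it is an illustration of the pattern, not the project's implementation:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pickReplica polls replicaLsn until some replica has replayed at least
// masterLsn, or the timeout fires. It mirrors the worker-goroutine +
// channel + cancel + timeout structure used by GetNextSlave.
func pickReplica(replicaLsn []uint64, masterLsn uint64, timeout time.Duration, fallbackToMaster bool) (int, error) {
	found := make(chan int, 1)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		for {
			for i, lsn := range replicaLsn {
				if lsn >= masterLsn {
					found <- i
					return
				}
			}
			select {
			case <-ctx.Done(): // the caller timed out, stop polling
				return
			case <-time.After(time.Millisecond):
			}
		}
	}()

	select {
	case i := <-found:
		return i, nil
	case <-time.After(timeout):
		cancel() // stop the polling goroutine
		if fallbackToMaster {
			return -1, nil // -1 meaning "execute on the master instead"
		}
		return -1, errors.New("waiting for read consistence timed out")
	}
}

func main() {
	idx, err := pickReplica([]uint64{100, 250}, 200, 50*time.Millisecond, true)
	fmt.Println(idx, err) // 1 <nil>
}
```

The buffered channel matters: the worker can deliver its single result even if the caller has already moved on after a timeout, so the goroutine never blocks forever.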
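For read_consistence_level: session, server.go attaches a fresh map[string]uint64 to the connection's context under config.NodeLsn, conn_pgsql.go records the LSN reported after each write into that map (keyed by node address or db_table), and GetNextSlave reads it back to decide which replicas are fresh enough for this session's reads. A minimal sketch of that flow, assuming an illustrative context key rather than the real config.NodeLsn constant:

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey/nodeLsnKey stand in for the config.NodeLsn constant added in
// config/config.go; the key names here are illustrative only.
type ctxKey string

const nodeLsnKey ctxKey = "nodeLsn"

func main() {
	// On connection setup (cf. Server.onConn) a per-connection map is
	// attached to the context.
	ctx := context.WithValue(context.Background(), nodeLsnKey, map[string]uint64{})

	// After a write, the LSN reported for the written table is recorded
	// (cf. the handling in conn_pgsql.go).
	if nl, ok := ctx.Value(nodeLsnKey).(map[string]uint64); ok {
		nl["mydb_mytable"] = 23803720
	}

	// On a later read in the same session, the balancer looks the value up
	// and only routes to replicas whose replay LSN is at least this high.
	if nl, ok := ctx.Value(nodeLsnKey).(map[string]uint64); ok {
		fmt.Println("required LSN:", nl["mydb_mytable"])
	}
}
```

Because the stored value is a map (a reference type), mutating it after retrieval is visible to every later lookup on the same context, which is what lets one per-connection map accumulate all of a session's write LSNs without re-wrapping the context.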