From c55050fd40bc7fa7b2dc51bdfdfb273128b46441 Mon Sep 17 00:00:00 2001 From: wangyao Date: Tue, 18 Oct 2022 09:23:10 +0800 Subject: [PATCH 1/4] improve: concurrent err --- backend/balancer.go | 4 +- backend/db.go | 2 + proxy/server/conn.go | 6 ++ proxy/server/conn_pgsql.go | 146 ++++++++++++++++++++-------------- proxy/server/conn_preshard.go | 4 +- proxy/server/conn_query.go | 2 +- proxy/server/server.go | 17 ++-- 7 files changed, 110 insertions(+), 71 deletions(-) diff --git a/backend/balancer.go b/backend/balancer.go index cf935a0..e75d61c 100644 --- a/backend/balancer.go +++ b/backend/balancer.go @@ -195,7 +195,9 @@ func (n *Node) getNextSlaveByCache() []int { if tableCachedNodes != nil && len(tableCachedNodes) > 0 { for _, cacheNode := range tableCachedNodes { for i, db := range n.Slave { - if cacheNode == strings.Split(db.addr, ":")[0] { + //if cacheNode == strings.Split(db.addr, ":")[0] { + // cacheNode ip:port + if cacheNode == db.addr { golog.Debug("balancer", "GetNextSlave", fmt.Sprintf("cacheNode matched addr: %s", cacheNode), 0) indexs = append(indexs, i) } diff --git a/backend/db.go b/backend/db.go index c4dc782..97806a4 100644 --- a/backend/db.go +++ b/backend/db.go @@ -365,6 +365,8 @@ func (db *DB) PopConnPg(dbname string, dbuser string) (*Conn, error) { return nil, err } } else { + db.Lock() + defer db.Unlock() if db.cacheConnsMap == nil { db.cacheConnsMap = make(map[string]chan *Conn) } diff --git a/proxy/server/conn.go b/proxy/server/conn.go index 54c747d..7164028 100644 --- a/proxy/server/conn.go +++ b/proxy/server/conn.go @@ -80,6 +80,12 @@ type ClientConn struct { //TODO Environment variables should be placed in the connection cache, and the variables should be set first each time the statement is executed alwaysCurNode bool // Flag for always exec sql in same node + + // save command sql for extended query + dataSend []byte + + // save receive data from backend, will send to client + dataRecv []byte } const ( diff --git a/proxy/server/conn_pgsql.go b/proxy/server/conn_pgsql.go index 74821e6..452bd95 100644 --- a/proxy/server/conn_pgsql.go +++ b/proxy/server/conn_pgsql.go @@ -102,14 +102,13 @@ func (cc *ClientConn) handshake(ctx context.Context) error { // handle client request through goroutine func (c *ClientConn) RunPg(ctx context.Context) { - dataSend := make([]byte, 0) defer func() { r := recover() if err, ok := r.(error); ok { const size = 4096 buf := make([]byte, size) buf = buf[:runtime.Stack(buf, false)] - golog.Error(moduleName, "Run", + golog.Error(moduleName, "RunPg", err.Error(), 0, "stack", string(buf)) } @@ -157,7 +156,7 @@ func (c *ClientConn) RunPg(ctx context.Context) { } // handle receive msg - if err, dataSend = c.dispatchPg(ctx, header, msg, dataSend); err != nil { + if err = c.dispatchPg(ctx, header, msg); err != nil { c.proxy.counter.IncrErrLogTotal() if err == io.EOF { continue @@ -184,19 +183,19 @@ func (c *ClientConn) RunPg(ctx context.Context) { // It also gets a token from server which is used to limit the concurrently handling clients. // The most frequently used command is ComQuery. 
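// Illustrative sketch, not part of the patch: dispatchPg below buffers the
// extended-query messages (Parse, Bind, Describe, Execute) in cc.dataSend and
// only forwards the whole pipeline to the backend in a single write once a
// Sync ('S') arrives. Reduced to its core, with a hypothetical framed-message
// reader and writer, that buffering looks like:
func pipelineUntilSync(next func() (byte, []byte), send func([]byte) error) error {
	var pending []byte
	for {
		typ, msg := next() // msg already carries the length header and body
		pending = append(pending, typ)
		pending = append(pending, msg...)
		if typ == 'S' { // Sync closes the pipeline: flush everything at once
			return send(pending)
		}
	}
}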
// PostgreSQL Modified -func (cc *ClientConn) dispatchPg(ctx context.Context, header []byte, data []byte, dataSend []byte) ( - error, []byte) { +func (cc *ClientConn) dispatchPg(ctx context.Context, header []byte, data []byte) error { cc.proxy.counter.IncrClientQPS() cmd := header[0] var err error + golog.Trace(moduleName, "dispatchPg", "cmd str:" + string(cmd), cc.connectionId) switch cmd { case 'Q': /* simple query */ simpleQuery := pgproto3.Query{} if err := simpleQuery.Decode(data); err != nil { - return err, dataSend + return err } err = cc.handleQueryPg(ctx, simpleQuery.String, append(header, data...)) - return err, dataSend + return err /* extend query protocol, msg send in sequence first phase: @@ -222,7 +221,7 @@ func (cc *ClientConn) dispatchPg(ctx context.Context, header []byte, data []byte case 'P': /* parse */ parse := pgproto3.Parse{} if err := parse.Decode(data); err != nil { - return err, dataSend + return err } sql := parse.Query if cc.backendConn == nil || cc.backendConn.Conn == nil { @@ -231,11 +230,21 @@ func (cc *ClientConn) dispatchPg(ctx context.Context, header []byte, data []byte // TODO but the scheme makes load balance unuseful, no suitable scheme by now. Will keep thinking. cc.status &= ^mysql.SERVER_STATUS_AUTOCOMMIT cc.status |= mysql.SERVER_STATUS_IN_TRANS + // In benchmarkSQL, the extended-query protocol and the simple-query protocol will be used at the same time. + // After the query statement use extended protocol change to the simple protocol, + // when a ReadyForQuery in a non transaction is received, the backend connection will be released, and the previous parse statement will be lost. + // So set alwaysCurNode to true, that continue to use the current connection. + // TODO In future will save parse statement in clientConnection, and init first when get backend connection. + cc.alwaysCurNode = true cc.backendConn, err = cc.preHandlePg(sql, ctx) + if err == nil { + golog.Debug(moduleName, "parse", + fmt.Sprintf("exec sql [%s] by node [%s]", sql, cc.backendConn.GetAddr()), cc.connectionId, "dbname", cc.db) + } } if cc == nil || cc.backendConn == nil || err != nil { golog.Error(moduleName, "RunPg", err.Error(), cc.connectionId) - return nil, dataSend + return nil } // save conn info to map, use for cancel request. @@ -244,65 +253,65 @@ func (cc *ClientConn) dispatchPg(ctx context.Context, header []byte, data []byte clientConnMap.Unlock() // packaging send msg - dataSend = append(dataSend, header...) - dataSend = append(dataSend, data...) + cc.dataSend = append(cc.dataSend, header...) + cc.dataSend = append(cc.dataSend, data...) // save parse name, use for delete parse when retrieve to connection pool if cc.backendConn.ParName == nil { cc.backendConn.ParName = make([]string, 0) } cc.backendConn.ParName = append(cc.backendConn.ParName, parse.Name) - return nil, dataSend + return nil case 'B': /* bind */ - dataSend = append(dataSend, header...) - dataSend = append(dataSend, data...) - return nil, dataSend + cc.dataSend = append(cc.dataSend, header...) + cc.dataSend = append(cc.dataSend, data...) + return nil case 'E': /* execute */ - dataSend = append(dataSend, header...) - dataSend = append(dataSend, data...) - return nil, dataSend + cc.dataSend = append(cc.dataSend, header...) + cc.dataSend = append(cc.dataSend, data...) 
+ return nil case 'F': /* fastpath function call */ case 'C': /* close */ c := pgproto3.Close{} if err := c.Decode(data); err != nil { - return err, dataSend + return err } err = cc.handleStmtClosePg(ctx, c) - return err, dataSend + return err case 'D': /* describe */ - dataSend = append(dataSend, header...) - dataSend = append(dataSend, data...) - return nil, dataSend + cc.dataSend = append(cc.dataSend, header...) + cc.dataSend = append(cc.dataSend, data...) + return nil case 'H': /* flush */ // return cc.flush(ctx) case 'S': /* sync */ - dataSend = append(dataSend, header...) - dataSend = append(dataSend, data...) - err = cc.handleStmtSyncPg(ctx, dataSend) - dataSend = make([]byte, 0) - return err, dataSend + cc.dataSend = append(cc.dataSend, header...) + cc.dataSend = append(cc.dataSend, data...) + err = cc.handleStmtSyncPg(ctx) + cc.dataSend = make([]byte, 0) + return err case 'X': /*Client Terminate*/ - return io.EOF, dataSend + return io.EOF case 'd': /* copy data */ - dataSend = append(dataSend, header...) - dataSend = append(dataSend, data...) - return nil, dataSend + cc.dataSend = append(cc.dataSend, header...) + cc.dataSend = append(cc.dataSend, data...) + return nil case 'c': /* copy done */ - dataSend = append(dataSend, header...) - dataSend = append(dataSend, data...) - err = cc.handleCopy(ctx, dataSend) - dataSend = make([]byte, 0) - return err, dataSend + cc.dataSend = append(cc.dataSend, header...) + cc.dataSend = append(cc.dataSend, data...) + err = cc.handleCopy(ctx) + cc.dataSend = make([]byte, 0) + return err case 'f': /* copy fail */ - dataSend = append(dataSend, header...) - dataSend = append(dataSend, data...) - err = cc.handleCopy(ctx, dataSend) - dataSend = make([]byte, 0) - return err, dataSend + cc.dataSend = append(cc.dataSend, header...) + cc.dataSend = append(cc.dataSend, data...) + err = cc.handleCopy(ctx) + cc.dataSend = make([]byte, 0) + return err default: - return errors.ErrFormat("command %d not supported now", cmd), dataSend + return errors.ErrFormat("command %d not supported now", cmd) } - return errors.ErrFormat("command %d not supported now", cmd), dataSend + return errors.ErrFormat("command %d not supported now", cmd) } // handle simple query protocol @@ -376,30 +385,31 @@ func (cc *ClientConn) handleQueryPg(ctx context.Context, sql string, data []byte // receive server connection msg, add deal with it func (cc *ClientConn) receiveBackendMsg(ctx context.Context) error { - dataRes := make([]byte, 0) readloop: for { data, err := cc.backendConn.Conn.ReadPgPacket() if err != nil { + golog.Error(moduleName, "receiveBackendMsg", fmt.Sprintf("read packet from backend err: %s", err.Error()), cc.connectionId) return err } + golog.Trace(moduleName, "receiveBackendMsg", fmt.Sprintf("recv packet from backend msg type: %s", string(data[0])), cc.connectionId) // deal with copy msg if data[0] == 'G' || data[0] == 'W' { // in transaction cc.status &= ^mysql.SERVER_STATUS_AUTOCOMMIT cc.status |= mysql.SERVER_STATUS_IN_TRANS - dataRes = append(dataRes, data...) - cc.WriteData(dataRes) - dataRes = make([]byte, 0) + cc.dataRecv = append(cc.dataRecv, data...) + cc.WriteData(cc.dataRecv) + cc.dataRecv = make([]byte, 0) break readloop } if data[0] == 'H' { cc.status &= ^mysql.SERVER_STATUS_AUTOCOMMIT cc.status |= mysql.SERVER_STATUS_IN_TRANS - dataRes = append(dataRes, data...) - cc.WriteData(dataRes) - dataRes = make([]byte, 0) + cc.dataRecv = append(cc.dataRecv, data...) 
+ cc.WriteData(cc.dataRecv) + cc.dataRecv = make([]byte, 0) continue } // add new protocol 'L' for read consistency @@ -442,12 +452,15 @@ readloop: } } - dataRes = append(dataRes, data...) - cc.WriteData(dataRes) - dataRes = make([]byte, 0) + cc.dataRecv = append(cc.dataRecv, data...) + cc.WriteData(cc.dataRecv) + cc.dataRecv = make([]byte, 0) break readloop } - dataRes = append(dataRes, data...) + if data[0] == 'E' { + golog.Error(moduleName, "receiveBackendMsg", fmt.Sprintf("read err packet from backend: %s", string(data)), cc.connectionId) + } + cc.dataRecv = append(cc.dataRecv, data...) // TODO At present, all data are returned. // In the future, we need to consider the situation of multiple data. @@ -480,29 +493,40 @@ func (cc *ClientConn) writeCloseComplete() error { return cc.WriteData(closeComplete.Encode(nil)) } -func (cc *ClientConn) handleStmtSyncPg(ctx context.Context, data []byte) error { +func (cc *ClientConn) handleStmtSyncPg(ctx context.Context) error { + cc.Lock() + defer cc.Unlock() + if cc.backendConn == nil || cc.backendConn.Conn == nil { - return nil + golog.Error(moduleName, "handleStmtSyncPg", "backend connection is null, current send data is: " + string(cc.dataSend), cc.connectionId) + var err error + cc.backendConn, err = cc.preHandlePg("select 1;", ctx) + if err != nil { + golog.Error(moduleName, "handleStmtSyncPg", "reconnect backend err: "+err.Error(), cc.connectionId) + return err + } + //return nil } - err := cc.backendConn.Conn.WritePgPacket(data) + err := cc.backendConn.Conn.WritePgPacket(cc.dataSend) if err != nil { - golog.Error("server", "handleStmtSyncPg", "write msg err: "+err.Error(), cc.connectionId) + golog.Error(moduleName, "handleStmtSyncPg", "write msg err: "+err.Error(), cc.connectionId) return err } err = cc.receiveBackendMsg(ctx) if err != nil { + golog.Error(moduleName, "handleStmtSyncPg", "recv backend msg err: "+err.Error(), cc.connectionId) return err } return nil } -func (cc *ClientConn) handleCopy(ctx context.Context, data []byte) error { +func (cc *ClientConn) handleCopy(ctx context.Context) error { if cc.backendConn == nil { return nil } - err := cc.backendConn.Conn.WritePgPacket(data) + err := cc.backendConn.Conn.WritePgPacket(cc.dataSend) if err != nil { golog.Error(moduleName, "handleCopy", "write msg err: "+err.Error(), cc.connectionId) return err diff --git a/proxy/server/conn_preshard.go b/proxy/server/conn_preshard.go index 144d48f..40fae85 100644 --- a/proxy/server/conn_preshard.go +++ b/proxy/server/conn_preshard.go @@ -289,7 +289,9 @@ func (c *ClientConn) GetExecDB(tokens []string, sql string) (*ExecuteDB, error) switch n := stmt.AST.(type) { case *tree.Select, *tree.ShowVar, *tree.ShowZoneConfig: if strings.Contains(strings.ToLower(n.String()), "for update") || - strings.Contains(strings.ToLower(n.String()), "for share") { + strings.Contains(strings.ToLower(n.String()), "for share") || + strings.Contains(strings.ToLower(n.String()), "setval") || + strings.Contains(strings.ToLower(n.String()), "nextval") { continue } // TODO Just get first table so far diff --git a/proxy/server/conn_query.go b/proxy/server/conn_query.go index 9dc0820..06220f1 100644 --- a/proxy/server/conn_query.go +++ b/proxy/server/conn_query.go @@ -342,10 +342,10 @@ func (c *ClientConn) closeConn(conn *backend.BackendConn, rollback bool) { if c.isInTransaction() { return } - defer conn.Close() if rollback { conn.Rollback() } + conn.Close() } func (c *ClientConn) closeShardConns(conns map[string]*backend.BackendConn, rollback bool) { diff --git 
a/proxy/server/server.go b/proxy/server/server.go index 2a79a0f..206e426 100644 --- a/proxy/server/server.go +++ b/proxy/server/server.go @@ -368,6 +368,8 @@ func (s *Server) newClientConn(co net.Conn) *ClientConn { c.beginFlag = BEGIN_UNSTART c.alwaysCurNode = false + c.dataSend = make([]byte, 0) + c.dataRecv = make([]byte, 0) return c } @@ -400,7 +402,7 @@ func (s *Server) onConn(c net.Conn, dbType string) { return } - if dbType == "mysql" { + if dbType == config.Mysql { if err := conn.Handshake(); err != nil { golog.Error("server", "onConn", err.Error(), 0) conn.writeError(err) @@ -411,7 +413,7 @@ func (s *Server) onConn(c net.Conn, dbType string) { conn.schema = s.GetSchema(conn.user) conn.Run() - } else if dbType == "postgresql" { + } else { ctx := context.Background() if err := conn.handshake(ctx); err != nil { @@ -428,12 +430,13 @@ func (s *Server) onConn(c net.Conn, dbType string) { } conn.RunPg(ctx) - } else { - golog.Error("server", "onConn", dbType+"this db type is not support", 0) - conn.writeError(errors.ErrUnsupportDbType) - conn.Close() - return } + //else { + // golog.Error("server", "onConn", dbType+"this db type is not support", 0) + // conn.writeError(errors.ErrUnsupportDbType) + // conn.Close() + // return + //} } func (s *Server) ChangeProxy(v string) error { -- Gitee From e3f6db3156564303e83026c435a169ebe52899d2 Mon Sep 17 00:00:00 2001 From: wangyao Date: Fri, 21 Oct 2022 19:32:16 +0800 Subject: [PATCH 2/4] improve: extended query for insert --- backend/balancer.go | 23 ++++---- backend/db.go | 39 +++++++------- backend/node.go | 9 ++-- proxy/server/conn.go | 6 ++- proxy/server/conn_pgsql.go | 98 +++++++++++++++++++++-------------- proxy/server/conn_preshard.go | 7 +++ proxy/server/conn_query.go | 14 +++++ proxy/server/server.go | 1 + 8 files changed, 121 insertions(+), 76 deletions(-) diff --git a/backend/balancer.go b/backend/balancer.go index e75d61c..80108dc 100644 --- a/backend/balancer.go +++ b/backend/balancer.go @@ -113,24 +113,29 @@ func (n *Node) GetNextSlave() (*DB, error) { var masterLsn uint64 golog.Debug("balancer", "GetNextSlave", fmt.Sprintf("Node Cache Key: [%s]", n.NodeCacheKey), 0) - masterLsn = n.NodeLsn[n.NodeCacheKey] - if masterLsn == 0 { - masterLsn = n.NodeLsn[strings.Split(n.Master.addr, ":")[0]] + val, flag := n.NodeLsn.Load(n.NodeCacheKey) + if !flag { + val, _ = n.NodeLsn.Load(strings.Split(n.Master.addr, ":")[0]) + masterLsn = val.(uint64) // parse string LSN to uint64 golog.Debug("balancer", "GetNextSlave", fmt.Sprintf("master node addr: [%s], 10hex lsn: [%d]", n.Master.addr, masterLsn), 0) - }else { + } else { + masterLsn = val.(uint64) golog.Debug("balancer", "GetNextSlave", fmt.Sprintf("current use table [%s] latest LSN, 10hex lsn: [%d]", n.NodeCacheKey, masterLsn), 0) } // save standby node index where LSN meet the conditions lsnSlice := make([]int, 0, 0) for i, db := range n.Slave { - parseUint := n.NodeLsn[strings.Split(db.addr, ":")[0]] - golog.Debug("balancer", "GetNextSlave", - fmt.Sprintf("slave node index: [%d], addr: [%s], 10hex lsn: [%d]", i, db.addr, parseUint), 0) - if parseUint >= masterLsn { - lsnSlice = append(lsnSlice, i) + val, flag = n.NodeLsn.Load(strings.Split(db.addr, ":")[0]) + if flag { + parseUint := val.(uint64) + golog.Debug("balancer", "GetNextSlave", + fmt.Sprintf("slave node index: [%d], addr: [%s], 10hex lsn: [%d]", i, db.addr, parseUint), 0) + if parseUint >= masterLsn { + lsnSlice = append(lsnSlice, i) + } } } // Stores the subscript of the last selected node diff --git a/backend/db.go b/backend/db.go 
index 97806a4..278ce19 100644 --- a/backend/db.go +++ b/backend/db.go @@ -21,8 +21,6 @@ import ( "sync/atomic" "time" - "github.com/jackc/pgproto3/v2" - "gitee.com/he3db/he3proxy/config" "gitee.com/he3db/he3proxy/core/golog" @@ -415,7 +413,7 @@ func (db *DB) PopConnPg(dbname string, dbuser string) (*Conn, error) { //else { co = db.GetConnFromCache(cacheConns) if co == nil { - golog.Debug("db", "PopConnPg", "conn is nil", 0) + golog.Warn("db", "PopConnPg", "conn is nil", 0) // 需要判断是否大于最大链接数 if db.IsExceedMaxConns() { err = errors.ErrConnIsFull @@ -589,7 +587,6 @@ func (db *DB) PushConn(co *Conn, err error) { type BackendConn struct { *Conn db *DB - ParName []string //parse 通道name IsInTransaction bool // 是否在事务中 } @@ -599,21 +596,21 @@ func (p *BackendConn) Close() { if p.Conn.pkgErr != nil || p.IsInTransaction { p.db.closeConn(p.Conn) } else { - var data []byte - if p.ParName != nil { - for _, name := range p.ParName { - // 'S'关闭一个准备的语句;或者'P'关闭一个入口。 - // 结合之前pgx链接报错 prepared statement "lrupsc_1_0" already exists (SQLSTATE 42P05) - // 因此这里应该使用S 进行关闭 - c := pgproto3.Close{'S', name} - data = c.Encode(data) - //c1 := pgproto3.Close{'P', name} - //data = c1.Encode(data) - } - f := pgproto3.Flush{} - data = f.Encode(data) - p.ParName = nil - } + //var data []byte + //if p.ParName != nil { + // for _, name := range p.ParName { + // // 'S'关闭一个准备的语句;或者'P'关闭一个入口。 + // // 结合之前pgx链接报错 prepared statement "lrupsc_1_0" already exists (SQLSTATE 42P05) + // // 因此这里应该使用S 进行关闭 + // c := pgproto3.Close{'S', name} + // data = c.Encode(data) + // //c1 := pgproto3.Close{'P', name} + // //data = c1.Encode(data) + // } + // f := pgproto3.Flush{} + // data = f.Encode(data) + // p.ParName = nil + //} //reset := pgproto3.Query{"RESET ALL"} //data = reset.Encode(data) //err := p.Conn.WritePgPacket(data) @@ -644,7 +641,7 @@ func (db *DB) GetConn() (*BackendConn, error) { if err != nil { return nil, err } - return &BackendConn{c, db, nil, false}, nil + return &BackendConn{c, db, false}, nil } func (db *DB) GetConnPg(dbname string, dbuser string) (*BackendConn, error) { @@ -652,7 +649,7 @@ func (db *DB) GetConnPg(dbname string, dbuser string) (*BackendConn, error) { if err != nil { return nil, err } - return &BackendConn{c, db, nil, false}, nil + return &BackendConn{c, db, false}, nil } func (db *DB) SetLastPing() { diff --git a/backend/node.go b/backend/node.go index 2307243..b1d1243 100644 --- a/backend/node.go +++ b/backend/node.go @@ -55,7 +55,7 @@ type Node struct { BestNodeIndexByMetric int // save node-lsn relationship - NodeLsn map[string]uint64 + NodeLsn sync.Map // Node load score NodeLoad map[int]int @@ -467,9 +467,6 @@ func (n *Node) getLsnAndCacheMetadata(conn *Conn, masterStr string) { if er != nil { golog.Error("node", "ParseMaster", fmt.Sprintf("Query pg_stat_replication failed: %s", er.Error()), 0) } - if n.NodeLsn == nil { - n.NodeLsn = make(map[string]uint64) - } var addr string var lsn string for rows.Next() { @@ -482,7 +479,7 @@ func (n *Node) getLsnAndCacheMetadata(conn *Conn, masterStr string) { golog.Error("node", "ParseMaster", fmt.Sprintf("slave node parse LSN 16hex to 10hex err : %s, addr: %s", err.Error(), addr), 0) } else { - n.NodeLsn[strings.Split(addr, "/")[0]] = val + n.NodeLsn.Store(strings.Split(addr, "/")[0], val) golog.Trace("node", "ParseMaster", fmt.Sprintf("slave node parse LSN, addr: %s, lsn 16hex: %s, lsn 10hex: %d ", addr, lsn, val), 0) } @@ -496,7 +493,7 @@ func (n *Node) getLsnAndCacheMetadata(conn *Conn, masterStr string) { golog.Error("node", "ParseMaster", 
fmt.Sprintf("master node parse LSN 16hex to 10hex err : %s, addr: %s", err.Error(), masterStr), 0) } else { - n.NodeLsn[strings.Split(masterStr, ":")[0]] = val + n.NodeLsn.Store(strings.Split(masterStr, ":")[0], val) golog.Trace("node", "ParseMaster", fmt.Sprintf("master node parse LSN, addr: %s, lsn 16hex: %s , lsn 10hex: %d", masterStr, lsn, val), 0) } diff --git a/proxy/server/conn.go b/proxy/server/conn.go index 7164028..406e2c9 100644 --- a/proxy/server/conn.go +++ b/proxy/server/conn.go @@ -86,6 +86,11 @@ type ClientConn struct { // save receive data from backend, will send to client dataRecv []byte + + // flag for use extended query protocol + parseFlag bool + + Parse sync.Map //parse 通道name及parse信息 } const ( @@ -165,7 +170,6 @@ func (c *ClientConn) Close() error { c.c.Close() c.backendConn.Close() - c.closed = true return nil diff --git a/proxy/server/conn_pgsql.go b/proxy/server/conn_pgsql.go index 452bd95..d4b3a51 100644 --- a/proxy/server/conn_pgsql.go +++ b/proxy/server/conn_pgsql.go @@ -50,10 +50,7 @@ const protocolSSL = false const defaultWriterSize = 16 * 1024 const moduleName = "CONN_PGSQL" -var clientConnMap = struct { - sync.RWMutex - m map[uint32]*ClientConn -}{m: make(map[uint32]*ClientConn, 0)} +var clientConnMap sync.Map //----------------------------------------------------------------- // Handshake of PG @@ -76,19 +73,20 @@ func (cc *ClientConn) handshake(ctx context.Context) error { switch m.(type) { // handle for cancel request (ctrl+c) case *pgproto3.CancelRequest: - clientConnMap.RLock() - c := clientConnMap.m[m.(*pgproto3.CancelRequest).ProcessID] - clientConnMap.RUnlock() - if c != nil && c.backendConn != nil && c.backendConn.ConnPg.PgConn() != nil { - cancelRequest := &pgproto3.CancelRequest{ProcessID: c.backendConn.ConnPg.PgConn().PID(), - SecretKey: c.backendConn.ConnPg.PgConn().SecretKey()} - err = c.backendConn.Conn.WritePgPacket(cancelRequest.Encode(nil)) - if err != nil { - golog.Error(moduleName, "CancelRequest", "write msg err: "+err.Error(), cc.connectionId) - return err + val, flag := clientConnMap.Load(m.(*pgproto3.CancelRequest).ProcessID) + if flag { + c := val.(*ClientConn) + if c != nil && c.backendConn != nil && c.backendConn.ConnPg.PgConn() != nil { + cancelRequest := &pgproto3.CancelRequest{ProcessID: c.backendConn.ConnPg.PgConn().PID(), + SecretKey: c.backendConn.ConnPg.PgConn().SecretKey()} + err = c.backendConn.Conn.WritePgPacket(cancelRequest.Encode(nil)) + if err != nil { + golog.Error(moduleName, "CancelRequest", "write msg err: "+err.Error(), cc.connectionId) + return err + } } + cc.Close() } - cc.Close() return err case *pgproto3.SSLRequest: return cc.handleSSLRequest(ctx) @@ -122,9 +120,7 @@ func (c *ClientConn) RunPg(ctx context.Context) { } c.clean() c.Close() - clientConnMap.Lock() - delete(clientConnMap.m, c.connectionId) - clientConnMap.Unlock() + clientConnMap.Delete(c.connectionId) }() // flag for just use master node, just use for some special cases // treat it simple and crude, set in transaction @@ -187,7 +183,7 @@ func (cc *ClientConn) dispatchPg(ctx context.Context, header []byte, data []byte cc.proxy.counter.IncrClientQPS() cmd := header[0] var err error - golog.Trace(moduleName, "dispatchPg", "cmd str:" + string(cmd), cc.connectionId) + golog.Trace(moduleName, "dispatchPg", "cmd str:"+string(cmd), cc.connectionId) switch cmd { case 'Q': /* simple query */ simpleQuery := pgproto3.Query{} @@ -223,24 +219,30 @@ func (cc *ClientConn) dispatchPg(ctx context.Context, header []byte, data []byte if err := parse.Decode(data); 
err != nil { return err } + // save parse name, use for delete parse when retrieve to connection pool + cc.Parse.Store(parse.Name, parse) sql := parse.Query if cc.backendConn == nil || cc.backendConn.Conn == nil { // parse phase will reuse connect session, if exec select first than exec insert will get an error. // we use temporary scheme to fix it, set session in transaction, so session will choose master node. // TODO but the scheme makes load balance unuseful, no suitable scheme by now. Will keep thinking. - cc.status &= ^mysql.SERVER_STATUS_AUTOCOMMIT - cc.status |= mysql.SERVER_STATUS_IN_TRANS + //cc.status &= ^mysql.SERVER_STATUS_AUTOCOMMIT + //cc.status |= mysql.SERVER_STATUS_IN_TRANS // In benchmarkSQL, the extended-query protocol and the simple-query protocol will be used at the same time. // After the query statement use extended protocol change to the simple protocol, // when a ReadyForQuery in a non transaction is received, the backend connection will be released, and the previous parse statement will be lost. // So set alwaysCurNode to true, that continue to use the current connection. // TODO In future will save parse statement in clientConnection, and init first when get backend connection. - cc.alwaysCurNode = true + //cc.alwaysCurNode = true + cc.parseFlag = true cc.backendConn, err = cc.preHandlePg(sql, ctx) if err == nil { golog.Debug(moduleName, "parse", fmt.Sprintf("exec sql [%s] by node [%s]", sql, cc.backendConn.GetAddr()), cc.connectionId, "dbname", cc.db) } + if cc != nil && cc.backendConn != nil { + clientConnMap.Store(cc.connectionId, cc) + } } if cc == nil || cc.backendConn == nil || err != nil { golog.Error(moduleName, "RunPg", err.Error(), cc.connectionId) @@ -248,18 +250,11 @@ func (cc *ClientConn) dispatchPg(ctx context.Context, header []byte, data []byte } // save conn info to map, use for cancel request. - clientConnMap.Lock() - clientConnMap.m[cc.connectionId] = cc - clientConnMap.Unlock() + clientConnMap.Store(cc.connectionId, cc) // packaging send msg cc.dataSend = append(cc.dataSend, header...) cc.dataSend = append(cc.dataSend, data...) - // save parse name, use for delete parse when retrieve to connection pool - if cc.backendConn.ParName == nil { - cc.backendConn.ParName = make([]string, 0) - } - cc.backendConn.ParName = append(cc.backendConn.ParName, parse.Name) return nil case 'B': /* bind */ cc.dataSend = append(cc.dataSend, header...) 
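// Sketch only, with assumed names: the next hunk keeps every Parse message in
// cc.Parse (a sync.Map keyed by statement name) and handleParsePrepare replays
// them onto a freshly picked backend connection, so named prepared statements
// survive a connection switch. A minimal standalone model of that replay:
func replayCachedParses(parses *sync.Map, write func([]byte) error) error {
	var buf []byte
	parses.Range(func(_, v interface{}) bool {
		p := v.(pgproto3.Parse)
		buf = p.Encode(buf) // append one encoded Parse message per cached statement
		return true
	})
	if len(buf) == 0 {
		return nil // nothing was prepared on this client connection yet
	}
	return write(buf) // hand the whole batch to the new backend in one write
}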
@@ -314,6 +309,21 @@ func (cc *ClientConn) dispatchPg(ctx context.Context, header []byte, data []byte return errors.ErrFormat("command %d not supported now", cmd) } +func (cc *ClientConn) handleParsePrepare(ctx context.Context) error { + var parseData []byte + cc.Parse.Range(func(key, value interface{}) bool { + parse := (value).(pgproto3.Parse) + parseData = parse.Encode(parseData) + return true + }) + golog.Debug(moduleName, "handleParsePrepare", fmt.Sprintf("write cached parse data is: %s", string(parseData)), cc.connectionId) + err := cc.backendConn.Conn.WritePgPacket(parseData) + if err != nil { + golog.Error(moduleName, "handleParsePrepare", fmt.Sprintf("write parse to connection err: %s", err.Error()), cc.connectionId) + } + return err +} + // handle simple query protocol func (cc *ClientConn) handleQueryPg(ctx context.Context, sql string, data []byte) error { @@ -326,9 +336,8 @@ func (cc *ClientConn) handleQueryPg(ctx context.Context, sql string, data []byte } if cc != nil && cc.backendConn != nil { - clientConnMap.Lock() - clientConnMap.m[cc.connectionId] = cc - clientConnMap.Unlock() + clientConnMap.Store(cc.connectionId, cc) + cc.handleParsePrepare(ctx) } } else { // change status, use for load balance in begin statement.(select first and than insert/update/delete) @@ -420,11 +429,11 @@ readloop: golog.Debug("pg conn", "receiveBackendMsg", fmt.Sprintf("addr: %s, lsn: %d", addr, lsn.LSN), cc.connectionId) // set LSN to node if addr != "" { - cc.nodes["node1"].NodeLsn[strings.Split(addr, ":")[0]] = lsn.LSN + cc.nodes["node1"].NodeLsn.Store(strings.Split(addr, ":")[0], lsn.LSN) } // set LSN to db_table if cc.table != "" && cc.db != "" { - cc.nodes["node1"].NodeLsn[cc.db+"_"+cc.table] = lsn.LSN + cc.nodes["node1"].NodeLsn.Store(cc.db+"_"+cc.table, lsn.LSN) } continue } @@ -478,6 +487,11 @@ func (cc *ClientConn) handleStmtClosePg(ctx context.Context, close pgproto3.Clos if cc.backendConn == nil { return nil } + // Delete Parse record if closed. 
+ cc.Parse.Load(close.Name) + if _,flag := cc.Parse.Load(close.Name); flag { + cc.Parse.Delete(close.Name) + } data := close.Encode(nil) err := cc.backendConn.Conn.WritePgPacket(data) if err != nil { @@ -494,17 +508,23 @@ func (cc *ClientConn) writeCloseComplete() error { } func (cc *ClientConn) handleStmtSyncPg(ctx context.Context) error { - cc.Lock() - defer cc.Unlock() + + defer cc.closeConn(cc.backendConn, false) if cc.backendConn == nil || cc.backendConn.Conn == nil { - golog.Error(moduleName, "handleStmtSyncPg", "backend connection is null, current send data is: " + string(cc.dataSend), cc.connectionId) + golog.Warn(moduleName, "handleStmtSyncPg", "backend connection is null, current send data is: "+string(cc.dataSend), cc.connectionId) var err error - cc.backendConn, err = cc.preHandlePg("select 1;", ctx) + // TODO + cc.backendConn, err = cc.preHandlePg("begin;", ctx) + if err != nil { golog.Error(moduleName, "handleStmtSyncPg", "reconnect backend err: "+err.Error(), cc.connectionId) return err } + if cc != nil && cc.backendConn != nil { + clientConnMap.Store(cc.connectionId, cc) + cc.handleParsePrepare(ctx) + } //return nil } err := cc.backendConn.Conn.WritePgPacket(cc.dataSend) diff --git a/proxy/server/conn_preshard.go b/proxy/server/conn_preshard.go index 40fae85..6576fd2 100644 --- a/proxy/server/conn_preshard.go +++ b/proxy/server/conn_preshard.go @@ -228,6 +228,8 @@ func (c *ClientConn) GetTransExecDB(tokens []string, sql string) (*ExecuteDB, er //if sql need shard return nil, else return the unshard db func (c *ClientConn) GetExecDB(tokens []string, sql string) (*ExecuteDB, error) { + golog.Debug("conn_preshard", "GetExecDB", + fmt.Sprintf("tokens: [%v], sql: [%s]", tokens, sql), c.connectionId) tokensLen := len(tokens) // The logic of PG node selection is processed. // Since there is no need to divide the database and table, the original kingshard logic is removed. 
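// Sketch, not part of the patch: patch 1 extends the SELECT routing check in
// GetExecDB so that statements containing "for update", "for share", "nextval"
// or "setval" are never sent to replicas. A hypothetical helper expressing that
// rule on its own:
func mustUseMaster(sql string) bool {
	s := strings.ToLower(sql)
	// row locks and sequence updates are side effects that belong on the primary
	for _, kw := range []string{"for update", "for share", "nextval", "setval"} {
		if strings.Contains(s, kw) {
			return true
		}
	}
	return false
}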
@@ -312,6 +314,11 @@ func (c *ClientConn) GetExecDB(tokens []string, sql string) (*ExecuteDB, error) } continue case *tree.BeginTransaction: + if c.parseFlag { + c.status &= ^mysql.SERVER_STATUS_AUTOCOMMIT + c.status |= mysql.SERVER_STATUS_IN_TRANS + continue + } // replica node also can exec begin c.beginFlag = BEGIN_PRESTART return c.getSelectExecDB(sql, tokens, tokensLen) diff --git a/proxy/server/conn_query.go b/proxy/server/conn_query.go index 06220f1..54e956c 100644 --- a/proxy/server/conn_query.go +++ b/proxy/server/conn_query.go @@ -21,6 +21,8 @@ import ( "sync" "time" + "github.com/jackc/pgproto3/v2" + "gitee.com/he3db/he3proxy/backend" "gitee.com/he3db/he3proxy/core/errors" "gitee.com/he3db/he3proxy/core/golog" @@ -345,6 +347,18 @@ func (c *ClientConn) closeConn(conn *backend.BackendConn, rollback bool) { if rollback { conn.Rollback() } + + // close prepare statement and clean parseData + parseData := make([]byte, 0) + c.Parse.Range(func(key, value interface{}) bool { + parse := (value).(pgproto3.Parse) + cl := pgproto3.Close{ObjectType: 'S', Name: parse.Name} + parseData = cl.Encode(parseData) + return true + }) + golog.Debug(moduleName, "RunPg", fmt.Sprintf("close prepare statement: [%s]", string(parseData)), c.connectionId) + c.backendConn.Conn.WritePgPacket(parseData) + conn.Close() } diff --git a/proxy/server/server.go b/proxy/server/server.go index 206e426..eec9af4 100644 --- a/proxy/server/server.go +++ b/proxy/server/server.go @@ -370,6 +370,7 @@ func (s *Server) newClientConn(co net.Conn) *ClientConn { c.alwaysCurNode = false c.dataSend = make([]byte, 0) c.dataRecv = make([]byte, 0) + c.parseFlag = false return c } -- Gitee From 9593e562c41655d2318b5086bcb425dc65550d7b Mon Sep 17 00:00:00 2001 From: wangyao Date: Mon, 24 Oct 2022 19:19:23 +0800 Subject: [PATCH 3/4] improve concurrent-map --- backend/balancer.go | 25 ++++++----- backend/db.go | 85 +++++++++++++++++++++----------------- backend/node.go | 74 +++++++++++++++++++-------------- cmd/he3proxy/main.go | 2 +- config/config.go | 18 ++++---- proxy/server/conn_pgsql.go | 2 +- proxy/server/conn_query.go | 2 +- 7 files changed, 117 insertions(+), 91 deletions(-) diff --git a/backend/balancer.go b/backend/balancer.go index 80108dc..bb7d281 100644 --- a/backend/balancer.go +++ b/backend/balancer.go @@ -195,20 +195,23 @@ func (n *Node) GetNextSlave() (*DB, error) { func (n *Node) getNextSlaveByCache() []int { indexs := make([]int, 0) - tableCachedNodes := n.NodeCache[n.NodeCacheKey] - golog.Debug("balancer", "GetNextSlave", fmt.Sprintf("table cached key:%s, nodes: %s", n.NodeCacheKey, tableCachedNodes), 0) - if tableCachedNodes != nil && len(tableCachedNodes) > 0 { - for _, cacheNode := range tableCachedNodes { - for i, db := range n.Slave { - //if cacheNode == strings.Split(db.addr, ":")[0] { - // cacheNode ip:port - if cacheNode == db.addr { - golog.Debug("balancer", "GetNextSlave", fmt.Sprintf("cacheNode matched addr: %s", cacheNode), 0) - indexs = append(indexs, i) + val, flag := n.NodeCache.Load(n.NodeCacheKey) + if flag { + tableCachedNodes := val.([]string) + golog.Debug("balancer", "GetNextSlave", fmt.Sprintf("table cached key:%s, nodes: %s", n.NodeCacheKey, tableCachedNodes), 0) + if tableCachedNodes != nil && len(tableCachedNodes) > 0 { + for _, cacheNode := range tableCachedNodes { + for i, db := range n.Slave { + //if cacheNode == strings.Split(db.addr, ":")[0] { + // cacheNode ip:port + if cacheNode == db.addr { + golog.Debug("balancer", "GetNextSlave", fmt.Sprintf("cacheNode matched addr: %s", cacheNode), 0) 
+ indexs = append(indexs, i) + } } } + golog.Debug("balancer", "GetNextSlave", fmt.Sprintf("cacheNode matched indexs: %v", indexs), 0) } - golog.Debug("balancer", "GetNextSlave", fmt.Sprintf("cacheNode matched indexs: %v", indexs), 0) } return indexs } diff --git a/backend/db.go b/backend/db.go index 278ce19..28e5b59 100644 --- a/backend/db.go +++ b/backend/db.go @@ -60,9 +60,10 @@ type DB struct { popConnCount int64 //pg - cacheConnsMap map[string]chan *Conn + cacheConnsMap sync.Map + cacheCountNum int32 currConnCount int64 //当前总链接数 - maxPoolNum int //支持的最大db链接池 + maxPoolNum int32 //支持的最大db链接池 } func Open(addr string, user string, password string, dbName string, maxConnNum int) (*DB, error) { @@ -255,16 +256,21 @@ func (db *DB) closeConn(co *Conn) error { // If the user link is not cached in the connection pool, it will be released directly. // If the link is cached, the current link will be released and a new link will be generated to join the pool. // The number of connections in the maintenance pool will remain unchanged - conns := db.cacheConnsMap[db.user+db.db] - if conns == nil || len(conns) == db.InitConnNum { + val, flag := db.cacheConnsMap.Load(db.user+db.db) + if !flag { atomic.AddInt64(&db.currConnCount, -1) - } else { - conn, err := db.newConn(db.user) - if err != nil { - return nil + }else { + conns := val.(chan *Conn) + if len(conns) == db.InitConnNum { + atomic.AddInt64(&db.currConnCount, -1) + } else { + conn, err := db.newConn(db.user) + if err != nil { + return nil + } + conns <- conn + atomic.AddInt64(&db.pushConnCount, 1) } - conns <- conn - atomic.AddInt64(&db.pushConnCount, 1) } co.Close() } @@ -363,19 +369,16 @@ func (db *DB) PopConnPg(dbname string, dbuser string) (*Conn, error) { return nil, err } } else { + var cacheConns chan *Conn db.Lock() - defer db.Unlock() - if db.cacheConnsMap == nil { - db.cacheConnsMap = make(map[string]chan *Conn) - } - cacheConns := db.cacheConnsMap[dbuser+dbname] - if cacheConns == nil { + val, flag := db.cacheConnsMap.Load(dbuser+dbname) + if !flag { if db.IsExceedMaxConns() { err = errors.ErrConnIsFull return nil, err } //判断是否超过最大链接池数量 - if len(db.cacheConnsMap) >= db.maxPoolNum { + if db.cacheCountNum >= db.maxPoolNum { err = errors.ErrMaxPoolIsFull return nil, err } @@ -398,7 +401,8 @@ func (db *DB) PopConnPg(dbname string, dbuser string) (*Conn, error) { atomic.AddInt64(&db.pushConnCount, 1) atomic.AddInt64(&db.currConnCount, 1) } - db.cacheConnsMap[dbuser+dbname] = cacheConns + db.cacheConnsMap.Store(dbuser+dbname, cacheConns) + atomic.AddInt32(&db.cacheCountNum, 1) //}() //返回一个链接 @@ -409,8 +413,11 @@ func (db *DB) PopConnPg(dbname string, dbuser string) (*Conn, error) { //} //atomic.AddInt64(&db.pushConnCount, 1) //atomic.AddInt64(&db.currConnCount, 1) + }else { + cacheConns = val.(chan *Conn) } - //else { + db.Unlock() + co = db.GetConnFromCache(cacheConns) if co == nil { golog.Warn("db", "PopConnPg", "conn is nil", 0) @@ -428,7 +435,6 @@ func (db *DB) PopConnPg(dbname string, dbuser string) (*Conn, error) { atomic.AddInt64(&db.pushConnCount, 1) atomic.AddInt64(&db.currConnCount, 1) } - //} } //TODO 事物处理 @@ -511,16 +517,15 @@ func (db *DB) PushConnForExtendedProtocol(co *Conn, err error) { if co == nil { return } - db.RLock() - conns := db.cacheConnsMap[db.user+db.db] - db.RUnlock() + val, flag := db.cacheConnsMap.Load(db.user+db.db) // 因后端链接未销毁 到时扩展协议时报错: prepared statement "lrupsc_1_0" already exists (SQLSTATE 42P05) // 如果成功创建了一个命名的预备语句对象,那么它将持续到当前会话结束, 除非被明确地删除 (暂未实现 需要采用close命令关闭),现在是直接关闭了链接 // 
http://www.postgres.cn/docs/14/protocol-message-formats.html co.Close() - if conns == nil || err != nil { + if !flag || err != nil { return } + conns := val.(chan *Conn) co.pushTimestamp = time.Now().Unix() co, err = db.newConn("") if err != nil { @@ -564,22 +569,28 @@ func (db *DB) PushConn(co *Conn, err error) { if co == nil { return } - db.RLock() - conns := db.cacheConnsMap[db.user+db.db] - db.RUnlock() - if conns == nil || err != nil || len(conns) == db.InitConnNum { - co.Close() - atomic.AddInt64(&db.currConnCount, -1) - return - } - co.pushTimestamp = time.Now().Unix() - select { - case conns <- co: - return - default: + val, flag := db.cacheConnsMap.Load(db.user+db.db) + if !flag || err != nil { co.Close() atomic.AddInt64(&db.currConnCount, -1) return + }else { + conns := val.(chan *Conn) + if len(conns) == db.InitConnNum { + co.Close() + atomic.AddInt64(&db.currConnCount, -1) + return + }else { + co.pushTimestamp = time.Now().Unix() + select { + case conns <- co: + return + default: + co.Close() + atomic.AddInt64(&db.currConnCount, -1) + return + } + } } } } diff --git a/backend/node.go b/backend/node.go index b1d1243..5f9ab61 100644 --- a/backend/node.go +++ b/backend/node.go @@ -61,7 +61,7 @@ type Node struct { NodeLoad map[int]int // save node-cached table relationship - NodeCache map[string][]string + NodeCache sync.Map // db_table NodeCacheKey string @@ -115,17 +115,17 @@ func (n *Node) checkMasterConnsCache(min int) { //回收空闲超时链接 func dealConnsCache(db *DB, min int) { - connsMap := db.cacheConnsMap - if connsMap != nil { - for dbName, conns := range connsMap { + if db.cacheCountNum != 0 { + db.cacheConnsMap.Range(func(key, value interface{}) bool { + conns := value.(chan *Conn) golog.Debug("Node", "checkMasterConnsCache", " clean conn cache while timeout ", 0, - "dbName", dbName, "currentConnSize", len(conns), "db", db.Addr(), + "dbName", key, "currentConnSize", len(conns), "db", db.Addr(), "initConnsSize", db.InitConnNum, "currConnCount", db.currConnCount) if conns != nil && len(conns) >= db.InitConnNum { if conn, ok := <-conns; ok { if conn == nil { - return + return true } //TODO 是否遍历取最短的? 
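// Sketch with assumed names, not part of the patch: the pool this cleanup walks
// is a sync.Map holding one buffered channel of idle *Conn per (user, database)
// pair, so borrowing and returning connections needs no global lock:
func borrowConn(pools *sync.Map, key string) *Conn {
	v, ok := pools.Load(key)
	if !ok {
		return nil // no pool yet; the caller dials a fresh connection
	}
	select {
	case c := <-v.(chan *Conn):
		return c // reuse an idle connection
	default:
		return nil // pool exists but is currently empty
	}
}

func returnConn(pools *sync.Map, key string, c *Conn) {
	if v, ok := pools.Load(key); ok {
		select {
		case v.(chan *Conn) <- c:
			return // parked for later reuse
		default:
		}
	}
	c.Close() // pool missing or already full: release the connection
}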
connMinTime := time.Now().Unix() - conn.pushTimestamp @@ -133,22 +133,24 @@ func dealConnsCache(db *DB, min int) { " clean conn cache while timeout ", 0, "minconnTime", connMinTime) if connMinTime/60 >= int64(min) { //超时清空 先关闭channel通道 逐个关闭链接再置空 - close(connsMap[dbName]) + close(conns) for { - co, ok := <-connsMap[dbName] + co, ok := <-conns if !ok { break } co.Close() atomic.AddInt64(&db.currConnCount, -1) } - delete(connsMap, dbName) + db.cacheConnsMap.Delete(key) + atomic.AddInt32(&db.cacheCountNum, -1) } else { - connsMap[dbName] <- conn + conns <- conn } } } - } + return true + }) } } @@ -238,6 +240,14 @@ func (n *Node) checkMaster() { if atomic.LoadInt32(&(db.state)) != ManualDown { atomic.StoreInt32(&(db.state), Up) } + // loop for system_table pg_stat_replication & pg_hot_data + if config.Mysql != os.Getenv(config.DbTypeEnv) { + // get master and read-only nodes latest LSN in loop + // get cached tables relationship with nodes in loop + if n.Cfg.LoadBalanceMode == "lsn" || n.Cfg.LoadBalanceMode == "cache" { + go n.getLsnAndCacheMetadata(n.Master.checkConn, n.Cfg.Master) + } + } return } @@ -451,16 +461,18 @@ func (n *Node) DownSlave(addr string, state int32) error { // get data from db in loop func (n *Node) getLsnAndCacheMetadata(conn *Conn, masterStr string) { - //var err error + var err error for { - //n.Master, err = n.OpenDB(masterStr) - //if err != nil { - // golog.Error("node", "getLsnAndCacheMetadata", fmt.Sprintf("open master db err : %s, addr: %s", - // err.Error(), masterStr), 0) - // time.Sleep(time.Duration(n.Cfg.LsnCacheReqInterval) * time.Second) - // continue - //} - //conn := db.checkConn + if conn == nil || conn.ConnPg.Ping(context.Background()) != nil { + n.Master, err = n.OpenDB(masterStr) + if err != nil { + golog.Error("node", "getLsnAndCacheMetadata", fmt.Sprintf("open master db err : %s, addr: %s", + err.Error(), masterStr), 0) + time.Sleep(time.Duration(n.Cfg.LsnCacheReqInterval) * time.Second) + continue + } + conn = n.Master.checkConn + } // init read-only node's LSN rows, er := conn.ConnPg.Query(context.Background(), "select client_addr::text, replay_lsn from pg_stat_replication;") @@ -503,7 +515,11 @@ func (n *Node) getLsnAndCacheMetadata(conn *Conn, masterStr string) { if er != nil { golog.Error("node", "ParseMaster", fmt.Sprintf("Query pg_hot_data failed: %s", er.Error()), 0) } - n.NodeCache = make(map[string][]string) + // Deal with `pg_hot_data` delete refresh, delete all data and restore + n.NodeCache.Range(func(key, value interface{}) bool { + n.NodeCache.Delete(key) + return true + }) var datname string var relname string var clientaddr string @@ -514,12 +530,15 @@ func (n *Node) getLsnAndCacheMetadata(conn *Conn, masterStr string) { break } golog.Trace("node", "ParseMaster", fmt.Sprintf("datname: %s, relname: %s, client_addr: %s", datname, relname, clientaddr), 0) - nodeCacheVal := n.NodeCache[datname+"_"+relname] - if nodeCacheVal == nil { + val, flag := n.NodeCache.Load(datname + "_" + relname) + var nodeCacheVal []string + if !flag { nodeCacheVal = make([]string, 0) + } else { + nodeCacheVal = val.([]string) } nodeCacheVal = append(nodeCacheVal, clientaddr) - n.NodeCache[datname+"_"+relname] = nodeCacheVal + n.NodeCache.LoadOrStore(datname+"_"+relname, nodeCacheVal) } time.Sleep(time.Duration(n.Cfg.LsnCacheReqInterval) * time.Second) @@ -533,13 +552,6 @@ func (n *Node) ParseMaster(masterStr string) error { var err error n.Master, err = n.OpenDB(masterStr) - if config.Mysql != os.Getenv(config.DbTypeEnv){ - // get master and read-only nodes 
latest LSN in loop - // get cached tables relationship with nodes in loop - if n.Cfg.LoadBalanceMode == "lsn" || n.Cfg.LoadBalanceMode == "cache" { - go n.getLsnAndCacheMetadata(n.Master.checkConn, masterStr) - } - } return err } diff --git a/cmd/he3proxy/main.go b/cmd/he3proxy/main.go index 06c2c43..6ba0e73 100644 --- a/cmd/he3proxy/main.go +++ b/cmd/he3proxy/main.go @@ -110,7 +110,7 @@ func main() { pyroscope.Start(pyroscope.Config{ ApplicationName: "He3Proxy", ServerAddress: pyroscopeAddr, - Logger: pyroscope.StandardLogger, + Logger: nil, Tags: map[string]string{"region": os.Getenv("REGION")}, // optionally, if authentication is enabled, specify the API key: diff --git a/config/config.go b/config/config.go index 7c52352..793d3ff 100644 --- a/config/config.go +++ b/config/config.go @@ -77,21 +77,21 @@ type NodeConfig struct { // pg connect pool max idle time. unit: min ConnMaxIdleTime int `yaml:"max_conns_idle_time"` // The maximum number of link pools can support several dB library pairs of link pools at the same time - MaxPoolNum int `yaml:"max_pool_num"` + MaxPoolNum int32 `yaml:"max_pool_num"` // Number of cached links per DB link pool InitConnCount int `yaml:"init_conn_count"` // Load balancing mode (weight mode, metric): obtain high-quality nodes according to Prometheus index evaluation LoadBalanceMode string `yaml:"load_balance_mode"` // pg prometheus configuration, help to choose perfect exec node - PgPrometheusAddr string `yaml:"pg_prometheus_addr"` - MonitorPgNodes []string `yaml:"monitor_pg_node"` - PgExporterName string `yaml:"pg_exporter_name"` - NodeExporterName string `yaml:"node_exporter_name"` - NodeCpuMode string `yaml:"node_cpu_mode"` - TimeInterval string `yaml:"statistics_time_interval"` - PgDataDiskName string `yaml:"pg_data_disk_device_name"` - MetricsReqInterval int `yaml:"metrics_req_interval"` + PgPrometheusAddr string `yaml:"pg_prometheus_addr"` + MonitorPgNodes []string `yaml:"monitor_pg_node"` + PgExporterName string `yaml:"pg_exporter_name"` + NodeExporterName string `yaml:"node_exporter_name"` + NodeCpuMode string `yaml:"node_cpu_mode"` + TimeInterval string `yaml:"statistics_time_interval"` + PgDataDiskName string `yaml:"pg_data_disk_device_name"` + MetricsReqInterval int `yaml:"metrics_req_interval"` LsnCacheReqInterval int `yaml:"lsn_cache_req_interval"` // -------------------------------------------------------------- } diff --git a/proxy/server/conn_pgsql.go b/proxy/server/conn_pgsql.go index d4b3a51..b1b28f1 100644 --- a/proxy/server/conn_pgsql.go +++ b/proxy/server/conn_pgsql.go @@ -509,7 +509,7 @@ func (cc *ClientConn) writeCloseComplete() error { func (cc *ClientConn) handleStmtSyncPg(ctx context.Context) error { - defer cc.closeConn(cc.backendConn, false) + //defer cc.closeConn(cc.backendConn, false) if cc.backendConn == nil || cc.backendConn.Conn == nil { golog.Warn(moduleName, "handleStmtSyncPg", "backend connection is null, current send data is: "+string(cc.dataSend), cc.connectionId) diff --git a/proxy/server/conn_query.go b/proxy/server/conn_query.go index 54e956c..d076b62 100644 --- a/proxy/server/conn_query.go +++ b/proxy/server/conn_query.go @@ -356,7 +356,7 @@ func (c *ClientConn) closeConn(conn *backend.BackendConn, rollback bool) { parseData = cl.Encode(parseData) return true }) - golog.Debug(moduleName, "RunPg", fmt.Sprintf("close prepare statement: [%s]", string(parseData)), c.connectionId) + golog.Debug(moduleName, "closeConn", fmt.Sprintf("close prepare statement: [%s]", string(parseData)), c.connectionId) 
c.backendConn.Conn.WritePgPacket(parseData) conn.Close() -- Gitee From b8590b25cb958586efd1e5f640443e6f96ec317c Mon Sep 17 00:00:00 2001 From: wangyao Date: Thu, 27 Oct 2022 15:33:52 +0800 Subject: [PATCH 4/4] fix simple-query err for query lb in trx --- backend/node.go | 139 ++++++++++++++++------------------ proxy/server/conn_pgsql.go | 102 ++++++++++++++++++++----- proxy/server/conn_preshard.go | 15 +++- proxy/server/conn_query.go | 10 ++- 4 files changed, 169 insertions(+), 97 deletions(-) diff --git a/backend/node.go b/backend/node.go index 5f9ab61..2ef1b5a 100644 --- a/backend/node.go +++ b/backend/node.go @@ -73,7 +73,7 @@ func (n *Node) CheckNode() { for n.Online { n.checkMaster() n.checkSlave() - time.Sleep(16 * time.Second) + time.Sleep(time.Duration(n.Cfg.LsnCacheReqInterval) * time.Second) } } @@ -245,7 +245,7 @@ func (n *Node) checkMaster() { // get master and read-only nodes latest LSN in loop // get cached tables relationship with nodes in loop if n.Cfg.LoadBalanceMode == "lsn" || n.Cfg.LoadBalanceMode == "cache" { - go n.getLsnAndCacheMetadata(n.Master.checkConn, n.Cfg.Master) + n.getLsnAndCacheMetadata(n.Master.checkConn, n.Cfg.Master) } } return @@ -462,86 +462,81 @@ func (n *Node) DownSlave(addr string, state int32) error { // get data from db in loop func (n *Node) getLsnAndCacheMetadata(conn *Conn, masterStr string) { var err error - for { - if conn == nil || conn.ConnPg.Ping(context.Background()) != nil { - n.Master, err = n.OpenDB(masterStr) - if err != nil { - golog.Error("node", "getLsnAndCacheMetadata", fmt.Sprintf("open master db err : %s, addr: %s", - err.Error(), masterStr), 0) - time.Sleep(time.Duration(n.Cfg.LsnCacheReqInterval) * time.Second) - continue - } - conn = n.Master.checkConn + if conn == nil || conn.ConnPg.Ping(context.Background()) != nil { + n.Master, err = n.OpenDB(masterStr) + if err != nil { + golog.Error("node", "getLsnAndCacheMetadata", fmt.Sprintf("open master db err : %s, addr: %s", + err.Error(), masterStr), 0) + return } + conn = n.Master.checkConn + } - // init read-only node's LSN - rows, er := conn.ConnPg.Query(context.Background(), "select client_addr::text, replay_lsn from pg_stat_replication;") + // init read-only node's LSN + rows, er := conn.ConnPg.Query(context.Background(), "select client_addr::text, replay_lsn from pg_stat_replication;") + if er != nil { + golog.Error("node", "ParseMaster", fmt.Sprintf("Query pg_stat_replication failed: %s", er.Error()), 0) + } + var addr string + var lsn string + for rows.Next() { + er = rows.Scan(&addr, &lsn) if er != nil { - golog.Error("node", "ParseMaster", fmt.Sprintf("Query pg_stat_replication failed: %s", er.Error()), 0) + golog.Error("node", "ParseMaster", fmt.Sprintf("scan client_addr,replay_lsn err : %s", er.Error()), 0) + break } - var addr string - var lsn string - for rows.Next() { - er = rows.Scan(&addr, &lsn) - if er != nil { - golog.Error("node", "ParseMaster", fmt.Sprintf("scan client_addr,replay_lsn err : %s", er.Error()), 0) - break - } - if val, err := pgLsnInInternal(lsn); err != nil { - golog.Error("node", "ParseMaster", fmt.Sprintf("slave node parse LSN 16hex to 10hex err : %s, addr: %s", - err.Error(), addr), 0) - } else { - n.NodeLsn.Store(strings.Split(addr, "/")[0], val) - golog.Trace("node", "ParseMaster", - fmt.Sprintf("slave node parse LSN, addr: %s, lsn 16hex: %s, lsn 10hex: %d ", addr, lsn, val), 0) - } + if val, err := pgLsnInInternal(lsn); err != nil { + golog.Error("node", "ParseMaster", fmt.Sprintf("slave node parse LSN 16hex to 10hex err : %s, addr: %s", + 
err.Error(), addr), 0) + } else { + n.NodeLsn.Store(strings.Split(addr, "/")[0], val) + golog.Trace("node", "ParseMaster", + fmt.Sprintf("slave node parse LSN, addr: %s, lsn 16hex: %s, lsn 10hex: %d ", addr, lsn, val), 0) } - // init master node's LSN - er = conn.ConnPg.QueryRow(context.Background(), "select pg_current_wal_lsn();").Scan(&lsn) - if er != nil { - golog.Error("node", "ParseMaster", "scan pg_current_wal_lsn err !"+er.Error(), 0) + } + // init master node's LSN + er = conn.ConnPg.QueryRow(context.Background(), "select pg_current_wal_lsn();").Scan(&lsn) + if er != nil { + golog.Error("node", "ParseMaster", "scan pg_current_wal_lsn err !"+er.Error(), 0) + } else { + if val, err := pgLsnInInternal(lsn); err != nil { + golog.Error("node", "ParseMaster", fmt.Sprintf("master node parse LSN 16hex to 10hex err : %s, addr: %s", + err.Error(), masterStr), 0) } else { - if val, err := pgLsnInInternal(lsn); err != nil { - golog.Error("node", "ParseMaster", fmt.Sprintf("master node parse LSN 16hex to 10hex err : %s, addr: %s", - err.Error(), masterStr), 0) - } else { - n.NodeLsn.Store(strings.Split(masterStr, ":")[0], val) - golog.Trace("node", "ParseMaster", - fmt.Sprintf("master node parse LSN, addr: %s, lsn 16hex: %s , lsn 10hex: %d", masterStr, lsn, val), 0) - } + n.NodeLsn.Store(strings.Split(masterStr, ":")[0], val) + golog.Trace("node", "ParseMaster", + fmt.Sprintf("master node parse LSN, addr: %s, lsn 16hex: %s , lsn 10hex: %d", masterStr, lsn, val), 0) } - // init metadata about table which node cached - rows, er = conn.ConnPg.Query(context.Background(), "select datname, relname, clientaddr from pg_hot_data;") + } + // init metadata about table which node cached + rows, er = conn.ConnPg.Query(context.Background(), "select datname, relname, clientaddr from pg_hot_data;") + if er != nil { + golog.Error("node", "ParseMaster", fmt.Sprintf("Query pg_hot_data failed: %s", er.Error()), 0) + } + // Deal with `pg_hot_data` delete refresh, delete all data and restore + n.NodeCache.Range(func(key, value interface{}) bool { + n.NodeCache.Delete(key) + return true + }) + var datname string + var relname string + var clientaddr string + for rows.Next() { + er = rows.Scan(&datname, &relname, &clientaddr) if er != nil { - golog.Error("node", "ParseMaster", fmt.Sprintf("Query pg_hot_data failed: %s", er.Error()), 0) + golog.Error("node", "ParseMaster", fmt.Sprintf("scan datname, relname err : %s", er.Error()), 0) + break } - // Deal with `pg_hot_data` delete refresh, delete all data and restore - n.NodeCache.Range(func(key, value interface{}) bool { - n.NodeCache.Delete(key) - return true - }) - var datname string - var relname string - var clientaddr string - for rows.Next() { - er = rows.Scan(&datname, &relname, &clientaddr) - if er != nil { - golog.Error("node", "ParseMaster", fmt.Sprintf("scan datname, relname err : %s", er.Error()), 0) - break - } - golog.Trace("node", "ParseMaster", fmt.Sprintf("datname: %s, relname: %s, client_addr: %s", datname, relname, clientaddr), 0) - val, flag := n.NodeCache.Load(datname + "_" + relname) - var nodeCacheVal []string - if !flag { - nodeCacheVal = make([]string, 0) - } else { - nodeCacheVal = val.([]string) - } - nodeCacheVal = append(nodeCacheVal, clientaddr) - n.NodeCache.LoadOrStore(datname+"_"+relname, nodeCacheVal) + golog.Trace("node", "ParseMaster", fmt.Sprintf("datname: %s, relname: %s, client_addr: %s", datname, relname, clientaddr), 0) + val, flag := n.NodeCache.Load(datname + "_" + relname) + var nodeCacheVal []string + if !flag { + 
nodeCacheVal = make([]string, 0) + } else { + nodeCacheVal = val.([]string) } - - time.Sleep(time.Duration(n.Cfg.LsnCacheReqInterval) * time.Second) + nodeCacheVal = append(nodeCacheVal, clientaddr) + n.NodeCache.LoadOrStore(datname+"_"+relname, nodeCacheVal) } } diff --git a/proxy/server/conn_pgsql.go b/proxy/server/conn_pgsql.go index b1b28f1..4e673fa 100644 --- a/proxy/server/conn_pgsql.go +++ b/proxy/server/conn_pgsql.go @@ -30,6 +30,7 @@ import ( "strings" "sync" "time" + "unsafe" "github.com/jackc/pgproto3/v2" "github.com/jackc/pgx/v4" @@ -76,7 +77,7 @@ func (cc *ClientConn) handshake(ctx context.Context) error { val, flag := clientConnMap.Load(m.(*pgproto3.CancelRequest).ProcessID) if flag { c := val.(*ClientConn) - if c != nil && c.backendConn != nil && c.backendConn.ConnPg.PgConn() != nil { + if c != nil && c.backendConn != nil && c.backendConn.Conn != nil && c.backendConn.ConnPg.PgConn() != nil { cancelRequest := &pgproto3.CancelRequest{ProcessID: c.backendConn.ConnPg.PgConn().PID(), SecretKey: c.backendConn.ConnPg.PgConn().SecretKey()} err = c.backendConn.Conn.WritePgPacket(cancelRequest.Encode(nil)) @@ -316,6 +317,9 @@ func (cc *ClientConn) handleParsePrepare(ctx context.Context) error { parseData = parse.Encode(parseData) return true }) + if parseData == nil || len(parseData) == 0 { + return nil + } golog.Debug(moduleName, "handleParsePrepare", fmt.Sprintf("write cached parse data is: %s", string(parseData)), cc.connectionId) err := cc.backendConn.Conn.WritePgPacket(parseData) if err != nil { @@ -326,7 +330,6 @@ func (cc *ClientConn) handleParsePrepare(ctx context.Context) error { // handle simple query protocol func (cc *ClientConn) handleQueryPg(ctx context.Context, sql string, data []byte) error { - var err error if cc.backendConn == nil || cc.backendConn.Conn == nil { cc.backendConn, err = cc.preHandlePg(sql, ctx) @@ -353,6 +356,24 @@ func (cc *ClientConn) handleQueryPg(ctx context.Context, sql string, data []byte golog.Debug(moduleName, "handleQueryPg", fmt.Sprintf("exec sql [%s] by node [%s]", sql, cc.backendConn.GetAddr()), cc.connectionId, "dbname", cc.db) + // deal with duplicate "begin", will return 'WARNING: there is already a transaction in progress'. 
+ if cc.beginFlag == BEGIN_PRESTART_COMMIT && "BEGIN" == strings.ToUpper(strings.ReplaceAll(sql, ";", "")) { + errRes := pgproto3.ErrorResponse{ + Severity: "WARNING", + SeverityUnlocalized: "WARNING", + Code: "25001", + Message: "there is already a transaction in progress", + File: "xact.c", + Line: 3689, + Routine: "BeginTransactionBlock", + } + var nRes pgproto3.NoticeResponse + nRes = pgproto3.NoticeResponse(errRes) + cmdComplete := &pgproto3.CommandComplete{CommandTag: stringTobyteSlice("BEGIN")} + cc.WriteData((&pgproto3.ReadyForQuery{TxStatus: 'T'}).Encode(cmdComplete.Encode((&nRes).Encode(nil)))) + return nil + } + // handle for 'begin' statement, and when exec insert/update/delete statement add 'begin' // 1.begin with commit ('begin' to 'begin;...;commit') // 2.if select statement, will do load balance @@ -360,20 +381,41 @@ func (cc *ClientConn) handleQueryPg(ctx context.Context, sql string, data []byte // And set flag to BEGIN_RELSTART_BEGIN means in transaction // 4.deal with statement as in the transaction until commit if cc.beginFlag == BEGIN_PRESTART { - err = cc.backendConn.Conn.WritePgPacket((&pgproto3.Query{String: "COMMIT;"}).Encode(data)) + // write msg to client directly + cmdComplete := &pgproto3.CommandComplete{CommandTag: stringTobyteSlice("BEGIN")} + cc.WriteData((&pgproto3.ReadyForQuery{TxStatus: 'T'}).Encode(cmdComplete.Encode(nil))) + cc.beginFlag = BEGIN_PRESTART_COMMIT + return nil + } else if cc.beginFlag == BEGIN_RELSTART { + // when first exec write ops after begin, will prior exec 'begin' statement. + sqlStr := "BEGIN;" + err = cc.backendConn.Conn.WritePgPacket((&pgproto3.Query{String: sqlStr}).Encode(nil)) if err != nil { golog.Error(moduleName, "handleQueryPg", fmt.Sprintf("write msg err: %s", err.Error()), cc.connectionId) return err } - cc.beginFlag = BEGIN_PRESTART_COMMIT - } else if cc.beginFlag == BEGIN_RELSTART { - sqlStr := "BEGIN;" + sql - err = cc.backendConn.Conn.WritePgPacket((&pgproto3.Query{String: sqlStr}).Encode(nil)) + // consume msg from backend, but not return to client + for { + d, e := cc.backendConn.Conn.ReadPgPacket() + if e != nil { + golog.Error(moduleName, "receiveBackendMsg", fmt.Sprintf("read packet from backend err: %s", e.Error()), cc.connectionId) + return e + } + if d[0] == 'Z' { + cc.status &= ^mysql.SERVER_STATUS_AUTOCOMMIT + cc.status |= mysql.SERVER_STATUS_IN_TRANS + break + } + } + // exec current statement + err = cc.backendConn.Conn.WritePgPacket(data) if err != nil { golog.Error(moduleName, "handleQueryPg", fmt.Sprintf("write msg err: %s", err.Error()), cc.connectionId) return err } + // reset flag cc.beginFlag = BEGIN_RELSTART_BEGIN + } else { err = cc.backendConn.Conn.WritePgPacket(data) if err != nil { @@ -391,12 +433,21 @@ func (cc *ClientConn) handleQueryPg(ctx context.Context, sql string, data []byte return nil } +func stringTobyteSlice(s string) []byte { + + tmp1 := (*[2]uintptr)(unsafe.Pointer(&s)) + + tmp2 := [3]uintptr{tmp1[0], tmp1[1], tmp1[1]} + + return *(*[]byte)(unsafe.Pointer(&tmp2)) + +} + // receive server connection msg, add deal with it func (cc *ClientConn) receiveBackendMsg(ctx context.Context) error { readloop: for { - data, err := cc.backendConn.Conn.ReadPgPacket() if err != nil { golog.Error(moduleName, "receiveBackendMsg", fmt.Sprintf("read packet from backend err: %s", err.Error()), cc.connectionId) @@ -447,22 +498,34 @@ readloop: if cc.beginFlag == BEGIN_PRESTART_COMMIT { data = (&pgproto3.ReadyForQuery{TxStatus: 'T'}).Encode(nil) } else if cc.beginFlag == BEGIN_RELSTART_BEGIN { - 
cc.status &= ^mysql.SERVER_STATUS_AUTOCOMMIT - cc.status |= mysql.SERVER_STATUS_IN_TRANS + if q.TxStatus == 'I' && !cc.alwaysCurNode { + cc.status = mysql.SERVER_STATUS_AUTOCOMMIT + } else { + cc.status &= ^mysql.SERVER_STATUS_AUTOCOMMIT + cc.status |= mysql.SERVER_STATUS_IN_TRANS + } cc.beginFlag = BEGIN_COMMIT } else { - if q.TxStatus == 'T' { + if q.TxStatus == 'T' && !cc.isInTransaction() { cc.status &= ^mysql.SERVER_STATUS_AUTOCOMMIT cc.status |= mysql.SERVER_STATUS_IN_TRANS } else if q.TxStatus == 'I' && !cc.alwaysCurNode { - cc.status |= mysql.SERVER_STATUS_AUTOCOMMIT - cc.status &= ^mysql.SERVER_STATUS_IN_TRANS - cc.beginFlag = BEGIN_UNSTART + //cc.status |= mysql.SERVER_STATUS_AUTOCOMMIT + //cc.status &= ^mysql.SERVER_STATUS_IN_TRANS + if cc.isInTransaction() { + cc.status = mysql.SERVER_STATUS_AUTOCOMMIT + } + if cc.beginFlag != BEGIN_UNSTART { + cc.beginFlag = BEGIN_UNSTART + } } } cc.dataRecv = append(cc.dataRecv, data...) - cc.WriteData(cc.dataRecv) + err = cc.WriteData(cc.dataRecv) + if err != nil { + golog.Error(moduleName, "receiveBackendMsg", fmt.Sprintf("write data to backend err: %v", err), cc.connectionId) + } cc.dataRecv = make([]byte, 0) break readloop } @@ -474,9 +537,9 @@ readloop: // TODO At present, all data are returned. // In the future, we need to consider the situation of multiple data. // We need to set a threshold and return in batches - //if len(dataRes) > 16*1024 { - // cc.WriteData(dataRes) - // dataRes = make([]byte, 0) + //if len(cc.dataRecv) > 16*1024 { + // cc.WriteData(cc.dataRecv) + // cc.dataRecv = make([]byte, 0) //} } return nil @@ -489,7 +552,7 @@ func (cc *ClientConn) handleStmtClosePg(ctx context.Context, close pgproto3.Clos } // Delete Parse record if closed. cc.Parse.Load(close.Name) - if _,flag := cc.Parse.Load(close.Name); flag { + if _, flag := cc.Parse.Load(close.Name); flag { cc.Parse.Delete(close.Name) } data := close.Encode(nil) @@ -976,6 +1039,7 @@ func (cc *ClientConn) writeSSLRequest(ctx context.Context, pgRequestSSL byte) er func (cc *ClientConn) WriteData(data []byte) error { if n, err := cc.pkg.Wb.Write(data); err != nil { + golog.Error(moduleName, "WriteData", fmt.Sprintf("write data to backend err: %v", err), cc.connectionId) return mysql.ErrBadConn } else if n != len(data) { return mysql.ErrBadConn diff --git a/proxy/server/conn_preshard.go b/proxy/server/conn_preshard.go index 6576fd2..0c223b9 100644 --- a/proxy/server/conn_preshard.go +++ b/proxy/server/conn_preshard.go @@ -34,7 +34,7 @@ import ( "gitee.com/he3db/he3proxy/sqlparser" ) -var UnParseSQL = [7]string{"PREPARE", "COMMIT", "ROLLBACK", "LISTEN", "NOTIFY", "UNLISTEN", "VACUUM"} +var UnParseSQL = [6]string{"PREPARE", "ROLLBACK", "LISTEN", "NOTIFY", "UNLISTEN", "VACUUM"} type ExecuteDB struct { ExecNode *backend.Node @@ -320,8 +320,19 @@ func (c *ClientConn) GetExecDB(tokens []string, sql string) (*ExecuteDB, error) continue } // replica node also can exec begin - c.beginFlag = BEGIN_PRESTART + if c.beginFlag == BEGIN_UNSTART { + c.beginFlag = BEGIN_PRESTART + } return c.getSelectExecDB(sql, tokens, tokensLen) + case *tree.CommitTransaction: + if c.isInTransaction() { + continue + } else { + if c.beginFlag == BEGIN_PRESTART_COMMIT { + c.beginFlag = BEGIN_RELSTART + } + return c.getSelectExecDB(sql, tokens, tokensLen) + } } } } diff --git a/proxy/server/conn_query.go b/proxy/server/conn_query.go index d076b62..1ba5e0c 100644 --- a/proxy/server/conn_query.go +++ b/proxy/server/conn_query.go @@ -349,16 +349,18 @@ func (c *ClientConn) closeConn(conn *backend.BackendConn, 
rollback bool) { } // close prepare statement and clean parseData - parseData := make([]byte, 0) + var parseData []byte c.Parse.Range(func(key, value interface{}) bool { parse := (value).(pgproto3.Parse) cl := pgproto3.Close{ObjectType: 'S', Name: parse.Name} parseData = cl.Encode(parseData) return true }) - golog.Debug(moduleName, "closeConn", fmt.Sprintf("close prepare statement: [%s]", string(parseData)), c.connectionId) - c.backendConn.Conn.WritePgPacket(parseData) - + if parseData != nil || len(parseData) > 0 { + golog.Debug(moduleName, "closeConn", fmt.Sprintf("close prepare statement: [%s], len: [%d]", + string(parseData), len(parseData)), c.connectionId) + c.backendConn.Conn.WritePgPacket(parseData) + } conn.Close() } -- Gitee
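The series relies on pgLsnInInternal to turn the textual LSN returned by pg_current_wal_lsn() and pg_stat_replication into a uint64 so primary and replica positions can be compared numerically. That helper is not shown in these patches; a plausible standalone implementation of the standard pg_lsn text format (two 32-bit hex halves separated by '/') could look like this:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// pgLsnToUint64 converts a textual LSN such as "16/B374D848" into PostgreSQL's
// internal 64-bit representation: (high32 << 32) | low32.
func pgLsnToUint64(lsn string) (uint64, error) {
	parts := strings.Split(lsn, "/")
	if len(parts) != 2 {
		return 0, fmt.Errorf("malformed lsn %q", lsn)
	}
	hi, err := strconv.ParseUint(parts[0], 16, 32)
	if err != nil {
		return 0, err
	}
	lo, err := strconv.ParseUint(parts[1], 16, 32)
	if err != nil {
		return 0, err
	}
	return hi<<32 | lo, nil
}

func main() {
	v, _ := pgLsnToUint64("16/B374D848")
	fmt.Println(v) // a replica is considered caught up when its value >= the primary's
}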