1 Star 0 Fork 0

Bin/kir

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
connection.go 6.10 KB
一键复制 编辑 原始数据 按行查看 历史
bin you 提交于 2024-09-22 01:05 . 优化前端显示
package main
import (
"encoding/binary"
"fmt"
"sync"
"time"
"github.com/bwmarrin/snowflake" // 添加snowflake包
)
const (
COLUMN_FINISHED = 1
RESULT_FINISHED = 2
)
var node *snowflake.Node // 添加snowflake节点
func init() {
var err error
node, err = snowflake.NewNode(1) // 使用节点ID 1 初始化snowflake节点
if err != nil {
panic(err)
}
}
type preparedStatement struct {
query string
params int
paramTypes []byte // 添加这个字段
}
type monitoredConnection struct {
preparedStatements map[uint32]*preparedStatement
stmtMutex sync.RWMutex
sendBuffer []byte
recvBuffer []byte
sendChannel chan []byte
recvChannel chan []byte
closeChannel chan struct{}
// 当前执行的sql
currentSQL string
currentResult *queryResult
currentState int
}
func newMonitoredConnection() *monitoredConnection {
mr := &monitoredConnection{
preparedStatements: make(map[uint32]*preparedStatement),
sendChannel: make(chan []byte),
recvChannel: make(chan []byte),
closeChannel: make(chan struct{}),
}
go mr.processData()
return mr
}
func (mr *monitoredConnection) Close() {
mr.closeChannel <- struct{}{}
}
func (mr *monitoredConnection) addSql(sql string) {
mr.currentSQL = sql
mr.currentResult = &queryResult{}
mr.currentState = 0
}
func (mr *monitoredConnection) addSqlResult() {
sqlMutex.Lock()
defer sqlMutex.Unlock()
SQLS = append(SQLS, &SqlStruct{Id: node.Generate().String(), Sql: mr.currentSQL, Datetime: CustomTime(time.Now()), Result: mr.currentResult})
if len(SQLS) > 100 {
SQLS = SQLS[1:]
}
mr.reset()
}
func (mr *monitoredConnection) reset() {
mr.currentSQL = ""
mr.currentResult = nil
mr.currentState = 0
}
func (mr *monitoredConnection) processData() {
defer close(mr.closeChannel)
defer close(mr.sendChannel)
defer close(mr.recvChannel)
for {
select {
case data := <-mr.recvChannel:
mr.recvBuffer = append(mr.recvBuffer, data...)
// 归还buffer
bufferPool.Put(data)
mr.parseBuffer("recv", &mr.recvBuffer)
case data := <-mr.sendChannel:
mr.sendBuffer = append(mr.sendBuffer, data...)
// 归还buffer
bufferPool.Put(data)
mr.parseBuffer("send", &mr.sendBuffer)
case <-mr.closeChannel:
return
}
}
}
func (mr *monitoredConnection) parseBuffer(direction string, buffer *[]byte) {
for {
if len(*buffer) < 4 {
// 缓冲区中的数据不足以解析出一个完整的数据包头部
return
}
// 获取数据包长度
length := int((*buffer)[0]) | int((*buffer)[1])<<8 | int((*buffer)[2])<<16
if len(*buffer) < length+4 {
// 缓冲区中的数据不足以解析出一个完整的数据包
return
}
// 解析完整的数据包
packet := (*buffer)[4 : length+4]
// fmt.Printf("解析数据包: 长度 %d, 方向 %s\n", length, mr.direction)
if direction == "recv" {
mr.parseRecvPacket(packet)
} else {
mr.parseSendPacket(packet)
}
// 移除已解析的数据包
*buffer = (*buffer)[length+4:]
}
}
func (mr *monitoredConnection) parseRecvPacket(packet []byte) {
if len(packet) < 1 {
return
}
// 处理MySQL服务器的响应
if len(packet) >= 12 && packet[0] == 0x00 && isPrepareResponse(packet) {
// 可能是Prepare语句的响应
mr.handlePrepareResponse(packet)
} else if mr.currentSQL != "" {
if packet[0] == 0x00 {
mr.handleOKPacket(packet)
} else if len(packet) > 1 && packet[0] == 0x03 || packet[0] == 0x04 {
// 字段定义包
if mr.currentState == 0 {
mr.handleFieldPacket(packet)
} else if mr.currentState == 1 {
mr.handleRowPacket(packet)
}
} else if packet[0] != 0xfe && packet[0] != 0x00 {
// 数据行包
mr.handleRowPacket(packet)
} else if packet[0] == 0xfe {
// EOF包,表示结果集传输完成
mr.handleEOFPacket(packet)
}
}
}
func (mr *monitoredConnection) parseSendPacket(packet []byte) {
if len(packet) < 1 {
return
}
switch packet[0] {
case 0x03: // COM_QUERY
query := string(packet[1:])
mr.addSql(query)
fmt.Println("SQL:", query)
case 0x16: // COM_STMT_PREPARE
mr.handlePrepare(packet[1:])
case 0x17: // COM_STMT_EXECUTE
mr.handleExecute(packet[1:])
}
}
// 添加一个新方法来处理MySQL服务器的Prepare响应
func (mr *monitoredConnection) handlePrepareResponse(data []byte) {
if len(data) < 12 {
fmt.Println("Prepare响应数据不足")
return
}
stmtID := binary.LittleEndian.Uint32(data[1:5])
columnCount := binary.LittleEndian.Uint16(data[5:7])
paramCount := binary.LittleEndian.Uint16(data[7:9])
mr.stmtMutex.Lock()
defer mr.stmtMutex.Unlock()
if stmt, ok := mr.preparedStatements[0]; ok {
stmt.params = int(paramCount)
mr.preparedStatements[stmtID] = stmt // 使用实际的 stmtID
delete(mr.preparedStatements, 0) // 删除临时的 key
fmt.Printf("更新Prepare语句,ID: %d,参数数量: %d,列数: %d\n", stmtID, paramCount, columnCount)
} else {
fmt.Printf("未找到对应的Prepare语句,ID: %d\n", stmtID)
}
}
func (mr *monitoredConnection) handlePrepare(data []byte) {
sql := string(data)
fmt.Println("捕获的Prepare语句:", sql)
// 生成一个新的 preparedStatement 并保存
stmt := &preparedStatement{
query: sql,
params: 0, // 初始化参数数量
}
stmtID := binary.LittleEndian.Uint32(data[:4])
fmt.Println("stmtID:", stmtID)
mr.stmtMutex.Lock()
mr.preparedStatements[stmtID] = stmt
mr.stmtMutex.Unlock()
}
func (mr *monitoredConnection) handleExecute(data []byte) {
if len(data) < 5 {
fmt.Println("无效的Execute包")
return
}
stmtID := binary.LittleEndian.Uint32(data[0:4])
mr.stmtMutex.RLock()
defer mr.stmtMutex.RUnlock()
stmt, ok := mr.preparedStatements[stmtID]
if !ok {
fmt.Printf("未找到对应的Prepare语句,ID: %d\n", stmtID)
return
}
// 解析参数
params, newParamTypes, err := parseParams(data[5:], stmt)
if err != nil {
fmt.Println("解析参数失败:", err)
return
}
// 如果收到新的参数类型,更新它们
if newParamTypes != nil {
mr.stmtMutex.Lock()
stmt.paramTypes = newParamTypes
mr.stmtMutex.Unlock()
}
// 打印重构的SQL
reconstructedSQL := reconstructSQL(stmt.query, params)
mr.addSql(reconstructedSQL)
fmt.Println("SQL:", reconstructedSQL)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/llyb120/kir.git
[email protected]:llyb120/kir.git
llyb120
kir
kir
master

搜索帮助