1 Star 0 Fork 0

孙烨/网络编程 nio-Netty学习

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
readme.txt 21.70 KB
一键复制 编辑 原始数据 按行查看 历史
孙烨 提交于 2023-06-13 18:15 . 八、致谢-----------
写在前面的介绍:
包括nio基础 jdk自带的nio包,从阻塞非阻塞模式,到多路复用,再到netty的使用 力求用简单的示例,将nio从原理剖析明白
一、nio基础(preposition)
1.说明:
non-blocking io 非阻塞io
2.组成
channel通道、buffer缓冲区、selector选择器
常见的channel有fileChannel、datagramChannel、socketChannel、serverSocketChannel
常见的buffer有byteBuffer、shortBuffer、intBuffer。。。
selector选择器配合线程管理channel,获取channel事件并转给thread处理
3.bytebuffer的使用
1.使用过程:
获得bytebuffer对象,比如通过静态方法 ByteBuffer.allocate(10)
向buffer对象写入数据,比如调用read方法
调用buffer的flip方法到读模式
读取数据,比如调用buffer的get方法
调用buffer的clear方法或者compact方法切换到写模式
...重复上述步骤...(所谓的读写模式,本质上就是指针位置的改变)
2.byteBuffer主要属性
byteBuffer.capacity();//容量,装多少数据
byteBuffer.position();//索引指针,当前数据读或者写到哪了
byteBuffer.limit();//读写限制,应该读取多少字节或者写多少字节
3.buffer读写数据方式
1.写入数据
channel.read()
或者buffer.put()
2.读出数据
channel.write()
或者buffer.get()
get方法会让position指针移动,可以调用rewind将position置位零
或者通过get(int i)的方式获取索引i位置的数据,这样读取指针不会移动
3.mark reset
buffer.mark可以做一个标记
buffer.reset将position重置到mark所标记的位置
4.字符串与bytebuffer的转换
1.字符串转bytebuffer
byteBuffer.put(str.getBytes());
StandardCharsets.UTF_8.encode(str)
ByteBuffer.wrap(str.getBytes());
5.黏包半包解析
1.网络中多条数据发送给服务端,但由于某些原因数据在接收时进行了重新组合
发送
hello
,tom
,i am sunye
接受可能
hello ,to
m,i am
sunye
4.文件编程fileChannel
1.说明:
fileChannel只能工作在阻塞模式之下
通过fileInputStream 获取只写channel
fileOutputStream 获取只写channel
或者randomAccessFile 获取根据定义读写模式的channel
2.读
channel.read(buffer),返回值表示读取到的字节数,-1表示读取到了文件末尾
3.写
推荐写法
byteBuffer buffer = ...
buffer.put(...)
buffer.flip()
//因为channel.write方法并不一定保证会将数据写入,和channel的类型以及状态有关,
//例如,处于非阻塞模式的套接字通道不能 写入的字节数多于套接字输出缓冲区中的可用字节数。
while(buffer.hasRemaining()){
channel.write(buffer)
}
默认考虑性能影响不会讲数据立刻写入磁盘,可以调用force(true)方法将文件内容和元信息(权限等)立刻写入磁盘
4.关闭
见一些在finally或者使用try-with-resource
channel.close或者调用流的close
5.位置
channel同样有 position可以获取或者设置当前位置
通过size可以获得文件大小
6.数据传输
channel.transferTo(从哪个位置 读多少数据 传输到哪个channel)
单文件最大不可超过两G,如果超过则需要多次传输
常用文件操作api
paths获取目录工具类
Files对文件以及文件夹的操作工具类,walk以及walkFileTree方法使得对文件文件夹的操作都很容易
二、网络编程:(network)
1.阻塞:
一个方法的执行会影响别的方法的执行,会进行阻塞 cpu歇着
比如说 ServerSocketChannel.accept() 建立连接的方法 和 SocketChannel.read() 读取数据的方法
默认都会使得线程停止进行等待
2.非阻塞:
一个方法的执行不会影响别的方法的执行,不会进行阻塞 cpu动着
比如说通过设置 ServerSocketChannel.configureBlocking(false) 和 SocketChannel.configureBlocking(false) 使用非阻塞连接
ServerSocketChannel.accept() 建立连接的方法 和 SocketChannel.read() 读取数据的方法 如果无数据则会返回null值不影响程序执行
3.selector模式
通过selector对非阻塞的channel的事件进行监控
事件 accept服务端收到客户端请求连接事件 connect客户端连接成功 read可读事件 writer可写事件
通过 channel.register(selector, 0, null); 注册需要监听的channel
通过 selector.select(); 对事件进行监听
通过 selector.selectedKeys() 获得事件集进行处理 或者通过 selectedKey.cancel() 对事件进行取消
特别说明一下读事件
客户端断开时会发送一个读事件,正常断开read==-1 异常断开会抛出IOException异常
断开之后服务端需要
key.cancel();//将当前channel 从select监听列表中移除掉
channel.close();//将当前channel 从select监听列表中移除掉
4.消息边界问题:
1.说明:
读取数据,客户端发送数据,但是服务端不能一次性读取导致消息被分割成多个部分
导致服务端对数据解析失败
写数据时,一次性输出大量信息,可能导致channel满了,此时程序会不断尝试发出信息但是发出失败
导致程序再次阻塞,这与nio理念不符
2.读数据解决思路:
加大接受数据长度(可能浪费带宽)
增加结束标志(传输效率较低,且需要定义一个比较大的存放数据位置,当然也可以定义的较小 然后进行扩容...)
LTV模式,长度类型数据,消息头消息体模式,消息头中说明消息体的长度(比较好用的多)
3.对于buffer的大小是很难控制的
数据太多要扩容,数据太少要缩容...当然netty都做完了自适应netty
4.写数据解决思路:
发送失败时 selectKey可以追加关注可写事件,同时保存当前未完成的数据挂载到selectKey上
等到channel可以写了,再继续进行写操作
三、网络编程-多线程版
1.单线程问题:
虽然多路复用可以实现单线程处理多事件,但是如果某个事件处理时间较长
也会影响其它事件的处理
2.使用多线程
要注意代码执行顺序,如果select()方法被阻塞可以通过wakeup()主动唤醒
通过boss负责建立链接
work负责读请求,同时可以使用多个work增加处理能力,多个work要注意负载均衡
四、相关概念说明:
1.channel和stream(指例如 FileInputStream)
Stream不会自动缓冲数据,而channel会利用系统的发送缓冲区和接收缓冲区
Stream只支持阻塞api,channel同时支持阻塞,非阻塞,网络channel可以配合selector实现多路复用(记住只有网络channel才能配合selector使用哦)
都为全双工。读写都可以同时进行
2.io
io步骤: java-->读请求-->内核api-->立即返回/阻塞返回-->展示给用户
阻塞io,等待返回结果
非阻塞Io,立即返回结果
同步io,影响程序执行,同一线程
异步io,不影响程序执行,不同线程
多路复用--同步阻塞
3.零拷贝
指的是计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域。
这种技术通常用于通过网络传输文件时节省CPU周期和内存带宽。
DMA(Direct Memory Access,直接内存存取) 是所有现代电脑的重要特色
它允许不同速度的硬件装置来沟通,而不需要依赖于CPU 的大量中断负载。
在java nio中使用 ByteBuffer.allocateDirect申请系统内存,可以避免用户态转为和内核态之间的数据复制
使用 channel.transferTo 或者channel.transferFrom 可以避免内核态到用户态再到内核态之间的转换,可直接在内核态之间进行数据转换
所谓零拷贝,本质上是避免内核状态切换带来的数据拷贝问题,适合数据流较小的传输,如果文件过大会占用内核的缓冲区
4.aio
简单说明aio 操作系统所支持的异步io,该功能在netty5中被广泛引入但由于linux对此支持并不友好(使用多路复用)...
目前netty5处于开发阶段,并未正式使用 aio.AioFileChannel 此示例仅简单说明异步io中线程的执行过程
五、netty入门
1.概述netty
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
(翻译精髓)-通过多路复用(selector)和多线程,使得发送和接受进行分离,形成事件驱动的异步io模型
2.评价地位:
netty在java网络应用框架的地位 相当于spring在javaee中的地位
有网络通信需求基本使用netty
例如:rocketMQ、elsearch、grpc、dubbo、spring5-webflux、zookeeper
3.使用比较:
原生nio:
需要构建通信协议、解决数据传输粘包半包问题、epoll空轮询导致cpu100%...
netty:
对上述问题进行了封装
4.使用:
客户端:netty.HelloClient_01
服务端:netty.HelloServer_01
5.组件
EventLoop:
直译为事件循环,是一个单线程执行器维护一个selector、里面的run方法处理Channel上的io事件
继承了java.util.concurrent.ScheduledExecutorService 拥有线程池的方法
还继承了io.netty.util.concurrent.OrderedEventExecutor 拥有判断线程是否属于当前EventLoop 以及查看当前线程属于哪个EventLoop
EventLoopGroup:
直译为事件循环组,是一组EventLoop
Channel一般会调用EventLoopGroup的register方法绑定其中一个EventLoop
后续Channel上的io事件都由此EventLoop来处理,保证io事件处理时的线程安全
Channel:
close()可以关闭
closeFuture()处理close之后的操作,sync是同步等待关闭,addListener异步等待关闭,基本使用异步 全都是主线程跑完然nio线程进行各种回调处理
pipeline()添加handler处理器
write()写入数据
writeAndFlush()写入并发出
Future/Promise
用于异步处理结果,为啥要异步嘞,netty的优势并不仅仅是多线程提高效率,
而是利用多线程将工作进行细分,原理和cpu工作的流水线设计是一致的,通过细分在任务量多的时候可以更充分地利用资源
所提升的是任务吞吐量,对响应时间并没有很多优势(甚至减慢)
关系说明:
netty-promise继承自netty-Future继承自jdk-Future
jdk-Future 用于同步等待任务结束得到结果,成功或者失败
netty-Future 用于同步或者异步等待任务结束得到结果,成功或者失败
netty-promise 脱离任务独立存在,作为两个线程之间传递结果的容器
Handler/Pipeline
ChannelHandler 用来处理Channel上的各种事件,分为入栈ChannelInboundHandlerAdapter、出栈两种
ChannelHandler 链接为一串称为 Pipeline
Bytebuf:
同样具有堆内存或者直接内存,堆内存收到垃圾回收影响,直接内存读写性能更高且不会收到GC影响
ByteBufAllocator.DEFAULT.heapBuffer();
ByteBufAllocator.DEFAULT.directBuffer();
还区分为池化、非池化
非池化每次创建都是新的bytebuf实例,对直接内存或者堆内存都是昂贵的代价
池化则可重用池中存在的bytebuf实例,采用类似 jemalloc 内存分配算法
通过虚拟机参数可调整 -Dio.netty.allocator.type=pooled|unpooled
组成:
capacity:当前容量
maxCapacity:最大容量(默认为2147483647)
读指针位置(数据起始位置) 区别于nioByteBuffer使用一个指针需要来回切换读写模式
写指针位置(当前数据位置)
写入:
写入导致写指针位置后移,可写区域会变少,可读区域会变大,如果超过当前容量会触发扩容
读取:
读取会使得读指针位置后移,可读区域会变少,可写区域会变大
回收:
heapBuffer 使用jvm内存,可以等待GC回收
directBuffer 使用直接内存,需要手动回收(也可以等待GC回收)
pooledBuffer 池化机制需要更加复杂的规则进行回
netty通过引用计数的规则来控制回收内存,每个ByteBuf都实现ReferenceCounted接口 初始值为1
调用release()方法使得计数-1,retain()使得计数+1 计数为0则对ByteBuf进行回收
handler头尾有一个head和tail,会将他们接受到的byteBuf全都给释放掉
零拷贝的Slice、duplicate、asReadOnlyBuffer:
Slice:
对原始byteBuf进行切片成多个byteBuf,切片后的byteBuf不发生内存复制仍然使用原始内存
但是维护自己独立的read、write指针,不允许追加数据,可进行修改
duplicate:
拷贝当前byteBuf,修改数据会相互影响,可新增数据且新增数据不会相互影响
asReadOnlyBuffer:
只读数据,不允许任何修改
零拷贝的CompositeByteBuf
CompositeByteBuf.addComponents(b1...b2)
同一地址相互影响
Unpooled.wrappedBuffer 工具类底层就是通过CompositeByteBuf实现零拷贝的合并
Copy一系列方法:
是深度拷贝,和原始数据无相互影响
6.粘包/半包
1.粘包原因:
应用层:接收方ByteBuf设置太大,netty默认为1024
滑动窗口:TCP请求滑动窗口机制允许客户端在一段时间内可连续发送请求,此时服务端处理不及时会导致数据暂存在滑动窗口中发生粘包现象
Nagle算法:用于自动连接许多的小缓冲器消息
2.半包原因
应用层:接收方ByteBuf小于实际发送数据量
滑动窗口:窗口满了但是数据未发送完成,只能等待ack响应才能继续发送
链路层:MSS限制,当发送数据超过这一容量之后数据会被切片发送
3.本质原因:流式传输,没有限制边界
4.解决方案:
1.短连接:HelloClient_02 HelloServer_02
发一个消息就断开,以连接断开变相的作为消息边界...可解决粘包,但是无法解决半包问题,接收方一次性接受不了就算断开连接,接收方也会分两次接收数据
2.定长解码器:FixedLengthFrameDecoder
在服务端使用FixedLengthFrameDecoder添加定长处理器
3.分隔符解码:LineBasedFrameDecoder/LineBasedFrameDecoder
使用换行或者自定义分隔符
4.根据长度字段的长度解码器:LengthFieldBasedFrameDecoder
这是目前生产环境主用的方式,消息头消息体的模式
7.协议的解析与设计
1. 比如说模拟Redis的resp协议
要发送这个命令 set key value
则先发送命令长度3 再发送set
再先发送命令长度3 再发送key
再先发送命令长度5 再发送value
示例:agreement.TestRedis
2.http协议(netty已经把解码器封装好了...)
3.自定义协议:
比如说redis的协议使用回车换行很多,字节不够紧凑会消耗一部分带宽
自定义协议一般有以下几个要素
魔数,判断数据包是否是有效的数据包 比如说class文件开头的cafe babe
版本号,用于支持协议的升级
序列化算法,消息正文采用的序列化方式,比如json、xml、protobuf、jdk、hessian
指令类型,登录、注册、单聊、群聊...业务相关
请求序号,为双工通信提供异步能力
正文长度,
消息正文,
4.Handler使用要注意线程安全问题
注意是否可以抽出来,主要判断该handler是否是有状态的
可以看一下源码上是否有@Sharable标记
ByteToMessageCodec 的子类不能使用这个标记,父类的构造方法对其进行了限制
如果想构建可重复使用的handler 可以使用MessageToMessageCodec 实现其方法
8.链接假死
1.问题原因:
网络设备出现故障,网络已经断开,但是应用程序没有感知到,仍然占用资源
2、检测器:
使用 IdleStateHandler 空闲链接检测器 检测读或者写空闲时间事件
使用 ChannelDuplexHandler 对捕获的超时事件进行处理
9.优化点
1.序列化和反序列化可以定义接口进行实现,避免在编解码器中写死
2.参数优化
客户端通过bootstrap的option配置 对 SocketChannel 生效
服务端分为了childOption 和option option-对ServerSocketChannel生效 childOption-对 SocketChannel 生效
CONNECT_TIMEOUT_MILLIS 连接超时时间,配置在客户端
SO_TIMEOUT 用于阻塞式IO中避免accept、read等方法的无限等待
SO_BACKLOG 属于serverSocketChannel的参数 linux2.2之前表示三次握手时的syncQueue和acceptQueue两个队列的大小
2.2之后使用 /proc/sys/net/ipv4/tcp_max_sys_backlog 制定syncQueue,在syncookies启用的情况下逻辑上不受限制该参数被忽略
2.2之后使用 /proc/sys/net/core/somaxcoon 制定acceptQueue,在使用listen函数时内核根据传入的backlog参数和系统参数相比以较小值为准
当队列满时,服务端将会对客户端发出拒绝连接信息
syncookies的视频解释: https://www.a10networks.com/resources/videos/what-are-syn-cookies-and-how-are-they-used/
ulimit-n 属于操作系统参数 临时操作系统参数限制用户可打开的最多文件数目 默认1024, 不修改容易遇到error: too many open files
TCP_NODELAY 属于socketChannel参数,延迟发送信息直到满足最小发送容量再发送,默认为true开启该功能,根据需要如果需要及时发送可关闭该参数
SO_SNDBUF 和 SO_RCVBUF 属于socketChannel参数,用于控制发送和接受缓冲区,当前操作系统一般都可动态调节 不建议手动干预
ALLOCATOR 属于socketChannel参数,分配byteBuf 和ctx.alloc()
RCVBUF_ALLOCATOR 属于socketChannel参数 控制netty 接收缓冲区的大小
六、RPC框架
1.实现思路:
在消息定义的父类中增加两个消息类型,请求对象和响应对象
请求消息要需要明确接口名称、方法名称、返回类型、执行参数,
响应消息要明确返回值和异常值
(只不过聊天传输文字内容 rpc传输方法内容...)
2.实现过程:
服务端代码:
服务端接受客户端的调用请求,可通过反射或者其他方式(比如spring的容器)来执行方法,将执行结果返回到客户端
找到class对象 找到方法 方法invoke执行获取返回值 返回值转为期望的返回值类型 返回值返回到client
客户端代码:
将请求信息发送到服务端 接受服务端返回信息 根据返回信息处理业务逻辑
3.示例代码:
略...
七、netty源码分析
1、netty的基本流程
Selector selector = Selector.open; //获得一个管理器
NioServerSocketChannel attachment = new NioServerSocketChannel(); //创建一个server netty创建的是nioServerSocketChannel
ServerSocketChannel ssc = ServerSocketChannel.open(); //创建一个server jdk原生的
ssc.configureBlocking(false);
SelectionKey sscKey = ssc.register(selector, 0, attachment); //服务注册到管理器并关联netty的nio服务
ssc.bind(new InetSocketAddress(8080)); //绑定端口
sscKey.intersOps(SelectionKey.OP_ACCEPT); // 触发channel的active事件,在head中关注accept事件
2、对应说明:
serverBootstrap.bind()方法为入口 到AbstractBootstrap.doBind方法
initAndRegister 完成初始化channel并注册到selectionKey
doBind0 完成绑定操作 -
八、致谢-----------
netty学习告一段了,感谢 黑马程序员 && bilibili
https://www.bilibili.com/video/BV1py4y1E7oA/
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Java
1
https://gitee.com/qq2528825468/netty-learn.git
[email protected]:qq2528825468/netty-learn.git
qq2528825468
netty-learn
网络编程 nio-Netty学习
master

搜索帮助