netty概述:
Netty是由JBoss提供的一个java开源框架。Netty提供异步的、基于事件驱动的网络应用程序框架,用一开发高性能,高可靠的io程序
Netty可以帮助你快速、简单构建一个网络应用,相当于简化和流程化了NIO的开发过程
Netty是目前最流行的NIO框架,Netty在互联网领域、大数据分布计算领域、游戏行业、通信行业都获得了广泛的应用,知名的ES、Dubbo框架内部都采用了Netty。
nio存在问题:
NIO的类库和API繁杂,使用麻烦,需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等
需要熟练掌握Java多线程编码,因为NIO编程涉及到Reactor模式,必须对多线程和网络编程非常熟悉,才能编写出好的NIO代码
Netty抽象出两组线程池BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写
BossGroup和WorkerGroup类型都是NioEventLoopGroup
NioEventGroup相当于事件循环组,这个组中含有多个事件循环,每一个事件循环都是NioEventLoop
NioEventLoop表示一个不断循环处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket通讯
NioEventLoopGroup可以有多个线程,既可以含有多个NioEventLoop
每个 Boss NioEventLoop执行的步骤有三部
1轮询accept事件
2处理accept事件,与client建立连接 ,生成NioSocketChannel,并将其注册到某个worker NioEventLoop上的selector
3处理任务队列中的队伍,即runAllTasks
每个Worker NIOEventLoop循环执行的步骤
轮询read、write事件
处理i/o事件,即read,write事件,在对应NioSocketChannel处理
处理任务队列中的队伍,即runAllTasks
8.每个Worker NIOEventLoop处理业务时,会使用pipeline(管道),pipeline中包含了channel,即通过pipeline可以获取对应通道
Channel:
Channel是java NIO的一个基本构造,可以看做是传入或传出数据的载体。因此,它可以被打开被关闭,连接或者断开连接。
EventLoop与EventLoopGroup:
EventLoop定义了Netty的核心抽象,用来处理连接声明周期所发生的事件。在内部,将会为每个Channel分配一个EventLoop,EventLoopGroup是一个EventLoop池,里面包含了很多EventLoop。
Netty为每个Channel分配了一个EventLoop,用于处理连接用户请求、对用户请求处理等事件。EventLoop本身是一个线程驱动,在其生命周期只会绑定一个线程,让该线程处理一个Channel的所有IO事件。
一个Channel一旦与一个EventLoop绑定,那么在Channel整个生命周期内是不能改变的。一个EventLoop可以与多个Channel绑定。
ServerBootstrap与Bootstrap:
ServerBootstrap和Bootstrap被称为引导类,指对应用程序进行配置,并使它运行起来的过程。Netty处理引导的方式是使你的应用程序和你的引导层相隔离
Bootstrap是客户端的引导类,Bootstrap在调用bind()(连接UDP)和connect()(连接TCP)方法时,会新创建一个Channel,仅创建一个单独的、没有父Channel的Channel来实现所有的网络交换。
ServerBootstrap是服务端的引导类,ServerBootstrap在调用bind()方法时会创建一个ServerChannel来接收来自客户端的连接,并且该ServerChannel管理了多个子Channel用于同客户端之间的通信
ChannelHandel是对Channel中数据的处理器,这些处理器可以是系统本身定义好的编码器,也可以是用户自定义的。这些处理器会被统一添加到一个ChannelPipeline的对象中,然后按照添加的顺序对Channel中的数据进行处理。
ChannelFuture
Netty中所有的操作都是异步的,即操作不会立即得到返回结果,所有Netty定义了ChannelFuture 对象作为这个异步操作的代言人,表示异步操作本身。如果想获取到该异步操作的返回值,可以通过该异步操作的addListener() 方法为该异步操作添加监听器,为其注册回调:当结果出来后马上调用执行。
代码思路:
创建2个线程组BossGroup和WorkerGroup(BossGroup处理连接请求,workGroup真正的和客户端进行业务处理)
创建服务器端的启动对象,并对其配置参数
设置2个线程组
使用nioSocketChannel作为服务器的通道实现
设置线程队列得到连接
设置保持活动连接状态
给pipeline设置处理器
3启动服务器并绑定端口
4对关联通道进行监听
5 对2个线程组实行netty优雅退出机制
编写服务端
package com.liubujun.netty;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;/*** @Author: liubujun* @Date: 2023/2/6 17:05*/public class NettyServer {public static void main(String[] args) throws Exception {//1.创建2个线程组bossGroup和workerGroup//2 bossGroup只是处理连接请求,workerGroup真正的和客户端进行业务处理//3 两个都是无限循环NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {//创建服务器端的启动对象,配置参数ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup,workerGroup)//设置两个线程组.channel(NioServerSocketChannel.class) //使用nioSocketChannel作为服务器的通道实现.option(ChannelOption.SO_BACKLOG,128)//设置线程队列得到连接.childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态.childHandler(new ChannelInitializer() {//给pipeline设置处理器@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyServerHandler());}});//给workerGroup的EventLoop对应的管道设置处理器System.out.println("....服务器 is ready...");//绑定一个端口并且同步,生成了一个ChannelFuture对象//启动服务器(并绑定端口)ChannelFuture cf = bootstrap.bind(6668).sync();//对关联通道进行监听cf.channel().closeFuture().sync();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
编写服务端处理器:
public class NettyClientHandler extends ChannelInboundHandlerAdapter {//当通道就绪会触发该方法@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("client"+ctx);ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server: 喵喵喵", CharsetUtil.UTF_8));}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf)msg;System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));System.out.println("服务器的地址:"+ctx.channel().remoteAddress());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}
代码思路:
1创建一个线程组
2 创建一个客户端启动对象(客户端使用的是Bootstrap不是ServerBootstrap)并对其配置参数
设置线程组
使用nioSocketChannel作为服务器的通道实现
设置线程队列得到连接
给pipeline设置处理器
3 启动客户端连接服务器
4给关闭通道进行监听
5对线程组进行netty优雅关闭
编写客户端:
package com.liubujun.netty;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;/*** @Author: liubujun* @Date: 2023/2/8 15:14*/public class NettyClient {public static void main(String[] args) throws Exception{//客户端需要一个循环组NioEventLoopGroup group = new NioEventLoopGroup();try {//创建客户端的启动对象//注意客户端使用的是Bootstrap不是ServerBootstrapBootstrap bootstrap = new Bootstrap();bootstrap.group(group) //设置线程组.channel(NioSocketChannel.class) //设置客户端通道的实现类.handler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器}});System.out.println("客户端 ok ...");//启动客户端连接服务器 ChannelFuture涉及到netty的异步模型ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();//给关闭通道进行监听channelFuture.channel().closeFuture().sync();}finally {group.shutdownGracefully();}}
}
客户端处理器:
package com.liubujun.netty;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;/*** @Author: liubujun* @Date: 2023/2/8 17:30*/public class NettyClientHandler extends ChannelInboundHandlerAdapter {//当通道就绪会触发该方法@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("client"+ctx);ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server: 喵喵喵", CharsetUtil.UTF_8));}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf)msg;System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));System.out.println("服务器的地址:"+ctx.channel().remoteAddress());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}
先启动服务端,再启动客户端测试:
客户端控制台输出:
服务端控制台输出:
上一篇:GP 常用字符串函数
下一篇:Antd-table全选踩坑记录