本文共 5149 字,大约阅读时间需要 17 分钟。
Channel接口定义了一个与ChannelInboundHandler API密切相关的状态模型,其主要包含4种状态:
ChannelUnregistered
Channel已被创建,但尚未注册到EventLoop。ChannelRegistered
Channel已成功注册到EventLoop,能够处理I/O操作。ChannelActive
Channel已连接到远程节点,进入活跃状态,可进行读写。ChannelInactive
Channel未连接到远程节点,处于非活跃状态。状态变化会触发相应事件,由ChannelPipeline中的ChannelHandler处理。
ChannelHandler接口定义了与Channel相关的关键操作:
handlerAdded
当ChannelHandler被添加到ChannelPipeline时触发。handlerRemoved
当ChannelHandler从ChannelPipeline中移除时触发。exceptionCaught
发生异常时触发,如未捕获的入站异常。ChannelInboundHandler
处理入站数据和状态变化,如读取、状态改变等。ChannelOutboundHandler
处理出站操作,如绑定、连接、断开、写入数据等。ChannelInboundHandler接口定义了入站事件的处理方法:
channelRegistered
Channel已注册到EventLoop。channelUnregistered
Channel已从EventLoop注销。channelActive/Inactive
Channel状态变化提示。channelRead
处理即将读取的数据。channelReadComplete
处理全部读取完成后的数据。userEventTriggered
及时触发用户定义事件。channelWritabilityChanged
写入状态改变提示。ChannelOutboundHandler接口定义了出站操作的处理方法:
ChannelHandlerContext为ChannelHandler和ChannelPipeline之间提供交互接口:
事件触发:通过fire*方法分发给下一个相应的ChannelHandler。
I/O操作:提供统一的操作接口,便于处理入站和出站事件。
ChannelHandlerContext在事件传播时遵循ChannelPipeline的逻辑:
事件传播:将事件从第一个适配的ChannelHandler开始,直到所有相关ChannelHandler处理完毕。
操作完成:通过ChannelFuture返回,支持异步处理和错误处理。
为了简化开发,Netty提供了ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter适配器类,这些适配器实现了基础逻辑,开发者可按照需求扩展。
##Client与Server代码示例
public class EchoClient { ByteBuf buffer = Unpooled.copiedBuffer("Hello, World", Charset.forName("UTF-8")); void connect(final int port, final String host) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChildChannelHandler()); final ChannelFuture future = bootstrap.connect(host, port).sync(); future.channel().closeFuture().sync(); try { while (!future.isSuccess() && !future.channel().isClosed()) { // 处理连接错误 future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { Throwable cause = future.cause(); System.out.println("连接失败:" + cause.getMessage()); } } }); Thread.sleep(1); } } finally { group.shutdownGracefully(); } } class ChildChannelHandler extends ChannelInitializer { @Override protected void initChannel(SocketChannel ch) throws Exception { System.out.println("客户端已连接到服务器"); ch.pipeline().write(buffer); ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("进入活跃状态"); ctx.write(buffer); } }); } }} public class EchoServer { final ByteBuf buffer = Unpooled.copiedBuffer("Hello, World", Charset.forName("UTF-8")); void bind(final int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .childHandler(new ChildChannelHandler()); final ChannelFuture future = bootstrap.bind(port).sync(); future.channel().closeFuture().sync(); while (!future.isSuccess() && !future.channel().isClosed()) { Thread.sleep(100); } closeResources(bossGroup, workerGroup); } class ChildChannelHandler extends ChannelInitializer { @Override protected void initChannel(SocketChannel ch) throws Exception { System.out.println("服务器已启动并接受连接"); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf incoming = (ByteBuf) msg; System.out.println("接收到数据:" + incoming UTF-8 decoding); ctx.writeAndFlush(buffer); } }); } } static void closeResources(EventLoopGroup... groups) { for (EventLoopGroup group : groups) { try { group.shutdownGracefully(); } catch (Exception e) { System.out.println("关闭事件循环组失败:" + e.getMessage()); } } }} ChannelHandler家族是Netty处理通信逻辑的核心机制,涵盖了从连接到数据传输的全过程。通过灵活的事件触发和异步模型,开发者可以构建高效、可扩展的联网应用。
转载地址:http://zemsz.baihongyu.com/