博客
关于我
Netty(ChannelHandler 和 ChannelPipeline)
阅读量:573 次
发布时间:2019-03-09

本文共 5149 字,大约阅读时间需要 17 分钟。

ChannelHandler家族

Channel的生命周期

Channel接口定义了一个与ChannelInboundHandler API密切相关的状态模型,其主要包含4种状态:

  • ChannelUnregistered

    Channel已被创建,但尚未注册到EventLoop。

  • ChannelRegistered

    Channel已成功注册到EventLoop,能够处理I/O操作。

  • ChannelActive

    Channel已连接到远程节点,进入活跃状态,可进行读写。

  • ChannelInactive

    Channel未连接到远程节点,处于非活跃状态。

状态变化会触发相应事件,由ChannelPipeline中的ChannelHandler处理。


ChannelHandler的生命周期

ChannelHandler接口定义了与Channel相关的关键操作:

  • handlerAdded

    当ChannelHandler被添加到ChannelPipeline时触发。

  • handlerRemoved

    当ChannelHandler从ChannelPipeline中移除时触发。

  • exceptionCaught

    发生异常时触发,如未捕获的入站异常。


ChannelHandler的子接口

  • ChannelInboundHandler

    处理入站数据和状态变化,如读取、状态改变等。

  • ChannelOutboundHandler

    处理出站操作,如绑定、连接、断开、写入数据等。


ChannelInboundHandler接口

ChannelInboundHandler接口定义了入站事件的处理方法:

  • channelRegistered

    Channel已注册到EventLoop。

  • channelUnregistered

    Channel已从EventLoop注销。

  • channelActive/Inactive

    Channel状态变化提示。

  • channelRead

    处理即将读取的数据。

  • channelReadComplete

    处理全部读取完成后的数据。

  • userEventTriggered

    及时触发用户定义事件。

  • channelWritabilityChanged

    写入状态改变提示。


ChannelOutboundHandler接口

ChannelOutboundHandler接口定义了出站操作的处理方法:

  • bindlegant、connect、disconnect、close、deregister、flush、write
    用于完成出站操作,并支持异步处理。

ChannelHandlerContext接口

ChannelHandlerContext为ChannelHandler和ChannelPipeline之间提供交互接口:

  • 事件触发:通过fire*方法分发给下一个相应的ChannelHandler。

  • I/O操作:提供统一的操作接口,便于处理入站和出站事件。


ChannelHandlerContext的使用

ChannelHandlerContext在事件传播时遵循ChannelPipeline的逻辑:

  • 事件传播:将事件从第一个适配的ChannelHandler开始,直到所有相关ChannelHandler处理完毕。

  • 操作完成:通过ChannelFuture返回,支持异步处理和错误处理。


适配器与资源管理

为了简化开发,Netty提供了ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter适配器类,这些适配器实现了基础逻辑,开发者可按照需求扩展。


##Client与Server代码示例

public class EchoClient {    ByteBuf buffer = Unpooled.copiedBuffer("Hello, World", Charset.forName("UTF-8"));    void connect(final int port, final String host) throws Exception {        EventLoopGroup group = new NioEventLoopGroup();        Bootstrap bootstrap = new Bootstrap();        bootstrap.group(group).channel(NioSocketChannel.class)                .option(ChannelOption.TCP_NODELAY, true)                .handler(new ChildChannelHandler());        final ChannelFuture future = bootstrap.connect(host, port).sync();        future.channel().closeFuture().sync();        try {            while (!future.isSuccess() && !future.channel().isClosed()) {                // 处理连接错误                future.addListener(new ChannelFutureListener() {                    @Override                    public void operationComplete(ChannelFuture future) throws Exception {                        if (!future.isSuccess()) {                            Throwable cause = future.cause();                            System.out.println("连接失败:" + cause.getMessage());                        }                    }                });                Thread.sleep(1);            }        } finally {            group.shutdownGracefully();        }    }    class ChildChannelHandler extends ChannelInitializer
{ @Override protected void initChannel(SocketChannel ch) throws Exception { System.out.println("客户端已连接到服务器"); ch.pipeline().write(buffer); ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("进入活跃状态"); ctx.write(buffer); } }); } }}

服务器端代码

public class EchoServer {    final ByteBuf buffer = Unpooled.copiedBuffer("Hello, World", Charset.forName("UTF-8"));    void bind(final int port) throws Exception {        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        ServerBootstrap bootstrap = new ServerBootstrap();        bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)                .option(ChannelOption.SO_BACKLOG, 100)                .childHandler(new ChildChannelHandler());        final ChannelFuture future = bootstrap.bind(port).sync();        future.channel().closeFuture().sync();        while (!future.isSuccess() && !future.channel().isClosed()) {            Thread.sleep(100);        }        closeResources(bossGroup, workerGroup);    }    class ChildChannelHandler extends ChannelInitializer
{ @Override protected void initChannel(SocketChannel ch) throws Exception { System.out.println("服务器已启动并接受连接"); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf incoming = (ByteBuf) msg; System.out.println("接收到数据:" + incoming UTF-8 decoding); ctx.writeAndFlush(buffer); } }); } } static void closeResources(EventLoopGroup... groups) { for (EventLoopGroup group : groups) { try { group.shutdownGracefully(); } catch (Exception e) { System.out.println("关闭事件循环组失败:" + e.getMessage()); } } }}

总结

ChannelHandler家族是Netty处理通信逻辑的核心机制,涵盖了从连接到数据传输的全过程。通过灵活的事件触发和异步模型,开发者可以构建高效、可扩展的联网应用。

转载地址:http://zemsz.baihongyu.com/

你可能感兴趣的文章
Objective-C实现QR正交三角分解法算法(附完整源码)
查看>>
Objective-C实现qubit measure量子位测量算法(附完整源码)
查看>>
Objective-C实现Queue队列算法(附完整源码)
查看>>
Objective-C实现Queue队列算法(附完整源码)
查看>>
Objective-C实现quick select快速选择算法(附完整源码)
查看>>
Objective-C实现rabin-karp算法(附完整源码)
查看>>
Objective-C实现radians弧度制算法(附完整源码)
查看>>
Objective-C实现radianToDegree弧度到度算法(附完整源码)
查看>>
Objective-C实现radix sort基数排序算法(附完整源码)
查看>>
Objective-C实现rail fence围栏密码算法(附完整源码)
查看>>
Objective-C实现rayleigh quotient瑞利商算法(附完整源码)
查看>>
Objective-C实现RC4加解密算法(附完整源码)
查看>>
Objective-C实现RC4加解密算法(附完整源码)
查看>>
Objective-C实现recursive bubble sor递归冒泡排序算法(附完整源码)
查看>>
Objective-C实现recursive insertion sort递归插入排序算法(附完整源码)
查看>>
Objective-C实现recursive quick sort递归快速排序算法(附完整源码)
查看>>
Objective-C实现RedBlackTree红黑树算法(附完整源码)
查看>>
Objective-C实现redis分布式锁(附完整源码)
查看>>
Objective-C实现regular-expression-matching正则表达式匹配算法(附完整源码)
查看>>
Objective-C实现relu线性整流函数算法(附完整源码)
查看>>