Kymotz's Blog

Easy Netty 系列(二):Echo程序详解

#Netty

摘要:这是一个简单的 echo 程序,它将接收你发过来的数据然后返回给你,通过这个 echo 程序能了解 netty 程序如何进行引导、基本组件情况。

程序清单

  • EchoServer
  • EchoServerHandler
  • EchoClient
  • EchoClientHandler

代码

EchoServer

 1import io.netty.bootstrap.ServerBootstrap;
 2import io.netty.channel.*;
 3import io.netty.channel.nio.NioEventLoopGroup;
 4import io.netty.channel.socket.SocketChannel;
 5import io.netty.channel.socket.nio.NioServerSocketChannel;
 6import io.netty.handler.logging.LogLevel;
 7import io.netty.handler.logging.LoggingHandler;
 8
 9import java.net.InetSocketAddress;
10
11/**
12 * echo server,接收打印client发送过来的数据,并把数据返回给client
13 */
14public final class EchoServer {
15
16    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
17
18    public static void main(String[] args) throws Exception {
19
20        // 配置服务器
21        // 主event loop,负责接受(accept)socket连接
22        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
23        // 从event loop,负责处理socket的事件
24        EventLoopGroup workerGroup = new NioEventLoopGroup();
25        // server 的channel处理器
26        final EchoServerHandler serverHandler = new EchoServerHandler();
27        try {
28            // Server引导器(配置器)
29            ServerBootstrap b = new ServerBootstrap();
30            b.group(bossGroup, workerGroup)
31                    .channel(NioServerSocketChannel.class)
32                    // 设置ip地址和端口,server默认使用本地ip,也可以通过bind()来设置
33                    .localAddress(new InetSocketAddress(PORT))
34                    // 配置网络传输参数,例如缓存大小
35                    .option(ChannelOption.SO_BACKLOG, 100)
36                    // 配置日志打印
37                    .handler(new LoggingHandler(LogLevel.INFO))
38                    // netty核心组件,pipeline 处理器
39                    .childHandler(new ChannelInitializer<SocketChannel>() {
40                        @Override
41                        public void initChannel(SocketChannel ch) throws Exception {
42                            // 通过channel获取管道对象
43                            ChannelPipeline pipeline = ch.pipeline();
44                            // 通过管道挂在handler
45                            pipeline.addLast(serverHandler);
46                        }
47                    });
48
49            // 绑定设置的断开(PORT),并阻塞(sync)到绑定完成,绑定完成后就开始监听close信号
50            ChannelFuture f = b.bind().sync();
51
52            // 阻塞到接收client发过来关闭(close)信号
53            f.channel().closeFuture().sync();
54            // 当client发过来close信号后,才会执行这里
55        } finally {
56            // 关闭所有event loop的线程,并关闭内部的线程
57            bossGroup.shutdownGracefully();
58            workerGroup.shutdownGracefully();
59        }
60    }
61
62}

EchoServerHandler

 1import io.netty.buffer.ByteBuf;
 2import io.netty.buffer.Unpooled;
 3import io.netty.channel.ChannelHandler.Sharable;
 4import io.netty.channel.ChannelHandlerContext;
 5import io.netty.channel.ChannelInboundHandlerAdapter;
 6import java.nio.charset.StandardCharsets;
 7
 8/**
 9 * Handler implementation for the echo server.
10 */
11@Sharable
12public class EchoServerHandler extends ChannelInboundHandlerAdapter {
13
14    /**  第一次建立连接调用 */
15    @Override
16    public void channelActive(ChannelHandlerContext ctx) throws Exception {
17        final char[] chars = "hello client, I am Server.".toCharArray();
18        // 建立连接想client发送一句话
19        ctx.pipeline().writeAndFlush(Unpooled.copiedBuffer(chars, StandardCharsets.UTF_8));
20    }
21
22    /** 客户端发送过来数据调用 */
23    @Override
24    public void channelRead(ChannelHandlerContext ctx, Object msg) {
25        final ByteBuf buf = (ByteBuf) msg;
26        String recv = "接收:" + buf.toString(StandardCharsets.UTF_8);
27        if(recv.indexOf("exit") != -1){
28            ctx.close();
29            return;
30        }
31        System.out.print(recv);
32        // 将数据传回客户端
33        ctx.write(Unpooled.copiedBuffer(recv.getBytes(StandardCharsets.UTF_8)));
34    }
35
36    /** 读取完毕,完成一次读事件 */
37    @Override
38    public void channelReadComplete(ChannelHandlerContext ctx) {
39        // flush - 冲刷数据,flush一次便会触发一次或多次client的read事件,反之server也是
40        ctx.flush();
41    }
42
43    /** 捕获异常 */
44    @Override
45    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
46        // 引发异常时关闭连接会
47        cause.printStackTrace();
48        ctx.close();
49    }
50
51}

EchoClient

 1import io.netty.bootstrap.Bootstrap;
 2import io.netty.buffer.Unpooled;
 3import io.netty.channel.*;
 4import io.netty.channel.nio.NioEventLoopGroup;
 5import io.netty.channel.socket.SocketChannel;
 6import io.netty.channel.socket.nio.NioSocketChannel;
 7
 8import java.nio.charset.StandardCharsets;
 9import java.util.Scanner;
10
11/**
12 * 启动client,可以想server发送消息。server的console可以查看client发送的消息。
13 */
14public final class EchoClient {
15
16    static final String HOST = System.getProperty("host", "127.0.0.1");
17    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
18
19    public static void main(String[] args) throws Exception {
20        // 配置客户端
21        // client工作线程,每一个线程将和一个EventLoop绑定
22        EventLoopGroup group = new NioEventLoopGroup();
23        try {
24            // 客户端引导其
25            Bootstrap b = new Bootstrap();
26            //配置client启动参数
27            b.group(group)
28                    // 配置数据通道,不同的数据通道决定了IO的方式,例如Nio开头的是非阻塞IO、Oio的是阻塞IO
29                    .channel(NioSocketChannel.class)
30                    // 网络传输的参数,例如网络缓存缓存的大小、是否延迟
31                    .option(ChannelOption.TCP_NODELAY, true)
32                    // 入站、出站处理器
33                    // initializer是在入站只执行一次的处理器
34                    // 它用来与server建立连接、并在pipeline上挂载编解码处理器、自定义处理器
35                    .handler(new ChannelInitializer<SocketChannel>() {
36                        @Override
37                        public void initChannel(SocketChannel ch) throws Exception {
38                            ChannelPipeline p = ch.pipeline();
39                            //p.addLast(new LoggingHandler(LogLevel.INFO));
40                            p.addLast(new EchoClientHandler());
41                        }
42                    });
43
44            // 与server建立连接(connect),并阻塞(sync)到连接建立
45            ChannelFuture f = b.connect(HOST, PORT).sync();
46            // 获取建立的连接
47            final Channel clientChannel = f.channel();
48
49            System.out.println("input 'exit' to EXIT.");
50            Scanner sc = new Scanner(System.in);
51            String str;
52            // 读取并向server发送数据
53            while (!(str = sc.next()).startsWith("exit")) {
54                clientChannel.writeAndFlush(Unpooled.copiedBuffer(str.toCharArray(), StandardCharsets.UTF_8));
55            }
56            // 想server发送断开连接的信号
57            clientChannel.close();
58            // 获取关闭连接后的结果(future),并阻塞(sync)到接收到server发过来的断开连接信号
59            clientChannel.closeFuture().sync();
60        } finally {
61            // 关闭所有的EventLoop,并终止内部的线程
62            group.shutdownGracefully();
63        }
64    }
65}

EchoClientHandler

 1import io.netty.buffer.ByteBuf;
 2import io.netty.buffer.Unpooled;
 3import io.netty.channel.ChannelHandlerContext;
 4import io.netty.channel.ChannelInboundHandlerAdapter;
 5
 6import java.nio.charset.StandardCharsets;
 7
 8/**
 9 * client 端 handler
10 */
11public class EchoClientHandler extends ChannelInboundHandlerAdapter {
12
13    private final ByteBuf firstMessage;
14
15    public EchoClientHandler() {
16        firstMessage = Unpooled.copiedBuffer("start data transfer.\n".toCharArray(), StandardCharsets.UTF_8);
17    }
18
19    /**
20     * 建立连接向server发送一个消息
21     */
22    @Override
23    public void channelActive(ChannelHandlerContext ctx) {
24        ctx.writeAndFlush(firstMessage);
25    }
26
27    /**
28     * client的读事件触发会调用此方法
29     */
30    @Override
31    public void channelRead(ChannelHandlerContext ctx, Object msg) {
32        System.out.println("client recv : " + ((ByteBuf) msg).toString(StandardCharsets.UTF_8));
33    }
34
35    /**
36     * 读取完毕向网络发送数据,flush会触发server的read事件
37     */
38    @Override
39    public void channelReadComplete(ChannelHandlerContext ctx) {
40        ctx.flush();
41    }
42
43    @Override
44    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
45        cause.printStackTrace();
46        ctx.close();
47    }
48
49}

通过注释能了解 netty 运行的组件和开发模式,但可能还会有些疑问,看下面的问题。

问题

1、为什么 Server 和 Client 使用不同的引导(BootstrapServerBootstrap

实际上 server 和 client 有很大的不同,client 配置只需要一个连接一个线程就够了,而 server 需要配置多个;channel 也有区别,server 的 channel 一方面与 client 建立连接(accept),另一方面处理 client channel 的读写事件。因为功能的差异,所以配置存在差异,就导致需要不同的引导器配置。

2、为什么 server 有两个线程组(EventLoopGroup),而 client 只有一个

server 是可以只用一个线程组。这是 netty 线程模型的缘故,netty 使用的是 Reactor 线程模型。

3、Future

Future是异步编程常见词汇,它代表了一个结果,这个结果会在未来的某个时刻发生,而功过Future能拿到这个结果。常见的是ChannelFuture,它表示了一个操作(如bind、监听关闭closeFuture)的结果。

4、sync() 方法

这个方法是一个同步方法,会阻塞上个操作,直到满足条件,例如:closeFuture().sync() 会阻塞到接收到关闭连接到信号到来。


Top↑
comments powered by Disqus