netty与protobuf生产者和消费者

作者 chauncy 日期 2016-12-09
netty与protobuf生产者和消费者

Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Event-Loop机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

生产者使用netty 来通过网络获取数据,放入到BlockingQueue中,并且同时使用 protobuf封装数据返回给客户端,消费者从BlockingQueue取得数据。

server 代码

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 100)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChildChannelHandler(dataQueueCache));

    // Start the server.
    ChannelFuture f = b.bind(port).sync();

    // Wait until the server socket is closed.
    f.channel().closeFuture().sync();

测试发送过来的数据是字符串的类型,需要添加一个字符串的解码器,在ChildChannelHandler 类中

protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(new StringDecoder(Charset.forName("utf-8")));
    ch.pipeline().addLast(new MyHandler(dataQueueCache));
}

MyHandler处理客户端发送过来的数据,在channelRead方法中。同时使用protobuf 封装数据,并且进行数据的存储放在BlockingQueue中

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       String ip = ctx.channel().localAddress().toString();
       String data = msg.toString();
       DataDO dataDO = new DataDO();
       dataDO.setIp(ip);
       dataDO.setData(data);
       this.dataQueueCache.putData(dataDO);

       //使用protobuf 返回数据
       Msg.msgInfo msgInfo = Msg.msgInfo.newBuilder().setData("data").setID(1).build();
       ByteBuf byteBuf = Unpooled.buffer(msgInfo.toByteArray().length);
       byteBuf.writeBytes(msgInfo.toByteArray());
       ctx.writeAndFlush(byteBuf);


   }

消费者就是只负责从BlockingQueue去出数据进行处理

while (true){
     if (this.dataQueueCache.getCount() > 0){
           DataDO dataDO = dataQueueCache.pollData();
           //do something
           System.out.println("data:"+ dataDO.toString());

          }
       }

代码地址