Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Event-Loop机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
生产者使用netty 来通过网络获取数据,放入到BlockingQueue中,并且同时使用 protobuf封装数据返回给客户端,消费者从BlockingQueue取得数据。
server 代码
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChildChannelHandler(dataQueueCache));
// Start the server.
ChannelFuture f = b.bind(port).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
测试发送过来的数据是字符串的类型,需要添加一个字符串的解码器,在ChildChannelHandler 类中
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder(Charset.forName("utf-8")));
ch.pipeline().addLast(new MyHandler(dataQueueCache));
}
MyHandler处理客户端发送过来的数据,在channelRead方法中。同时使用protobuf 封装数据,并且进行数据的存储放在BlockingQueue中
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String ip = ctx.channel().localAddress().toString();
String data = msg.toString();
DataDO dataDO = new DataDO();
dataDO.setIp(ip);
dataDO.setData(data);
this.dataQueueCache.putData(dataDO);
//使用protobuf 返回数据
Msg.msgInfo msgInfo = Msg.msgInfo.newBuilder().setData("data").setID(1).build();
ByteBuf byteBuf = Unpooled.buffer(msgInfo.toByteArray().length);
byteBuf.writeBytes(msgInfo.toByteArray());
ctx.writeAndFlush(byteBuf);
}
消费者就是只负责从BlockingQueue去出数据进行处理
while (true){
if (this.dataQueueCache.getCount() > 0){
DataDO dataDO = dataQueueCache.pollData();
//do something
System.out.println("data:"+ dataDO.toString());
}
}