为了让你想象 Transport 如何工作,我会从一个简单的应用程序开始,这个应用程序什么都不做,只是接受客户端连接并发送“Hi!”字符串消息到客户端,发送完了就断开连接。
没有用 Netty 实现 I/O 和 NIO
我们将不用 Netty 实现只用 JDK API 来实现 I/O 和 NIO。下面这个例子,是使用阻塞 IO 实现的例子:
Listing 4.1 Blocking networking without Netty
public class PlainOioServer {
public void serve(int port) throws IOException {
final ServerSocket socket = new ServerSocket(port); //1
try {
for (;;) {
final Socket clientSocket = socket.accept(); //2
System.out.println("Accepted connection from " + clientSocket);
new Thread(new Runnable() { //3
@Override
public void run() {
OutputStream out;
try {
out = clientSocket.getOutputStream();
out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8"))); //4
out.flush();
clientSocket.close(); //5
} catch (IOException e) {
e.printStackTrace();
try {
clientSocket.close();
} catch (IOException ex) {
// ignore on close
}
}
}
}).start(); //6
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
- 绑定服务器到指定的端口。
- 接受一个连接。
- 创建一个新的线程来处理连接。
- 将消息发送到连接的客户端。
- 一旦消息被写入和刷新时就 关闭连接。
- 启动线程。
上面的方式可以工作正常,但是这种阻塞模式在大连接数的情况就会有很严重的问题,如客户端连接超时,服务器响应严重延迟,性能无法扩展。为了解决这种情况,我们可以使用异步网络处理所有的并发连接,但问题在于 NIO 和 OIO 的 API 是完全不同的,所以一个用OIO开发的网络应用程序想要使用NIO重构代码几乎是重新开发。
下面代码是使用 NIO 实现的例子:
Listing 4.2 Asynchronous networking without Netty
public class PlainNioServer {
public void serve(int port) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket ss = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
ss.bind(address); //1
Selector selector = Selector.open(); //2
serverChannel.register(selector, SelectionKey.OP_ACCEPT); //3
final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
for (;;) {
try {
selector.select(); //4
} catch (IOException ex) {
ex.printStackTrace();
// handle exception
break;
}
Set<SelectionKey> readyKeys = selector.selectedKeys(); //5
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
if (key.isAcceptable()) { //6
ServerSocketChannel server =
(ServerSocketChannel)key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_WRITE |
SelectionKey.OP_READ, msg.duplicate()); //7
System.out.println(
"Accepted connection from " + client);
}
if (key.isWritable()) { //8
SocketChannel client =
(SocketChannel)key.channel();
ByteBuffer buffer =
(ByteBuffer)key.attachment();
while (buffer.hasRemaining()) {
if (client.write(buffer) == 0) { //9
break;
}
}
client.close(); //10
}
} catch (IOException ex) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
// 在关闭时忽略
}
}
}
}
}
}
- 绑定服务器到制定端口
- 打开 selector 处理 channel
- 注册 ServerSocket 到 ServerSocket ,并指定这是专门意接受 连接。
- 等待新的事件来处理。这将阻塞,直到一个事件是传入。
- 从收到的所有事件中 获取 SelectionKey 实例。
- 检查该事件是一个新的连接准备好接受。
- 接受客户端,并用 selector 进行注册。
- 检查 socket 是否准备好写数据。
- 将数据写入到所连接的客户端。如果网络饱和,连接是可写的,那么这个循环将写入数据,直到该缓冲区是空的。
- 关闭连接。
如你所见,即使它们实现的功能是一样,但是代码完全不同。下面我们将用Netty 来实现相同的功能。
采用 Netty 实现 I/O 和 NIO
下面代码是使用Netty作为网络框架编写的一个阻塞 IO 例子:
Listing 4.3 Blocking networking with Netty
public class NettyOioServer {
public void server(int port) throws Exception {
final ByteBuf buf = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
EventLoopGroup group = new OioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); //1
b.group(group) //2
.channel(OioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {//3
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //4
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//5
}
});
}
});
ChannelFuture f = b.bind().sync(); //6
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync(); //7
}
}
}
- 创建一个 ServerBootstrap
- 使用 OioEventLoopGroup 允许阻塞模式(Old-IO)
- 指定 ChannelInitializer 将给每个接受的连接调用
- 添加的 ChannelHandler 拦截事件,并允许他们作出反应
- 写信息到客户端,并添加 ChannelFutureListener 当一旦消息写入就关闭连接
- 绑定服务器来接受连接
- 释放所有资源
下面代码是使用 Netty NIO 实现。
Netty NIO 版本
下面是 Netty NIO 的代码,只是改变了一行代码,就从 OIO 传输 切换到了 NIO。
Listing 4.4 Asynchronous networking with Netty
public class NettyNioServer {
public void server(int port) throws Exception {
final ByteBuf buf = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
NioEventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); //1
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() { //3
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //4
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(buf.duplicate()) //5
.addListener(ChannelFutureListener.CLOSE);
}
});
}
});
ChannelFuture f = b.bind().sync(); //6
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync(); //7
}
}
}
- 创建一个 ServerBootstrap
- 使用 OioEventLoopGroup 允许阻塞模式(Old-IO)
- 指定 ChannelInitializer 将给每个接受的连接调用
- 添加的 ChannelInboundHandlerAdapter() 接收事件并进行处理
- 写信息到客户端,并添加 ChannelFutureListener 当一旦消息写入就关闭连接
- 绑定服务器来接受连接
- 释放所有资源
因为 Netty 使用相同的 API 来实现每个传输,它并不关心你使用什么来实现。Netty 通过操作接口Channel 、ChannelPipeline 和 ChannelHandler来实现。
现在你了解到了用 基于 Netty 传输的好处。下面就来看下传输的 API.