4 Java I/O:AIO和NIO中的Selector( 二 )


两次连接,打印了三条信息:说明selector的轮询在起作用(因为Set<SelectionKey>中包含了所有处于监听的SelectionKey) 。但是「接收就绪」监听事件仅执行了一次就再不响应 。如果感兴趣的话你可以把OP_READ、OP_WRITE这些事件也执行一下试试看 。
因为Selector是单线程轮询监听多个Channel,那么如果Selector(线程)之间需要传递数据,怎么办呢?——Pipe登场了 。Pipe就是一种用于Selector之间数据传递的「管道」 。
先来看个图:

4 Java I/O:AIO和NIO中的Selector

文章插图
可以清楚地看到它的工作方式 。
还是用代码来解释 。
/** * NIO中的Pipe * * @author xiangwang */public class TestPipe {public static void main(String args[]) throws IOException {// 打开管道Pipe pipe = Pipe.open();// 将Buffer数据写入到管道Pipe.SinkChannel sinkChannel = pipe.sink();ByteBuffer buffer = ByteBuffer.allocate(32);buffer.put("ByteBuffer".getBytes());// 切换到写模式buffer.flip();sinkChannel.write(buffer);// 从管道读取数据Pipe.SourceChannel sourceChannel = pipe.source();buffer = ByteBuffer.allocate(32);sourceChannel.read(buffer);System.out.println(new String(buffer.array()));// 关闭管道sinkChannel.close();sourceChannel.close();}}之前说过,同步指的按顺序一次完成一个任务,直到前一个任务完成并有了结果以后,才能再执行后面的任务 。而异步指的是前一个任务结束后,并不等待任务结果,而是继续执行后一个任务,在所有任务都「执行」完后,通过任务的回调函数去获得结果 。所以异步使得应用性能有了极大的提高 。为了更加生动地说明什么是异步,可以来做个实验:
4 Java I/O:AIO和NIO中的Selector

文章插图
通过调用CompletableFuture.supplyAsync()方法可以很明显地观察到,处于位置2的「这一步先执行」会最先显示,然后才执行位置1的代码 。而这就是异步的具体实现 。
NIO为了支持异步,升级到了NIO2,也就是AIO 。而AIO引入了新的异步Channel的概念,并提供了异步FileChannel和异步SocketChannel的实现 。AIO的异步SocketChannel是真正的异步非阻塞I/O 。通过代码可以更好地说明:
/** * AIO客户端 * * @author xiangwang */public class AioClient {public void start() throws IOException, InterruptedException {AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();if (channel.isOpen()) {// socket接收缓冲区recbuf大小channel.setOption(StandardSocketOptions.SO_RCVBUF, 128 * 1024);// socket发送缓冲区recbuf大小channel.setOption(StandardSocketOptions.SO_SNDBUF, 128 * 1024);// 保持长连接状态channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);// 连接到服务端channel.connect(new InetSocketAddress(8080), null,new AioClientHandler(channel));// 阻塞主进程for(;;) {TimeUnit.SECONDS.sleep(1);}} else {throw new RuntimeException("Channel not opened!");}}public static void main(String[] args) throws IOException, InterruptedException {new AioClient().start();}}/** * AIO客户端CompletionHandler * * @author xiangwang */public class AioClientHandler implements CompletionHandler<Void, AioClient> {private final AsynchronousSocketChannel channel;private final CharsetDecoder decoder = Charset.defaultCharset().newDecoder();private final BufferedReader input = new BufferedReader(new InputStreamReader(System.in));public AioClientHandler(AsynchronousSocketChannel channel) {this.channel = channel;}@Overridepublic void failed(Throwable exc, AioClient attachment) {throw new RuntimeException("channel not opened!");}@Overridepublic void completed(Void result, AioClient attachment) {System.out.println("send message to server: ");try {// 将输入内容写到bufferString line = input.readLine();channel.write(ByteBuffer.wrap(line.getBytes()));// 在操作系统中的Java本地方法native已经把数据写到了buffer中// 这里只需要一个缓冲区能接收就行了ByteBuffer buffer = ByteBuffer.allocate(1024);while (channel.read(buffer).get() != -1) {buffer.flip();System.out.println("from server: " + decoder.decode(buffer).toString());if (buffer.hasRemaining()) {buffer.compact();} else {buffer.clear();}// 将输入内容写到bufferline = input.readLine();channel.write(ByteBuffer.wrap(line.getBytes()));}} catch (IOException | InterruptedException | ExecutionException e) {e.printStackTrace();}}}

经验总结扩展阅读