• 86485

    文章

  • 757

    评论

  • 18

    友链

  • 最近新加了换肤功能,大家多来逛逛吧~~~~
  • 喜欢这个网站的朋友可以加一下QQ群,我们一起交流技术。

深入理解 NIO

撸了今年阿里、腾讯和美团的面试,我有一个重要发现.......>>

前言

基于BIO实现的Server端,当建立100个连接时会有多少个线程?如果基于NIO会有多少个线程?

BIO

所谓的BIO 就是最传统的 socket链接嘛,比如:

int port = 4343; //端口号
// Socket 服务器端(简单的发送信息)
Thread sThread = new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            ServerSocket serverSocket = new ServerSocket(port);
            while (true) {
                // 等待连接
                Socket socket = serverSocket.accept();
                Thread sHandlerThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try (PrintWriter printWriter = new PrintWriter(socket.getOutputStream())) {
                            printWriter.println("hello world!");
                            printWriter.flush();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                });
                sHandlerThread.start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
});
sThread.start();

// Socket 客户端(接收信息并打印)
try (Socket cSocket = new Socket(InetAddress.getLocalHost(), port)) {
    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(cSocket.getInputStream()));
    bufferedReader.lines().forEach(s -> System.out.println("客户端:" + s));
} catch (UnknownHostException e) {
    e.printStackTrace();
} catch (IOException e) {
    e.printStackTrace();
}

流程图大概如下

所以看上会有101个线程, 一个accept线程以及100个链接线程。

这不算复杂。

NIO

实际上我们不太会自己写NIO的代码,而是会使用netty这样优秀的开源库。

图例逻辑表示大概是这样

首先每个客户端都会对应一个SocketChannel的通道(一般通道通过buffer读写数据),然后这些socketchannel会被注册进入selector。 selector相当于一个管理器,他会轮询所有socketchannel,查询所有可用socketchannel,然后去处理这些 socketchannel.

服务端

public class NIOServerSocket {

    //存储SelectionKey的队列
    private static List<SelectionKey> writeQueue = new ArrayList<SelectionKey>();
    private static Selector selector = null;

    //添加SelectionKey到队列
    public static void addWriteQueue(SelectionKey key){
        synchronized (writeQueue) {
            writeQueue.add(key);
            //唤醒主线程
            selector.wakeup();
        }
    }

    public static void main(String[] args) throws IOException {

        // 1.创建ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 2.绑定端口
        serverSocketChannel.bind(new InetSocketAddress(60000));
        // 3.设置为非阻塞 nio只能使用非阻塞模式
        serverSocketChannel.configureBlocking(false);
        // 4.创建通道选择器
        selector = Selector.open();
        /*
         * 5.注册事件类型
         *
         *  sel:通道选择器
         *  ops:事件类型 ==>SelectionKey:包装类,包含事件类型和通道本身。四个常量类型表示四种事件类型
         *  SelectionKey.OP_ACCEPT 获取报文      SelectionKey.OP_CONNECT 连接
         *  SelectionKey.OP_READ 读           SelectionKey.OP_WRITE 写
         */
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            System.out.println("服务器端:正在监听60000端口");
            // 6.获取可用I/O通道,获得有多少可用的通道
            int num = selector.select();
            if (num > 0) { // 判断是否存在可用的通道
                // 获得所有的keys
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                // 使用iterator遍历所有的keys
                Iterator<SelectionKey> iterator = selectedKeys.iterator();
                // 迭代遍历当前I/O通道
                while (iterator.hasNext()) {
                    // 获得当前key
                    SelectionKey key = iterator.next();
                    // 调用iterator的remove()方法,并不是移除当前I/O通道,标识当前I/O通道已经处理。
                    iterator.remove();
                    // 判断事件类型,做对应的处理
                    if (key.isAcceptable()) {
                        ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
                        SocketChannel socketChannel = ssChannel.accept();
 
                        System.out.println("处理请求:"+ socketChannel.getRemoteAddress());
                        // 获取客户端的数据
                        // 设置非阻塞状态
                        socketChannel.configureBlocking(false);
                        // 注册到selector(通道选择器)
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        System.out.println("读事件");
                        //取消读事件的监控
                        key.cancel();
                        //调用读操作工具类
                        NIOHandler.read(key);
                    } else if (key.isWritable()) {
                        System.out.println("写事件");
                        //取消读事件的监控
                        key.cancel();
                        //调用写操作工具类
                        NIOHandler.write(key);
                    }
                }
            }else{
                synchronized (writeQueue) {
                    while(writeQueue.size() > 0){
                        SelectionKey key = writeQueue.remove(0);
                        //注册写事件
                        SocketChannel channel = (SocketChannel) key.channel();
                        Object attachment = key.attachment();
                        channel.register(selector, SelectionKey.OP_WRITE,attachment);
                    }
                }
            }
        }
    }
}

消息处理器,这里会使用多线程来处理消息,线程数量一般和服务器cpu核心数相关,主要目的是为了发挥cpu所有核心的性能。

public class NIOHandler {
 
    //构造线程池
    private static ExecutorService executorService  = Executors.newFixedThreadPool(10);
 
    public static void read(final SelectionKey key){
        //获得线程并执行
        executorService.submit(new Runnable() {
 
            @Override
            public void run() {
                try {
                    SocketChannel readChannel = (SocketChannel) key.channel();
                    // I/O读数据操作
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    int len = 0;
                    while (true) {
                        buffer.clear();
                        len = readChannel.read(buffer);
                        if (len == -1) break;
                        buffer.flip();
                        while (buffer.hasRemaining()) {
                            baos.write(buffer.get());
                        }
                    }
                    System.out.println("服务器端接收到的数据:"+ new String(baos.toByteArray()));
                    //将数据添加到key中
                    key.attach(baos);
                    //将注册写操作添加到队列中
                    NIOServerSocket.addWriteQueue(key);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }
 
    public static void write(final SelectionKey key) {
        //拿到线程并执行
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    // 写操作
                    SocketChannel writeChannel = (SocketChannel) key.channel();
                    //拿到客户端传递的数据
                    ByteArrayOutputStream attachment = (ByteArrayOutputStream)key.attachment();
                    System.out.println("客户端发送来的数据:"+new String(attachment.toByteArray()));
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    String message = "你好,我是服务器!!";
                    buffer.put(message.getBytes());
                    buffer.flip();
                    writeChannel.write(buffer);
                    writeChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

客户端

public class NIOClientSocket {
 
    public static void main(String[] args) throws IOException {
        //使用线程模拟用户 并发访问
        for (int i = 0; i < 1; i++) {
            new Thread(){
                public void run() {
                    try {
                        //1.创建SocketChannel
                        SocketChannel socketChannel=SocketChannel.open();
                        //2.连接服务器
                        socketChannel.connect(new InetSocketAddress("localhost",60000));
                        //写数据
                        String msg="我是客户端"+Thread.currentThread().getId();
                        ByteBuffer buffer=ByteBuffer.allocate(1024);
                        buffer.put(msg.getBytes());
                        buffer.flip();
                        socketChannel.write(buffer);
                        socketChannel.shutdownOutput();
                        //读数据
                        ByteArrayOutputStream bos = new ByteArrayOutputStream();
                        int len = 0;
                        while (true) {
                            buffer.clear();
                            len = socketChannel.read(buffer);
                            if (len == -1)
                                break;
                            buffer.flip();
                            while (buffer.hasRemaining()) {
                                bos.write(buffer.get());
                            }
                        }
                        System.out.println("客户端收到:"+new String(bos.toByteArray()));
                        socketChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                };
            }.start();
        }
    }
}

以上代码中有一段很奇怪的代码,也就是writequeue,这么写的主要原因是

OP_WRITE事件的就绪条件并不是发生在调用channel的write方法之后,而是在当底层缓冲区有空闲空间的情况下。因为写缓冲区在绝大部分时候都是有空闲空间的,所以如果你注册了写事件,这会使得写事件一直处于就就绪,选择处理现场就会一直占用着CPU资源。所以,只有当你确实有数据要写时再注册写操作,并在写完以后马上取消注册。 其实,在大部分情况下,我们直接调用channel的write方法写数据就好了,没必要都用OP_WRITE事件。那么OP_WRITE事件主要是在什么情况下使用的了?
其实OP_WRITE事件主要是在发送缓冲区空间满的情况下使用的。如:

while (buffer.hasRemaining()) {
     int len = socketChannel.write(buffer);   
     if (len == 0) {
          selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);
          selector.wakeup();
          break;
     }
}

当buffer还有数据,但缓冲区已经满的情况下,socketChannel.write(buffer)会返回已经写出去的字节数,此时为0。那么这个时候我们就需要注册OP_WRITE事件,这样当缓冲区又有空闲空间的时候就会触发OP_WRITE事件,这是我们就可以继续将没写完的数据继续写出了。
而且在写完后,一定要记得将OP_WRITE事件注销:
selectionKey.interestOps(sk.interestOps() & ~SelectionKey.OP_WRITE);
注意,这里在修改了interest之后调用了wakeup();方法是为了唤醒被堵塞的selector方法,这样当while中判断selector返回的是0时,会再次调用selector.select()。而selectionKey的interest是在每次selector.select()操作的时候注册到系统进行监听的,所以在selector.select()调用之后修改的interest需要在下一次selector.select()调用才会生效。

所以对于NIO而言,100个链接并不会有100个线程,而是会有cpu核数+1个线程,或者cpu 核数x2 +1这样

参考

http://www.imooc.com/article/265871 https://blog.csdn.net/zxcc1314/article/details/80918665 https://www.jianshu.com/p/1af407c043cb


695856371Web网页设计师②群 | 喜欢本站的朋友可以收藏本站,或者加入我们大家一起来交流技术!

欢迎来到梁钟霖个人博客网站。本个人博客网站提供最新的站长新闻,各种互联网资讯。 还提供个人博客模板,最新最全的java教程,java面试题。在此我将尽我最大所能将此个人博客网站做的最好! 谢谢大家,愿大家一起进步!

转载原创文章请注明出处,转载至: 梁钟霖个人博客www.liangzl.com

0条评论

Loading...


发表评论

电子邮件地址不会被公开。 必填项已用*标注

自定义皮肤
注册梁钟霖个人博客