博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
JAVA NIO 实例
阅读量:6121 次
发布时间:2019-06-21

本文共 17060 字,大约阅读时间需要 56 分钟。

hot3.png

        我们都知道TCP是面向连接的传输层协议,一个socket必定会有绑定一个连接,在普通的BIO(阻塞式IO)中,需要有三次握手,然后一般的socket编程就是这样的形式。

Socket服务器端流程如下:加载 -> 创建监听的套接字 -> 绑定套接字 -> 监听套接字 -> 处理客户端相关请求。

Socket客户端:同样需要先加载套接字,然后创建套接字,不过之后不用绑定和监听了,而是直接连接服务器,发送相关请求。

        他们一直就占用这个连接,如果有信息发送,那么就响应,否则就一直阻塞着。如果有多连接,那么就要使用多线程,一个线程处理一个连接,在连接还少的情况下,是允许的,但如果同时处理的连接过多比如说1000,那么在win平台上就会遇到瓶颈了如果2000,那么在linux上就遇到瓶颈了,因为在不同的平台上每一个进程能够创建的线程数是有限度的,并且过多的线程必将会引起系统对线程调度的效率问题,再怎么也要保证线程优先队列,阻塞队列;假设一千个线程,一个线程最少一兆的栈大小,对内存也是一个很大的消耗。

        总之阻塞式的IO是:一连接<一一一>一线程 

         然后出现了NIO,在java1.4引入了java.nio包,java new I/O。引入了操作系统中常用的缓冲区和通道等概念。     

        缓冲区: 在操作系统中缓冲区是为了解决CPU的计算速度和外设输入输出速度不匹配的问题,因为外设太慢了,如果没有缓冲区,那么CPU在外设输入的时候就要一直等着,就会造成CPU处理效率的低下,引入了缓冲之后,外设直接把数据放到缓冲中,当数据传输完成之后,给CPU一个中断信号,通知CPU:“我的数据传完了,你自己从缓冲里面去取吧”。如果是输出也是一样的道理。

       通道: 那么通道用来做什么呢?其实从他的名字就可以看出,它就是一条通道,您想传递出去的数据被放置在缓冲区中,然后缓冲区中怎么从哪里传输出去呢?或者外设怎么把数据传输到缓冲中呢?这里就要用到通道。它可以进一步的减少CPU的干预,同时更有效率的提高整个系统的资源利用率,例如当CPU要完成一组相关的读操作时,只需要向I/O通道发送一条指令,以给出其要执行的通道程序的首地址和要访问的设备,通道执行通道程序便可以完成CPU指定的I/O任务。

      选择器: 另外一项创新是选择器,当我们使用通道的时候也许通道没有准备好,或者有了新的请求过来,或者线程遇到了阻塞,而选择器恰恰可以帮助CPU了解到这些信息,但前提是将这个通道注册到了这个选择器。

        在非阻塞式IO中实现的是:一请求<一一一>一线程

        下面这个例子实现了一个线程监听两个ServerSocket,只有等到请求的时候才会有处理。

Server

package cn.vicky.channel;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.channels.spi.SelectorProvider;import java.util.Iterator;/**    * TCP/IP的NIO非阻塞方式   * 服务器端   * */public class Server implements Runnable {    //第一个端口       private Integer port1 = 8099;    //第二个端口       private Integer port2 = 9099;    //第一个服务器通道 服务A       private ServerSocketChannel serversocket1;    //第二个服务器通道 服务B       private ServerSocketChannel serversocket2;    //连接1       private SocketChannel clientchannel1;    //连接2       private SocketChannel clientchannel2;    //选择器,主要用来监控各个通道的事件       private Selector selector;        //缓冲区       private ByteBuffer buf = ByteBuffer.allocate(512);        public Server() {        init();    }    /**       * 这个method的作用     * 1:是初始化选择器       * 2:打开两个通道       * 3:给通道上绑定一个socket       * 4:将选择器注册到通道上       * */    public void init() {        try {            //创建选择器               this.selector = SelectorProvider.provider().openSelector();            //打开第一个服务器通道               this.serversocket1 = ServerSocketChannel.open();            //告诉程序现在不是阻塞方式的               this.serversocket1.configureBlocking(false);            //获取现在与该通道关联的套接字               this.serversocket1.socket().bind(new InetSocketAddress("localhost", this.port1));            //将选择器注册到通道上,返回一个选择键               //OP_ACCEPT用于套接字接受操作的操作集位               this.serversocket1.register(this.selector, SelectionKey.OP_ACCEPT);            //然后初始化第二个服务端               this.serversocket2 = ServerSocketChannel.open();            this.serversocket2.configureBlocking(false);            this.serversocket2.socket().bind(new InetSocketAddress("localhost", this.port2));            this.serversocket2.register(this.selector, SelectionKey.OP_ACCEPT);        } catch (Exception e) {            e.printStackTrace();        }    }    /**       * 这个方法是连接       * 客户端连接服务器       * @throws IOException        * */    public void accept(SelectionKey key) throws IOException {        ServerSocketChannel server = (ServerSocketChannel) key.channel();        if (server.equals(serversocket1)) {            clientchannel1 = server.accept();            clientchannel1.configureBlocking(false);            //OP_READ用于读取操作的操作集位               clientchannel1.register(this.selector, SelectionKey.OP_READ);        } else {            clientchannel2 = server.accept();            clientchannel2.configureBlocking(false);            //OP_READ用于读取操作的操作集位               clientchannel2.register(this.selector, SelectionKey.OP_READ);        }    }    /**       * 从通道中读取数据       * 并且判断是给那个服务通道的       * @throws IOException        * */    public void read(SelectionKey key) throws IOException {        this.buf.clear();        //通过选择键来找到之前注册的通道           //但是这里注册的是ServerSocketChannel为什么会返回一个SocketChannel??           SocketChannel channel = (SocketChannel) key.channel();        //从通道里面读取数据到缓冲区并返回读取字节数           int count = channel.read(this.buf);        if (count == -1) {            //取消这个通道的注册               key.channel().close();            key.cancel();            return;        }        //将数据从缓冲区中拿出来           String input = new String(this.buf.array()).trim();        //那么现在判断是连接的那种服务           if (channel.equals(this.clientchannel1)) {            System.out.println("欢迎您使用服务A");            System.out.println("您的输入为:" + input);        } else {            System.out.println("欢迎您使用服务B");            System.out.println("您的输入为:" + input);        }    }    @Override    public void run() {        while (true) {            try {                System.out.println("running ... ");                //选择一组键,其相应的通道已为 I/O 操作准备就绪。                   this.selector.select();                //返回此选择器的已选择键集                   //public abstract Set
 selectedKeys()                   Iterator selectorKeys = this.selector.selectedKeys().iterator();                while (selectorKeys.hasNext()) {                    System.out.println("running2 ... ");                    //这里找到当前的选择键                       SelectionKey key = (SelectionKey) selectorKeys.next();                    //然后将它从返回键队列中删除                       selectorKeys.remove();                    if (!key.isValid()) { // 选择键无效                        continue;                    }                    if (key.isAcceptable()) {                        //如果遇到请求那么就响应                           this.accept(key);                    } else if (key.isReadable()) {                        //读取客户端的数据                           this.read(key);                    }                }            } catch (Exception e) {                e.printStackTrace();            }        }    }    public static void main(String[] args) {        Server server = new Server();        Thread thread = new Thread(server);        thread.start();    }}

Client

package cn.vicky.channel;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SocketChannel;import java.net.InetAddress;/**   * TCP/IP的NIO非阻塞方式   * 客户端   * */public class Client {    //创建缓冲区       private ByteBuffer buffer = ByteBuffer.allocate(512);    //访问服务器      public void query(String host, int port) throws IOException {        InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), port);        SocketChannel socket = null;        byte[] bytes = new byte[512];        while (true) {            try {                System.in.read(bytes);                socket = SocketChannel.open();                socket.connect(address);                buffer.clear();                buffer.put(bytes);                buffer.flip();                socket.write(buffer);                buffer.clear();            } catch (Exception e) {                e.printStackTrace();            } finally {                if (socket != null) {                    socket.close();                }            }        }    }    public static void main(String[] args) throws IOException {        new Client().query("localhost", 8099);    }}

以上的服务端一个线程监听两个服务,整个服务端只有一个阻塞的方法:

//选择一组键,其相应的通道已为 I/O 操作准备就绪。  

this.selector.select();  

 

当客户请求服务器的时候,那么这造成了TCP没有面向连接的假象,其实至少在传输数据的时候是连接的,只是在一次I/O请求结束之后服务器端就把连接给断开,继而继续去处理更多的请求。而在客户端,可以看到也是遇到一次请求的时候就connect服务端一次。所以TCP还是面向连接的。

     现在终于知道了为什么叫非阻塞式IO了,大概就是这个意思。

Java NIO 主要是Channel, SelectionKey, Selector 三个类之间的关系,下面的例子就是演示如果使用NIO来处理请求的

示例1:

import java.io.IOException;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;public class NIOServer {  /*标识数字*/ private  int flag = 0; /*缓冲区大小*/ private  int BLOCK = 4096; /*接受数据缓冲区*/ private  ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK); /*发送数据缓冲区*/ private  ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK); private  Selector selector; public NIOServer(int port) throws IOException {  // 打开服务器套接字通道  ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();  // 服务器配置为非阻塞  serverSocketChannel.configureBlocking(false);  // 检索与此通道关联的服务器套接字  ServerSocket serverSocket = serverSocketChannel.socket();  // 进行服务的绑定  serverSocket.bind(new InetSocketAddress(port));  // 通过open()方法找到Selector  selector = Selector.open();  // 注册到selector,等待连接  serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);  System.out.println("Server Start----8888:"); } // 监听 private void listen() throws IOException {  while (true) {   // 选择一组键,并且相应的通道已经打开   selector.select();   // 返回此选择器的已选择键集。   Set
 selectionKeys = selector.selectedKeys();   Iterator
 iterator = selectionKeys.iterator();   while (iterator.hasNext()) {      SelectionKey selectionKey = iterator.next();    iterator.remove();    handleKey(selectionKey);   }  } } // 处理请求 private void handleKey(SelectionKey selectionKey) throws IOException {  // 接受请求  ServerSocketChannel server = null;  SocketChannel client = null;  String receiveText;  String sendText;  int count=0;  // 测试此键的通道是否已准备好接受新的套接字连接。  if (selectionKey.isAcceptable()) {   // 返回为之创建此键的通道。   server = (ServerSocketChannel) selectionKey.channel();   // 接受到此通道套接字的连接。   // 此方法返回的套接字通道(如果有)将处于阻塞模式。   client = server.accept();   // 配置为非阻塞   client.configureBlocking(false);   // 注册到selector,等待连接   client.register(selector, SelectionKey.OP_READ);  } else if (selectionKey.isReadable()) {   // 返回为之创建此键的通道。   client = (SocketChannel) selectionKey.channel();   //将缓冲区清空以备下次读取   receivebuffer.clear();   //读取服务器发送来的数据到缓冲区中   count = client.read(receivebuffer);    if (count > 0) {    receiveText = new String( receivebuffer.array(),0,count);    System.out.println("服务器端接受客户端数据--:"+receiveText);    client.register(selector, SelectionKey.OP_WRITE);   }  } else if (selectionKey.isWritable()) {   //将缓冲区清空以备下次写入   sendbuffer.clear();   // 返回为之创建此键的通道。   client = (SocketChannel) selectionKey.channel();   sendText="message from server--" + flag++;   //向缓冲区中输入数据   sendbuffer.put(sendText.getBytes());    //将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位   sendbuffer.flip();   //输出到通道   client.write(sendbuffer);   System.out.println("服务器端向客户端发送数据--:"+sendText);   client.register(selector, SelectionKey.OP_READ);  } } /**  * @param args  * @throws IOException  */ public static void main(String[] args) throws IOException {  // TODO Auto-generated method stub  int port = 8888;  NIOServer server = new NIOServer(port);  server.listen(); }}
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;public class NIOClient { /*标识数字*/ private static int flag = 0; /*缓冲区大小*/ private static int BLOCK = 4096; /*接受数据缓冲区*/ private static ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK); /*发送数据缓冲区*/ private static ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK); /*服务器端地址*/ private final static InetSocketAddress SERVER_ADDRESS = new InetSocketAddress(   "localhost", 1111); public static void main(String[] args) throws IOException {  // TODO Auto-generated method stub  // 打开socket通道  SocketChannel socketChannel = SocketChannel.open();  // 设置为非阻塞方式  socketChannel.configureBlocking(false);  // 打开选择器  Selector selector = Selector.open();  // 注册连接服务端socket动作  socketChannel.register(selector, SelectionKey.OP_CONNECT);  // 连接  socketChannel.connect(SERVER_ADDRESS);  // 分配缓冲区大小内存    Set
 selectionKeys;  Iterator
 iterator;  SelectionKey selectionKey;  SocketChannel client;  String receiveText;  String sendText;  int count=0;  while (true) {   //选择一组键,其相应的通道已为 I/O 操作准备就绪。   //此方法执行处于阻塞模式的选择操作。   selector.select();   //返回此选择器的已选择键集。   selectionKeys = selector.selectedKeys();   //System.out.println(selectionKeys.size());   iterator = selectionKeys.iterator();   while (iterator.hasNext()) {    selectionKey = iterator.next();    if (selectionKey.isConnectable()) {     System.out.println("client connect");     client = (SocketChannel) selectionKey.channel();     // 判断此通道上是否正在进行连接操作。     // 完成套接字通道的连接过程。     if (client.isConnectionPending()) {      client.finishConnect();      System.out.println("完成连接!");      sendbuffer.clear();      sendbuffer.put("Hello,Server".getBytes());      sendbuffer.flip();      client.write(sendbuffer);     }     client.register(selector, SelectionKey.OP_READ);    } else if (selectionKey.isReadable()) {     client = (SocketChannel) selectionKey.channel();     //将缓冲区清空以备下次读取     receivebuffer.clear();     //读取服务器发送来的数据到缓冲区中     count=client.read(receivebuffer);     if(count>0){      receiveText = new String( receivebuffer.array(),0,count);      System.out.println("客户端接受服务器端数据--:"+receiveText);      client.register(selector, SelectionKey.OP_WRITE);     }    } else if (selectionKey.isWritable()) {     sendbuffer.clear();     client = (SocketChannel) selectionKey.channel();     sendText = "message from client--" + (flag++);     sendbuffer.put(sendText.getBytes());      //将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位     sendbuffer.flip();     client.write(sendbuffer);     System.out.println("客户端向服务器端发送数据--:"+sendText);     client.register(selector, SelectionKey.OP_READ);    }   }   selectionKeys.clear();  } }}

示例2:

/** *  */package dongzi.nio.exercise.nio;import java.io.IOException;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.nio.ByteBuffer;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;/** * @author kyle *  */public class SelectSockets {    private static final int PORT_NUMBER = 1234;    /**     * @param args     */    public static void main(String[] args) {        new SelectSockets().go(args);    }    private void go(String[] args) {        int port = PORT_NUMBER;        if (args.length > 0) {            try {                port = Integer.parseInt(args[0]);            } catch (Exception e) {            }        }        System.out.println("Listening port: " + PORT_NUMBER);        try {            Selector selector = Selector.open();            startServer(port, selector);            while (true) {                int n = selector.select();                if (n == 0) {                    continue;                }                Iterator it = selector.selectedKeys().iterator();                while (it.hasNext()) {                    SelectionKey key = (SelectionKey) it.next();                    if (key.isAcceptable()) {                        ServerSocketChannel server = (ServerSocketChannel) key                                .channel();                        SocketChannel channel = server.accept();                        registerChannel(selector, channel, SelectionKey.OP_READ);                        sayHello(channel);                    }                    if (key.isReadable()) {                        readDataFromChannel(key);                    }                }                it.remove();            }        } catch (IOException e) {            e.printStackTrace();        }    }    private ByteBuffer buffer = ByteBuffer.allocate(1024);    private void readDataFromChannel(SelectionKey key) throws IOException {        int count = 0;        SocketChannel channel = (SocketChannel) key.channel();        buffer.clear();        while ((count = channel.read(buffer)) > 0) {            buffer.flip();            while (buffer.hasRemaining()) {                System.out.println(buffer.get());            }            buffer.clear();        }        if (count < 0) {            channel.close();        }    }    private void sayHello(SocketChannel channel) throws IOException {        if (channel == null) {            return;        }        buffer.clear();        ByteBuffer buffer = ByteBuffer.wrap("Hi, there \r\n".getBytes());        buffer.flip();        channel.write(buffer);    }    private void registerChannel(Selector selector, SocketChannel channel,            int opRead) throws IOException {        if (channel == null) {            return;        }        channel.configureBlocking(false);        channel.register(selector, opRead);    }    private void startServer(int port, Selector selector) throws IOException,            ClosedChannelException {        ServerSocketChannel serverChannel = ServerSocketChannel.open();        serverChannel.configureBlocking(false);        ServerSocket serverSocket = serverChannel.socket();        serverSocket.bind(new InetSocketAddress(port));        serverChannel.register(selector, SelectionKey.OP_ACCEPT);    }}

转载于:https://my.oschina.net/MrMichael/blog/393909

你可能感兴趣的文章
讲讲吸顶效果与react-sticky
查看>>
c++面向对象的一些问题1 0
查看>>
直播视频流技术名词
查看>>
网易跟贴这么火,背后的某个力量不可忽视
查看>>
企业级java springboot b2bc商城系统开源源码二次开发-hystrix参数详解(八)
查看>>
java B2B2C 多租户电子商城系统- 整合企业架构的技术点
查看>>
IOC —— AOP
查看>>
比特币现金将出新招,推动比特币现金使用
查看>>
数据库的这些性能优化,你做了吗?
查看>>
某大型网站迁移总结(完结)
查看>>
mysql的innodb中事务日志(redo log)ib_logfile
查看>>
部署SSL证书后,网页内容造成页面错误提示的处理办法
查看>>
MS SQLSERVER通用存储过程分页
查看>>
60.使用Azure AI 自定义视觉服务实现物品识别Demo
查看>>
Oracle 冷备份
查看>>
jq漂亮实用的select,select选中后,显示对应内容
查看>>
C 函数sscanf()的用法
查看>>
python模块之hashlib: md5和sha算法
查看>>
linux系统安装的引导镜像制作流程分享
查看>>
解决ros建***能登录不能访问内网远程桌面的问题
查看>>