0%

IO

java一共支持三种网络编程模型/IO模式:BIO、NIO、AIO

  • java BIO:同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务端就需要启动一个线程进行处理,如果这个连接不做任何事情就会造成不必要的线程开销

  • java NIO:同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮训到连接有I/O请求就进行处理

  • java AIO(NIO2):异步非阻塞,AIO引入异步通道的鲱鲶,采用了Proactor模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般使用于连接数较多且连接时间较长的应用

BIO、NIO、AIO适用场景

  • BIO方式适用于连接数目较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序简单易理解
  • NIO方式适用于连接数较多且连接时间短的架构,比如聊天服务器,弹幕服务器,服务器之间的通讯,编程比较复杂,JDK1.4之后支持
  • AIO方式适用于连接数目较多且连接时间比较长的架构,比如相册服务器,充分调用OS参与并发操作,编程复杂,JDK1.7之后支持

Java BIO

基本介绍

  • Java BIO 就是传统的 java io 编程,其相关的类和接口在 java.io
  • BIO(blocking I/O) : 同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器)
  • BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,程序简单易理解
  • 其实就是最普通的javaIO流

Java BIO 工作机制

1、服务器端启动一个 ServerSocket

2、客户端启动 Socket 对服务器进行通信,默认情况下服务器端需要对每个客户 建立一个线程与之通讯

3、客户端发出请求后, 先咨询服务器是否有线程响应,如果没有则会等待,或者被拒绝

4、如果有响应,客户端线程会等待请求结束后,在继续执行

Java BIO 应用实例

  • 使用 BIO 模型编写一个服务器端,监听 6666 端口,当有客户端连接时,就启动一个线程与之通讯。
  • 要求使用线程池机制改善,可以连接多个客户端.
  • 服务器端可以接收客户端发送的数据(telnet 方式即可)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class BIOServer {
public static void main(String[] args) throws Exception {
//线程池机制//思路
//1. 创建一个线程池
//2. 如果有客户端连接,就创建一个线程,与之通讯(单独写一个方法)
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
//创建 ServerSocket
ServerSocket serverSocket = new ServerSocket(6666);
System.out.println("服务器启动");
while (true) {
System.out.println(" 线 程 信 息 id =" + Thread.currentThread().getId() + " 名 字 =" +
Thread.currentThread().getName());
//监听,等待客户端连接
System.out.println("等待连接....");
final Socket socket = serverSocket.accept();
System.out.println("连接到一个客户端");
//就创建一个线程,与之通讯(单独写一个方法)
newCachedThreadPool.execute(new Runnable() {
public void run() {
//可以和客户端通讯
handler(socket);
}});
}
}
//编写一个 handler 方法,和客户端通讯
public static void handler(Socket socket) {
try {
System.out.println(" 线 程 信 息 id =" + Thread.currentThread().getId() + " 名 字 =" +
Thread.currentThread().getName());
byte[] bytes = new byte[1024];
//通过 socket 获取输入流
InputStream inputStream = socket.getInputStream();
//循环的读取客户端发送的数据
while (true) {
System.out.println("线程信息id="+Thread.currentThread().getId()+"名 字="+ Thread.currentThread().getName());
System.out.println("read....");
int read = inputStream.read(bytes);
if(read != -1) {
System.out.println(new String(bytes, 0, read)); //输出客户端发送的数据
} else {
break;
}
}
}catch (Exception e) {
e.printStackTrace();
}finally {
System.out.println("关闭和 client 的连接");
try {
socket.close();
}catch (Exception e) {
e.printStackTrace();
}
}
}
}

java NIO

基本介绍

  • Java NIO 全称 java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 New IO),是同步非阻塞
  • NIO 相关类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写
  • NIO 有三大核心部分:Channel( 通道) ,Buffer( 缓冲区), Selector( 选择器)
  • NIO 是 是 区 面向缓冲区 ,向 或者面向 块 块 编程的。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络
  • Java NIO 的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。
  • 通俗理解:NIO 是可以做到用一个线程来处理多个操作的。假设有 10000 个请求过来,根据实际情况,可以分配50 或者 100 个线程来处理。不像之前的阻塞 IO 那样,非得分配 10000 个
  • HTTP2.0 使用了多路复用的技术,做到同一个连接并发处理多个请求,而且并发请求的数量比 HTTP1.1 大了好几个数量级
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class BasicBuffer {
public static void main(String[] args) {
//创建一个 Buffer, 大小为 5, 即可以存放 5 个 int
IntBuffer intBuffer = IntBuffer.allocate(5);
//向 buffer 存放数据
for(int i = 0; i < intBuffer.capacity(); i++) {
intBuffer.put( i * 2);
}
//将 buffer 转换,读写切换(!!!)
intBuffer.flip();
while (intBuffer.hasRemaining()) {
System.out.println(intBuffer.get());
}
}

}

NIO 和 BIO 的比较

  • BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,块 I/O 的效率比流 I/O 高很多
  • BIO 是阻塞的,NIO 则是非阻塞的
  • BIO 基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道

缓冲区(Buffer)

​ 在 NIO 中,Buffer 是一个顶层父类,它是一个抽象类,旗下子类有:

ByteBuffer, 存储字节数据到缓冲区
ShortBuffer, 存储字符串数据到缓冲区
CharBuffer, 存储字符数据到缓冲区
IntBuffer, 存储整数数据到缓冲区
LongBuffer, 存储长整型数据到缓冲区
DoubleBuffer 存储小数到缓冲区
FloatBuffer, 存储小数到缓冲区

  • Buffer 类定义了所有的缓冲区都具有的四个属性来提供关于其所包含的数据元素的信息:
属性 描述
mark 标记
position 位置,下一个要被读或写的元素的索引
limit 缓冲区的终点,position的最大值
capacity 容量,缓冲区大小
  • Buffer中的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public abstract class Buffer {
//JIDK1.4时,引入的api
////////////////////////////////////////////////////////////////////////////////////////////////
| public final int capacity( )//返回此缓冲区的容量
|
| public final int position( )//返回此缓冲区的位置
|
| public final Buffer position (int newPosition)//设置此缓冲区的位置
|
| public final int limit( )//返回此缓冲区的限制
|
| public final Buffer limit (int newLimit)//设置此缓冲区的限制
///////////////////////////////////////////////////////////////////////////////////////////////////
public final Buffer mark( )//在此缓冲区的位置设置标记

public final Buffer reset( )//将此缓冲区的位置重置为以前标记的位置

//////////////////////////////////////////////////////////////////////////////////////////////////
| public final Buffer clear( )//清除此缓冲区,即将各个标记恢复到初始状态,但是数据并没有真正擦除,
|
| public final Buffer flip( )//反转此缓冲区
/////////////////////////////////////////////////////////////////////////////////////////////////

public final Buffer rewind( )//重绕此缓冲区

public final int remaining( )//返回当前位置与限制之间的元素数

//////////////////////////////////////////////////////////////////////////////////////////////////
| public final boolean hasRemaining( )//告知在当前位置和限制之间是否有元素
|
| public abstract boolean isReadOnly( )//告知此缓冲区是否为只读缓冲区
//////////////////////////////////////////////////////////////////////////////////////////////////

//JDK1.6时引入的api
/////////////////////////////////////////////////////////////////////////////////////////////////
| public abstract boolean hasarray()//告知此缓冲区是否具有可访问的底层实现数组
|
| public abstract Object rray()//返回此缓冲区的底层实现数组
////////////////////////////////////////////////////////////////////////////////////////////////

public abstract int rrayoffset()//返回此缓冲区的底层实现数组中第一一个缓冲区元素的偏移量

public abstract boolean isDirect()//告知此缓冲区是否为直接缓冲区
}

ByteBuffer

一个最常用的类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public abstract class ByteBuffer{
//缓冲区创建相关api
//////////////////////////////////////////////////////////////////////////////////////////////////////
| public static ByteBuffer allocateDirect(int capacity)//创建直接缓冲区
|
| public static ByteBuffer allocate(int capacity//设置缓冲区的初始容量
//////////////////////////////////////////////////////////////////////////////////////////////////////
public static ByteBuffer wrap(byte[] array)//把一个数组放到缓冲区中使用

//构造初始化位置offset和上界length的缓冲区
public static ByteBuffer wrap(byte[] array,int offset, int length)

//缓存区存取相关API
//////////////////////////////////////////////////////////////////////////////////////////////////////
| public abstract byte get( )//从当前位置position上get, get之后, position会 自动+1
|
| public abstract byte get (int index);//从绝对位置get
|
| public abstract ByteBuffer put (byte b)://从当前位置上添加,put之后, position会 自动+1
|
| public abstract ByteBuffer put (int index, byte b);//从绝对位置上put
//////////////////////////////////////////////////////////////////////////////////////////////////////
}

通道(Channel)

  1. NIO 的通道类似于流,但有些区别如下
  • 通道可以同时进行读写,而流只能读或者只能写
  • 通道可以实现异步读写数据
  • 通道可以从缓冲读数据,也可以写数据到缓冲区
  1. BIO 中的 stream 是单向的,例如 FileInputStream 对象只能进行读取数据的操作,而 NIO 中的通道(Channel)是双向的,可以读操作,也可以写操作。
  2. Channel 在 NIO 中是一个接口 public interface Channel extends Closeable{}
  3. 常 用 的 Channel 类 有 : FileChannel 、 DatagramChannel 、 ServerSocketChannel 和 SocketChannel 。【ServerSocketChanne 类似 ServerSocket , SocketChannel 类似 Socket】
  4. FileChannel 用于文件的数据读写,DatagramChannel 用于 UDP 的数据读写,ServerSocketChannel 和SocketChannel 用于 TCP 的数据读写。

FileChannel

FileChannel 主要用来对本地文件进行 IO 操作,常见的方法有

1
2
3
4
public int read(ByteBuffer dst)   //从通道读取数据并放到缓冲区中
public int write(ByteBuffer src) //把缓冲区的数据写到通道中
public long transferFrom(ReadableByteChannel src, long position, long count)//从目标通道中复制数据到当前通道
public long transferTo(long position, long count, WritableByteChannel target)//把数据从当前通道复制给目标通道

示例1:将数据写入本地文件 (文件不存在就创建)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class NIOFileChannel01 {
public static void main(String[] args) throws Exception{
String str = "hello,world";
//创建一个输出流->channel
FileOutputStream fileOutputStream = new FileOutputStream("d:\\f01.txt");
//通过 fileOutputStream 获取 对应的 FileChannel
//这个 fileChannel 真实 类型是 FileChannelImpl
FileChannel fileChannel = fileOutputStream.getChannel();
//创建一个缓冲区 ByteBuffer大小1024
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//将 str 放入 byteBuffer
byteBuffer.put(str.getBytes());

//对 byteBuffer 进行 flip 切换操作
byteBuffer.flip();
//将 byteBuffer 数据写入到 fileChannel
fileChannel.write(byteBuffer);
fileOutputStream.close();
}
}

示例2:从本地文件中读取数据(文件不存在就报异常)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class NIOFileChannel02 {
public static void main(String[] args) throws Exception {
//创建文件的输入流
File file = new File("d:\\f01.txt");
FileInputStream fileInputStream = new FileInputStream(file);

//通过 fileInputStream 获取对应的 FileChannel -> 实际类型 FileChannelImpl
FileChannel fileChannel = fileInputStream.getChannel();

//创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());

//将 通道的数据读入到 Buffer
fileChannel.read(byteBuffer);

//将 byteBuffer 的 字节数据 转成 String
System.out.println(new String(byteBuffer.array()));
fileInputStream.close();
}
}

示例3:文件内容复制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class NIOFileChannel03 {
public static void main(String[] args) throws Exception {
FileInputStream fileInputStream = new FileInputStream("1.txt");
FileChannel fileChannel01 = fileInputStream.getChannel();
FileOutputStream fileOutputStream = new FileOutputStream("2.txt");
FileChannel fileChannel02 = fileOutputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
while (true) { //循环读取
/*重置缓冲区属性
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
*/
byteBuffer.clear(); //清空 buffer 如果不清空则出现意想不到的错误

int read = fileChannel01.read(byteBuffer);
System.out.println("read =" + read);
if(read == -1) { //表示读完
break;
}
//将 buffer 中的数据写入到 fileChannel02 -- 2.txt 切换操作
byteBuffer.flip();
fileChannel02.write(byteBuffer);
}
//关闭相关的流
fileInputStream.close();
fileOutputStream.close();
}
}

示例4:使用 transferFrom复制文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class NIOFileChannel04 {
public static void main(String[] args) throws Exception {
//创建相关流
FileInputStream fileInputStream = new FileInputStream("d:\\a.jpg");
FileOutputStream fileOutputStream = new FileOutputStream("d:\\a2.jpg");

//获取各个流对应的 filechannel
FileChannel sourceCh = fileInputStream.getChannel();
FileChannel destCh = fileOutputStream.getChannel();

//使用 transferForm 完成拷贝
destCh.transferFrom(sourceCh,0,sourceCh.size());

//关闭相关通道和流
sourceCh.close();
destCh.close();
fileInputStream.close();
fileOutputStream.close();
}
}

Buffer 和 Channel 的注意事项和细节

  1. ByteBuffer 支持类型化的 put 和 get, put 放入的是什么数据类型,get 就应该使用相应的数据类型来取出,否则可能有 BufferUnderflowException 异常

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public class NIOByteBufferPutGet {
    public static void main(String[] args) {
    //创建一个 Buffer
    ByteBuffer buffer = ByteBuffer.allocate(64);
    //类型化方式放入数据
    buffer.putInt(100);
    buffer.putLong(9);
    buffer.putChar('尚');
    buffer.putShort((short) 4);
    //取出
    buffer.flip();
    System.out.println();
    System.out.println(buffer.getInt());
    System.out.println(buffer.getLong());
    System.out.println(buffer.getChar());
    System.out.println(buffer.getShort());
    }
    }
  2. 可以将一个普通 Buffer 转成只读 Buffer

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public class ReadOnlyBuffer {
    public static void main(String[] args) {
    //创建一个 buffer
    ByteBuffer buffer = ByteBuffer.allocate(64);
    for(int i = 0; i < 64; i++) {
    buffer.put((byte)i);
    }
    //读取
    buffer.flip();//得到一个只读的 Buffer
    ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
    System.out.println(readOnlyBuffer.getClass());
    //读取
    while (readOnlyBuffer.hasRemaining()) {
    System.out.println(readOnlyBuffer.get());
    }
    //readOnlyBuffer.put((byte)100); //ReadOnlyBufferException
    }
    }
  3. NIO 还提供了 MappedByteBuffer, 可以让文件直接在内存(堆外的内存)中进行修改, 而如何同步到文件由 NIO 来完成

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    /*
    说明
    1. MappedByteBuffer 可让文件直接在内存(堆外内存)修改, 操作系统不需要拷贝一次
    */
    public class MappedByteBufferTest {
    public static void main(String[] args) throws Exception {
    RandomAccessFile randomAccessFile = new RandomAccessFile("1.txt", "rw");
    //获取对应的通道
    FileChannel channel = randomAccessFile.getChannel();
    /**
    * 参数 1: FileChannel.MapMode.READ_WRITE 使用的读写模式
    * 参数 2: 0 : 可以直接修改的起始位置
    * 参数 3: 5: 是映射到内存的大小(不是索引位置) ,即将 1.txt 的多少个字节映射到内存
    * 可以直接修改的范围就是 0-5
    * 实际类型 DirectByteBuffer
    */
    MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
    mappedByteBuffer.put(0, (byte) 'H');
    mappedByteBuffer.put(3, (byte) '9');
    mappedByteBuffer.put(5, (byte) 'Y');//IndexOutOfBoundsException
    randomAccessFile.close();
    System.out.println("修改成功~~");
    }
    }
  4. NIO 还支持 通过多个 Buffer (即 Buffer 数组) 完成读写操作,即 Scattering 和 Gathering

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    /**
    * Scattering:将数据写入到 buffer 时,可以采用 buffer 数组,依次写入 [分散]
    * Gathering: 从 buffer 读取数据时,可以采用 buffer 数组,依次读
    */
    public class ScatteringAndGatheringTest {
    public static void main(String[] args) throws Exception {
    //使用 ServerSocketChannel 和 SocketChannel 网络
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);
    //绑定端口到 socket ,并启动serverSocketChannel.socket().bind(inetSocketAddress);
    //创建 buffer 数组
    ByteBuffer[] byteBuffers = new ByteBuffer[2];
    byteBuffers[0] = ByteBuffer.allocate(5);
    byteBuffers[1] = ByteBuffer.allocate(3);
    //等客户端连接(telnet)
    SocketChannel socketChannel = serverSocketChannel.accept();
    int messageLength = 8; //假定从客户端接收 8 个字节
    //循环的读取
    while (true) {
    int byteRead = 0;
    while (byteRead < messageLength ) {
    long l = socketChannel.read(byteBuffers);
    byteRead += l; //累计读取的字节数
    System.out.println("byteRead=" + byteRead);
    //使用流打印, 看看当前的这个 buffer 的 position 和 limit
    Arrays.asList(byteBuffers).stream().map(buffer -> "postion=" + buffer.position() + ", limit=" + buffer.limit()).forEach(System.out::println);
    }
    //将所有的 buffer 进行 flip
    Arrays.asList(byteBuffers).forEach(buffer -> buffer.flip());//将数据读出显示到客户端
    long byteWirte = 0;
    while (byteWirte < messageLength) {
    long l = socketChannel.write(byteBuffers); //
    byteWirte += l;
    }
    //将所有的 buffer 进行 clear
    Arrays.asList(byteBuffers).forEach(buffer-> {
    buffer.clear();
    });
    System.out.println("byteRead:=" + byteRead + " byteWrite=" + byteWirte + ", messagelength" +
    messageLength);
    }
    }
    }

Selector选择器

  1. Java 的 NIO,用非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到 Selector(选择器)
  2. Selector 能够检测多个注册的通道上是否有事件发生(注意:多个 Channel 以事件的方式可以注册到同一个
    Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管
    理多个通道,也就是管理多个连接和请求
  3. 只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都
    创建一个线程,不用去维护多个线程
  4. 避免了多线程之间的上下文切换导致的开销
1
2
3
4
5
public static Selector open();//得到一个 选择器对象

public int select(long timeout);//监控所有注册的通道,当其中有10操作可以进行时,将对应的SelectionKey加入到内部集合中并返回,参数用来设置超时时间

public Set<SelectionKey> selectedKeys();//从内部集合中得到所有的SelectionKey
  1. selector 相关方法说明
1
2
3
4
selector.select()		//阻塞
selector.select(1000); //阻塞 1000 毫秒,在 1000 毫秒后返回
selector.wakeup(); //唤醒 selector
selector.selectNow(); //不阻塞,立马返还NIO

非阻塞网络编程原理

  1. 当客户端连接时,会通过 ServerSocketChannel 得到 SocketChannel
  2. Selector 进行监听 select 方法, 返回有事件发生的通道的个数
  3. 将 socketChannel 注册到 Selector 上, register(Selector sel, int ops), 一个 selector 上可以注册多个 SocketChannel
  4. 注册后返回一个 SelectionKey, 会和该 Selector 关联(集合)
  5. 进一步得到各个 SelectionKey (有事件发生)
  6. 在通过 SelectionKey 反向获取 SocketChannel , 方法 channel()
  7. 可以通过 得到的 channel , 完成业务处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package nio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
* @author Administrator
* @date 2019-12-19 13:19
* 服务端:
* 1、创建服务器端serverSocketChannel,绑定端口号,设置为非阻塞
* 2、创建selector
* 3、将服务端serverSocketChannel注册到selector上,并指定此socketchannel的操作事件(SelectionKey.OP_ACCEPT 等待客户端连接)
* 4、循环等待连接
* 5、在循环中,通过 selector.select(1000) 的容量判断是否有事件等待操作,如果为0则是空闲,继续循环,如果不为0则说明有事件
* 6、迭代selectedKeys获取所有事件,并一一判断事件类型
* 7、key.isAcceptable() 如果是连接事件则通过serverSocketChannel.accept()来接受连接(accept()方法是阻塞方法,会一直等待连接,但是程序通过前面一系列逻辑之后来到这时必定有客户端请求连接,所以并不会阻塞)
* 7.1、通过 serverSocketChannel.accept()返回一个和客户端通信的socketChannel,并将其设置为非阻塞,注册到selector上,指定操作事件为读取其中的内容
* 8、key.isReadable() 如果是读事件,则通过key反向获取socketChannel( (SocketChannel)key.channel();) 并将其读入到缓冲区,输出
* 9、操作完selectedKey之后手动将selectedKeys从selectedKeys中清除,清空事件
*/
public class NIOServer {
public static void main(String[] args) throws Exception {
//创建serversocketchannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

//得到一个selector对象
Selector selector = Selector.open();

//绑定端口
serverSocketChannel.socket().bind(new InetSocketAddress(21335));

//设置为非阻塞
serverSocketChannel.configureBlocking(false);

//serversocketchannel注册到selector上 指定事件为等待连接
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

//循环监听
while(true) {

//等待1s如果没有连接就继续循环监听
if(selector.select(1000) == 0 ) {
System.out.println("服务器等待连接。。。。。无连接");
continue;
}

//返回大于0,集合里面是有事件发生的连接
Set<SelectionKey> selectedKeys = selector.selectedKeys();

//迭代器遍历

Iterator<SelectionKey> iterator = selectedKeys.iterator();
while(iterator.hasNext()) {
//获取selectionkey
SelectionKey key = iterator.next();

//根据key的事件做相应的处理
if(key.isAcceptable()) { //有连接事件

//给客户端一个socketchannel
SocketChannel socketChannel = serverSocketChannel.accept();

socketChannel.configureBlocking(false);
//将这个socketchannel注册到selector上,绑定为读事件,并制定缓冲区大小1024
socketChannel.register(selector, SelectionKey.OP_READ);

System.out.println("有客户端连接成功"+socketChannel.hashCode());
}

if(key.isReadable()) { //有读取事件
//通过key反向获取channel
SocketChannel socketChannel = (SocketChannel)key.channel();

//ByteBuffer buffer =(ByteBuffer) key.attachment();

ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer);

System.out.println("来自客户端:--"+new String(buffer.array()));

}

iterator.remove();
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package nio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
* @author Administrator
* @date 2019-12-19 13:50
* 客户端:
*/
public class NIOClient {
public static void main(String[] args) throws Exception{

SocketChannel socketChannel = SocketChannel.open();

socketChannel.configureBlocking(false);

InetSocketAddress address = new InetSocketAddress("127.0.0.1",21335);
boolean b = socketChannel.connect(address);
socketChannel.finishConnect();

String msg = "hello connecting";
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());//自适应缓冲区

socketChannel.write(buffer);
System.out.println("是否连接:"+b);
System.in.read();
}
}

SelectionKey

  1. SelectionKey,表示 Selector 和网络通道的注册关系

    1
    2
    3
    4
    5
    6
    7
    8
    9
    int OP_ACCEPT://有新的网络连接可以 accept,值为 16
    int OP_CONNECT://代表连接已经建立,值为 8
    int OP_READ://代表读操作,值为 1
    int OP_WRITE://代表写操作,值为 4
    //源码中:
    public static final int OP_READ = 1 << 0;
    public static final int OP_WRITE = 1 << 2;
    public static final int OP_CONNECT = 1 << 3;
    public static final int OP_ACCEPT = 1 << 4;
  2. SelectionKey 相关方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
publicabstract class SelectionKey{
public abstract Selector selector();//得到与之关联的Selector对象

public abstract Selectable(Channel channel);//得到与之关联的通道

public final Object attachment();//得到与之关联的共享数据

public abstract SelectionKey interestOps(int ops)://设置或改变监听事件

public final boolean isAcceptable();//是否可以accept

public final boolean isReadable();//是否可以读

public final boolean isWritable();//是否可以写
}

ServerSocketChannel

相关方法

1
2
3
4
5
6
7
8
public abstract class ServerSocketChannel
extends AbstractSelectableChannel implements NetworkChannel{
public static ServerSocketChannel open() //得到一个ServerSocketChannel通道
public final ServerSocketChannel bind(SocketAddress local) //设置服务器端端口号
public final SelectableChannel configureBlocking(boolean block) //设置阻塞或非阻塞模式,取值false表示采用非阻塞模式
public Socket( Channel accept) //接受一个连接, 返回代表这个连接的通道对象
public final SelectionKey register(Selector sel, int ops) //注册一个选择器并设置监听事件
}

SocketChannel

  • SocketChannel,网络 IO 通道,具体负责进行读写操作。NIO 把缓冲区的数据写入通道,或者把通道里的数
    据读到缓冲区

相关方法

1
2
3
4
5
6
7
8
9
10
11
public abstract class SocketChannel extends AbstractSelectableChannel
implements ByteChannel, ScatteringByteChannel, GatheringByteChannel,NetworkChannel{
public static SocketChannel open();//得到一个SocketChannel 通道
public final SelectableChannel configureBlocking(boolean block);//设置阻塞或非阻塞模式,取值false 表示采用非阻塞模式
public boolean connect(SocketAddress remote);//连接服务器
public boolean finishConnect()//如果上面的方法连接失败,接下来就要通过该方法完成连接操作
public int write(ByteBuffer src);//往通道里写数据
public int read(ByteBuffer dst);//从通道里读数据
public final SelectionKey register(Selector sel, int ops, Object tt);//注册一个选择器并设置监听事件,最后一一个参数可以设置共享数据
public final void close();//关闭通道
}

NIO 网络编程应用实例-群聊系统

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package nio;
/**
* @author Administrator
* @date 2019-12-19 16:04
*
*/

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class ChatServer {

private final int PORT = 20857;
private ServerSocketChannel serverSocketChannel;
private Selector selector;
private final int BUFFERSIZE = 1024;

public ChatServer() {

try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {

}
}

public void listen() {

try {

while (true) {
int count = selector.select(2000);
if (count > 0) {

Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println(socketChannel.getRemoteAddress() + "上线");

}

if (key.isReadable()) {
// 接收消息,转发
readMsg(key);
}
iterator.remove();//操作完key之后移除
}
}
}

} catch (Exception e) {
// TODO: handle exception
}

}

public void readMsg(SelectionKey key) {
SocketChannel channel = null;
try {
channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(BUFFERSIZE);
int read = channel.read(buffer);
if (read > 0) {
String msg = new String(buffer.array());
// 转发消息
System.out.println("转发消息");
sendMsg(channel, msg);
}

} catch (Exception e) {
try {
System.out.println(channel.getRemoteAddress() + "客户端下线");
key.cancel();
channel.close();
} catch (IOException e1) {

e1.printStackTrace();
}
}
}

public void sendMsg(SocketChannel self, String msg) {

Set<SelectionKey> keys = selector.keys();
for (SelectionKey selectionKey : keys) {
Channel channel = selectionKey.channel();
if ((channel instanceof SocketChannel) && (channel != self)) {
SocketChannel targetChannel= (SocketChannel)channel;
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
try {
targetChannel.write(buffer);
} catch (IOException e) {
System.out.println("消息转发失败");
e.printStackTrace();
}
}

}

}
public static void main(String[] args) {
ChatServer server = new ChatServer();
server.listen();
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package nio;
/**
* @author Administrator
* @date 2019-12-19 16:48
*
*/

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;

public class ChatClient {

private String HOST = "127.0.0.1";
private final int PORT = 20857;
private Selector selector;
private SocketChannel socketChannel;
private String username;

public ChatClient() throws Exception {
selector = Selector.open();
socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
username = socketChannel.getLocalAddress().toString().substring(1);
System.out.println("准备连接。。");
}

public void sendMsg(String msg) {
msg = username + ":" + msg;
try {
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
socketChannel.write(buffer);

} catch (Exception e) {
// TODO: handle exception
}
}

public void readMsg() {
try {
int i = selector.select();
if (i > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isReadable()) {

SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
String msg = new String(buffer.array());
System.out.println(msg.trim());
}
iterator.remove(); //操作完key之后移除
}
}
} catch (Exception e) {
// TODO: handle exception
}
}

public static void main(String[] args) throws Exception {
ChatClient client = new ChatClient();
new Thread() {
@Override
public void run() {
while(true) {
client.readMsg();
try {

Thread.sleep(500);
} catch (InterruptedException e) {

e.printStackTrace();
}
}
}
}.start();

Scanner scanner = new Scanner(System.in);

while(scanner.hasNextLine()) {
String msg = scanner.next();
client.sendMsg(msg);
}
}
}
赏口饭吃吧!