java一共支持三种网络编程模型/IO模式:BIO、NIO、AIO
java BIO:同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务端就需要启动一个线程进行处理,如果这个连接不做任何事情就会造成不必要的线程开销
java NIO:同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮训到连接有I/O请求就进行处理
java AIO(NIO2):异步非阻塞,AIO引入异步通道的鲱鲶,采用了Proactor模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般使用于连接数较多且连接时间较长的应用
BIO、NIO、AIO适用场景
- BIO方式适用于连接数目较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序简单易理解
- NIO方式适用于连接数较多且连接时间短的架构,比如聊天服务器,弹幕服务器,服务器之间的通讯,编程比较复杂,JDK1.4之后支持
- AIO方式适用于连接数目较多且连接时间比较长的架构,比如相册服务器,充分调用OS参与并发操作,编程复杂,JDK1.7之后支持
Java BIO
基本介绍
- Java BIO 就是传统的 java io 编程,其相关的类和接口在 java.io
- BIO(blocking I/O) : 同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器)
- BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,程序简单易理解
- 其实就是最普通的javaIO流
Java BIO 工作机制
1、服务器端启动一个 ServerSocket
2、客户端启动 Socket 对服务器进行通信,默认情况下服务器端需要对每个客户 建立一个线程与之通讯
3、客户端发出请求后, 先咨询服务器是否有线程响应,如果没有则会等待,或者被拒绝
4、如果有响应,客户端线程会等待请求结束后,在继续执行
Java BIO 应用实例
- 使用 BIO 模型编写一个服务器端,监听 6666 端口,当有客户端连接时,就启动一个线程与之通讯。
- 要求使用线程池机制改善,可以连接多个客户端.
- 服务器端可以接收客户端发送的数据(telnet 方式即可)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| public class BIOServer { public static void main(String[] args) throws Exception { ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(); ServerSocket serverSocket = new ServerSocket(6666); System.out.println("服务器启动"); while (true) { System.out.println(" 线 程 信 息 id =" + Thread.currentThread().getId() + " 名 字 =" + Thread.currentThread().getName()); System.out.println("等待连接...."); final Socket socket = serverSocket.accept(); System.out.println("连接到一个客户端"); newCachedThreadPool.execute(new Runnable() { public void run() { handler(socket); }}); } } public static void handler(Socket socket) { try { System.out.println(" 线 程 信 息 id =" + Thread.currentThread().getId() + " 名 字 =" + Thread.currentThread().getName()); byte[] bytes = new byte[1024]; InputStream inputStream = socket.getInputStream(); while (true) { System.out.println("线程信息id="+Thread.currentThread().getId()+"名 字="+ Thread.currentThread().getName()); System.out.println("read...."); int read = inputStream.read(bytes); if(read != -1) { System.out.println(new String(bytes, 0, read)); } else { break; } } }catch (Exception e) { e.printStackTrace(); }finally { System.out.println("关闭和 client 的连接"); try { socket.close(); }catch (Exception e) { e.printStackTrace(); } } } }
|
java NIO
基本介绍
- Java NIO 全称 java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 New IO),是同步非阻塞的
- NIO 相关类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写
- NIO 有三大核心部分:Channel( 通道) ,Buffer( 缓冲区), Selector( 选择器)
- NIO 是 是 区 面向缓冲区 ,向 或者面向 块 块 编程的。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络
- Java NIO 的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。
- 通俗理解:NIO 是可以做到用一个线程来处理多个操作的。假设有 10000 个请求过来,根据实际情况,可以分配50 或者 100 个线程来处理。不像之前的阻塞 IO 那样,非得分配 10000 个
- HTTP2.0 使用了多路复用的技术,做到同一个连接并发处理多个请求,而且并发请求的数量比 HTTP1.1 大了好几个数量级
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class BasicBuffer { public static void main(String[] args) { IntBuffer intBuffer = IntBuffer.allocate(5); for(int i = 0; i < intBuffer.capacity(); i++) { intBuffer.put( i * 2); } intBuffer.flip(); while (intBuffer.hasRemaining()) { System.out.println(intBuffer.get()); } }
}
|
NIO 和 BIO 的比较
- BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,块 I/O 的效率比流 I/O 高很多
- BIO 是阻塞的,NIO 则是非阻塞的
- BIO 基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道
缓冲区(Buffer)
在 NIO 中,Buffer 是一个顶层父类,它是一个抽象类,旗下子类有:
ByteBuffer, 存储字节数据到缓冲区
ShortBuffer, 存储字符串数据到缓冲区
CharBuffer, 存储字符数据到缓冲区
IntBuffer, 存储整数数据到缓冲区
LongBuffer, 存储长整型数据到缓冲区
DoubleBuffer 存储小数到缓冲区
FloatBuffer, 存储小数到缓冲区
- Buffer 类定义了所有的缓冲区都具有的四个属性来提供关于其所包含的数据元素的信息:
| 属性 |
描述 |
| mark |
标记 |
| position |
位置,下一个要被读或写的元素的索引 |
| limit |
缓冲区的终点,position的最大值 |
| capacity |
容量,缓冲区大小 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public abstract class Buffer { | public final int capacity( ) | | public final int position( ) | | public final Buffer position (int newPosition) | | public final int limit( ) | | public final Buffer limit (int newLimit) public final Buffer mark( ) public final Buffer reset( ) | public final Buffer clear( ) | | public final Buffer flip( ) public final Buffer rewind( ) public final int remaining( ) | public final boolean hasRemaining( ) | | public abstract boolean isReadOnly( ) | public abstract boolean hasarray() | | public abstract Object rray() public abstract int rrayoffset() public abstract boolean isDirect() }
|
ByteBuffer
一个最常用的类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public abstract class ByteBuffer{ | public static ByteBuffer allocateDirect(int capacity) | | public static ByteBuffer allocate(int capacity//设置缓冲区的初始容量 ////////////////////////////////////////////////////////////////////////////////////////////////////// public static ByteBuffer wrap(byte[] array) public static ByteBuffer wrap(byte[] array,int offset, int length) | public abstract byte get( ) | | public abstract byte get (int index); | | public abstract ByteBuffer put (byte b): | | public abstract ByteBuffer put (int index, byte b); }
|
通道(Channel)
- NIO 的通道类似于流,但有些区别如下
- 通道可以同时进行读写,而流只能读或者只能写
- 通道可以实现异步读写数据
- 通道可以从缓冲读数据,也可以写数据到缓冲区
- BIO 中的 stream 是单向的,例如 FileInputStream 对象只能进行读取数据的操作,而 NIO 中的通道(Channel)是双向的,可以读操作,也可以写操作。
- Channel 在 NIO 中是一个接口
public interface Channel extends Closeable{}
- 常 用 的 Channel 类 有 : FileChannel 、 DatagramChannel 、 ServerSocketChannel 和 SocketChannel 。【ServerSocketChanne 类似 ServerSocket , SocketChannel 类似 Socket】
- FileChannel 用于文件的数据读写,DatagramChannel 用于 UDP 的数据读写,ServerSocketChannel 和SocketChannel 用于 TCP 的数据读写。
FileChannel
FileChannel 主要用来对本地文件进行 IO 操作,常见的方法有
1 2 3 4
| public int read(ByteBuffer dst) public int write(ByteBuffer src) public long transferFrom(ReadableByteChannel src, long position, long count) public long transferTo(long position, long count, WritableByteChannel target)
|
示例1:将数据写入本地文件 (文件不存在就创建)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class NIOFileChannel01 { public static void main(String[] args) throws Exception{ String str = "hello,world"; FileOutputStream fileOutputStream = new FileOutputStream("d:\\f01.txt"); FileChannel fileChannel = fileOutputStream.getChannel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); byteBuffer.put(str.getBytes()); byteBuffer.flip(); fileChannel.write(byteBuffer); fileOutputStream.close(); } }
|
示例2:从本地文件中读取数据(文件不存在就报异常)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class NIOFileChannel02 { public static void main(String[] args) throws Exception { File file = new File("d:\\f01.txt"); FileInputStream fileInputStream = new FileInputStream(file); FileChannel fileChannel = fileInputStream.getChannel(); ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length()); fileChannel.read(byteBuffer); System.out.println(new String(byteBuffer.array())); fileInputStream.close(); } }
|
示例3:文件内容复制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class NIOFileChannel03 { public static void main(String[] args) throws Exception { FileInputStream fileInputStream = new FileInputStream("1.txt"); FileChannel fileChannel01 = fileInputStream.getChannel(); FileOutputStream fileOutputStream = new FileOutputStream("2.txt"); FileChannel fileChannel02 = fileOutputStream.getChannel(); ByteBuffer byteBuffer = ByteBuffer.allocate(512); while (true) {
byteBuffer.clear(); int read = fileChannel01.read(byteBuffer); System.out.println("read =" + read); if(read == -1) { break; } byteBuffer.flip(); fileChannel02.write(byteBuffer); } fileInputStream.close(); fileOutputStream.close(); } }
|
示例4:使用 transferFrom复制文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class NIOFileChannel04 { public static void main(String[] args) throws Exception { FileInputStream fileInputStream = new FileInputStream("d:\\a.jpg"); FileOutputStream fileOutputStream = new FileOutputStream("d:\\a2.jpg"); FileChannel sourceCh = fileInputStream.getChannel(); FileChannel destCh = fileOutputStream.getChannel(); destCh.transferFrom(sourceCh,0,sourceCh.size()); sourceCh.close(); destCh.close(); fileInputStream.close(); fileOutputStream.close(); } }
|
Buffer 和 Channel 的注意事项和细节
ByteBuffer 支持类型化的 put 和 get, put 放入的是什么数据类型,get 就应该使用相应的数据类型来取出,否则可能有 BufferUnderflowException 异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class NIOByteBufferPutGet { public static void main(String[] args) { ByteBuffer buffer = ByteBuffer.allocate(64); buffer.putInt(100); buffer.putLong(9); buffer.putChar('尚'); buffer.putShort((short) 4); buffer.flip(); System.out.println(); System.out.println(buffer.getInt()); System.out.println(buffer.getLong()); System.out.println(buffer.getChar()); System.out.println(buffer.getShort()); } }
|
可以将一个普通 Buffer 转成只读 Buffer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class ReadOnlyBuffer { public static void main(String[] args) { ByteBuffer buffer = ByteBuffer.allocate(64); for(int i = 0; i < 64; i++) { buffer.put((byte)i); } buffer.flip(); ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); System.out.println(readOnlyBuffer.getClass()); while (readOnlyBuffer.hasRemaining()) { System.out.println(readOnlyBuffer.get()); } } }
|
NIO 还提供了 MappedByteBuffer, 可以让文件直接在内存(堆外的内存)中进行修改, 而如何同步到文件由 NIO 来完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
public class MappedByteBufferTest { public static void main(String[] args) throws Exception { RandomAccessFile randomAccessFile = new RandomAccessFile("1.txt", "rw"); FileChannel channel = randomAccessFile.getChannel();
MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5); mappedByteBuffer.put(0, (byte) 'H'); mappedByteBuffer.put(3, (byte) '9'); mappedByteBuffer.put(5, (byte) 'Y'); randomAccessFile.close(); System.out.println("修改成功~~"); } }
|
NIO 还支持 通过多个 Buffer (即 Buffer 数组) 完成读写操作,即 Scattering 和 Gathering
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
|
public class ScatteringAndGatheringTest { public static void main(String[] args) throws Exception { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); InetSocketAddress inetSocketAddress = new InetSocketAddress(7000); ByteBuffer[] byteBuffers = new ByteBuffer[2]; byteBuffers[0] = ByteBuffer.allocate(5); byteBuffers[1] = ByteBuffer.allocate(3); SocketChannel socketChannel = serverSocketChannel.accept(); int messageLength = 8; while (true) { int byteRead = 0; while (byteRead < messageLength ) { long l = socketChannel.read(byteBuffers); byteRead += l; System.out.println("byteRead=" + byteRead); Arrays.asList(byteBuffers).stream().map(buffer -> "postion=" + buffer.position() + ", limit=" + buffer.limit()).forEach(System.out::println); } Arrays.asList(byteBuffers).forEach(buffer -> buffer.flip()); long byteWirte = 0; while (byteWirte < messageLength) { long l = socketChannel.write(byteBuffers); byteWirte += l; } Arrays.asList(byteBuffers).forEach(buffer-> { buffer.clear(); }); System.out.println("byteRead:=" + byteRead + " byteWrite=" + byteWirte + ", messagelength" + messageLength); } } }
|
Selector选择器
- Java 的 NIO,用非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到 Selector(选择器)
- Selector 能够检测多个注册的通道上是否有事件发生(注意:多个 Channel 以事件的方式可以注册到同一个
Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管
理多个通道,也就是管理多个连接和请求
- 只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都
创建一个线程,不用去维护多个线程
- 避免了多线程之间的上下文切换导致的开销
1 2 3 4 5
| public static Selector open(); public int select(long timeout); public Set<SelectionKey> selectedKeys();
|
- selector 相关方法说明
1 2 3 4
| selector.select() selector.select(1000); selector.wakeup(); selector.selectNow();
|
非阻塞网络编程原理
- 当客户端连接时,会通过 ServerSocketChannel 得到 SocketChannel
- Selector 进行监听 select 方法, 返回有事件发生的通道的个数
- 将 socketChannel 注册到 Selector 上, register(Selector sel, int ops), 一个 selector 上可以注册多个 SocketChannel
- 注册后返回一个 SelectionKey, 会和该 Selector 关联(集合)
- 进一步得到各个 SelectionKey (有事件发生)
- 在通过 SelectionKey 反向获取 SocketChannel , 方法 channel()
- 可以通过 得到的 channel , 完成业务处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
| package nio;
import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;
public class NIOServer { public static void main(String[] args) throws Exception { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
Selector selector = Selector.open();
serverSocketChannel.socket().bind(new InetSocketAddress(21335));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
if(selector.select(1000) == 0 ) { System.out.println("服务器等待连接。。。。。无连接"); continue; }
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator(); while(iterator.hasNext()) { SelectionKey key = iterator.next();
if(key.isAcceptable()) {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("有客户端连接成功"+socketChannel.hashCode()); }
if(key.isReadable()) { SocketChannel socketChannel = (SocketChannel)key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024); socketChannel.read(buffer);
System.out.println("来自客户端:--"+new String(buffer.array()));
}
iterator.remove(); } } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package nio;
import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel;
public class NIOClient { public static void main(String[] args) throws Exception{
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
InetSocketAddress address = new InetSocketAddress("127.0.0.1",21335); boolean b = socketChannel.connect(address); socketChannel.finishConnect();
String msg = "hello connecting"; ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
socketChannel.write(buffer); System.out.println("是否连接:"+b); System.in.read(); } }
|
SelectionKey
SelectionKey,表示 Selector 和网络通道的注册关系
1 2 3 4 5 6 7 8 9
| int OP_ACCEPT: int OP_CONNECT: int OP_READ: int OP_WRITE:
public static final int OP_READ = 1 << 0; public static final int OP_WRITE = 1 << 2; public static final int OP_CONNECT = 1 << 3; public static final int OP_ACCEPT = 1 << 4;
|
SelectionKey 相关方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| publicabstract class SelectionKey{ public abstract Selector selector(); public abstract Selectable(Channel channel); public final Object attachment(); public abstract SelectionKey interestOps(int ops): public final boolean isAcceptable(); public final boolean isReadable(); public final boolean isWritable(); }
|
ServerSocketChannel
相关方法
1 2 3 4 5 6 7 8
| public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel{ public static ServerSocketChannel open() public final ServerSocketChannel bind(SocketAddress local) public final SelectableChannel configureBlocking(boolean block) public Socket( Channel accept) public final SelectionKey register(Selector sel, int ops) }
|
SocketChannel
- SocketChannel,网络 IO 通道,具体负责进行读写操作。NIO 把缓冲区的数据写入通道,或者把通道里的数
据读到缓冲区
相关方法
1 2 3 4 5 6 7 8 9 10 11
| public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel,NetworkChannel{ public static SocketChannel open(); public final SelectableChannel configureBlocking(boolean block); public boolean connect(SocketAddress remote); public boolean finishConnect() public int write(ByteBuffer src); public int read(ByteBuffer dst); public final SelectionKey register(Selector sel, int ops, Object tt); public final void close(); }
|
NIO 网络编程应用实例-群聊系统
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
| package nio;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.Channel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;
public class ChatServer {
private final int PORT = 20857; private ServerSocketChannel serverSocketChannel; private Selector selector; private final int BUFFERSIZE = 1024;
public ChatServer() {
try { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(PORT)); serverSocketChannel.configureBlocking(false); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) {
} }
public void listen() {
try {
while (true) { int count = selector.select(2000); if (count > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); System.out.println(socketChannel.getRemoteAddress() + "上线");
}
if (key.isReadable()) { readMsg(key); } iterator.remove(); } } }
} catch (Exception e) { }
}
public void readMsg(SelectionKey key) { SocketChannel channel = null; try { channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(BUFFERSIZE); int read = channel.read(buffer); if (read > 0) { String msg = new String(buffer.array()); System.out.println("转发消息"); sendMsg(channel, msg); }
} catch (Exception e) { try { System.out.println(channel.getRemoteAddress() + "客户端下线"); key.cancel(); channel.close(); } catch (IOException e1) {
e1.printStackTrace(); } } }
public void sendMsg(SocketChannel self, String msg) {
Set<SelectionKey> keys = selector.keys(); for (SelectionKey selectionKey : keys) { Channel channel = selectionKey.channel(); if ((channel instanceof SocketChannel) && (channel != self)) { SocketChannel targetChannel= (SocketChannel)channel; ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); try { targetChannel.write(buffer); } catch (IOException e) { System.out.println("消息转发失败"); e.printStackTrace(); } }
}
} public static void main(String[] args) { ChatServer server = new ChatServer(); server.listen(); } }
|
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| package nio;
import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner;
public class ChatClient {
private String HOST = "127.0.0.1"; private final int PORT = 20857; private Selector selector; private SocketChannel socketChannel; private String username;
public ChatClient() throws Exception { selector = Selector.open(); socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT)); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); username = socketChannel.getLocalAddress().toString().substring(1); System.out.println("准备连接。。"); }
public void sendMsg(String msg) { msg = username + ":" + msg; try { ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); socketChannel.write(buffer);
} catch (Exception e) { } }
public void readMsg() { try { int i = selector.select(); if (i > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); channel.read(buffer); String msg = new String(buffer.array()); System.out.println(msg.trim()); } iterator.remove(); } } } catch (Exception e) { } }
public static void main(String[] args) throws Exception { ChatClient client = new ChatClient(); new Thread() { @Override public void run() { while(true) { client.readMsg(); try {
Thread.sleep(500); } catch (InterruptedException e) {
e.printStackTrace(); } } } }.start();
Scanner scanner = new Scanner(System.in);
while(scanner.hasNextLine()) { String msg = scanner.next(); client.sendMsg(msg); } } }
|