前置知识

上层应用与IO

上层应用不能决定IO,没有权限直接向IO设备(磁盘、网口)读写数据。

操作系统有,IO是操作系统控制的。比如kernel:内核,它有权限去操作IO设备。

操作系统内核会向上提供接口(系统函数),上层应用想要操作IO设备,直接借助操作系统内核去操作,通过系统调用去调用系统函数(内核提供的接口),达到读写IO设备的目的。

操作系统提供io模型:阻塞IO、非阻塞IO、IO多路复用、信号驱动IO、异步IO

操作系统内核将IO设备中的数据加载到类似内存的地方。然后传给上层应用中内存的地方。

image-20230822191738725

同步与异步

同步/异步 要和线程中的同步线程/异步线程要区分开,这里指的是同步IO/异步IO

  • 同步与异步(synchronous/asynchronous):同步是一种可靠的有序运行机制,当我们进行同步操作时,后续的任务是等待当前调用返回,才会进行下一步;而异步则相反,其他任务不需要等待当前调用返回,通常依靠事件、回调等机制来实现任务间次序关系

    • 同步: 数据就绪后需要**(上层应用)自己去读是同步,调用方法之后,必须要得到一个返回值**

      ​ 例如: 买火车票,一定要买到票,才能继续下一步

    • 异步: 数据就绪后系统直接读好再回调给**(上层应用)程序是异步,调用方法之后,没有返回值,但是会有回调函数**,回调函数指的是满足条件之后会自动执行的方法

      ​ 例如: 买火车票, 不一定要买到票,我可以交代售票员,当有票的话,你就帮我出张票

      image-20211223201524072

阻塞与非阻塞

  • 阻塞与非阻塞:在进行阻塞操作时,当前线程会处于阻塞状态,无法从事其他任务,只有当条件就绪才能继续,比如ServerSocket新连接建立完毕,或者数据读取、写入操作完成;而非阻塞则是不管IO操作是否结束,直接返回,相应操作在后台继续处理

    • 阻塞:(IO设备:如网口)没有数据传过来时,(操作系统内核)读会阻塞(等)直到有数据;缓冲区满时,写操作也会阻塞。如果没有达到方法的目的,就会一直停在那里(等待) , 例如: ServerSocket的accept()方法

      image-20220115164735644

    • 非阻塞: 不管方法有没有达到目的,都直接往下执行(不等待)

      image-20220115164754004

同步和异步强调的是被调用方(B–操作系统),阻塞和非阻塞强调的是调用方(A–应用程序);

同步阻塞(BIO)是什么

BIO是blocking I/O的简称,它是同步阻塞型IO,其相关的类和接口在java.io下

BIO模型简单来讲,就是服务端为每一个请求都分配一个线程进行处理,I/O操作都是基于流Stream的操作

image-20211223203117507

[例子]bio原生socket编程

  • 服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class BioServer {
// 编写一个服务端程序,基于socket
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(8888);
while (true) {
Socket socket = serverSocket.accept();// 接收一个客户端连接,这个方法阻塞的
// 为了不被read影响主线程(阻塞在read,无法接收新的客户端请求)=>开启新的线程专门处理读写
new Thread(new ServerHandler(socket)).start();


}
}
static class ServerHandler implements Runnable{

private final Socket socket;

public ServerHandler(Socket socket) {
this.socket = socket;
}

public void run() {
try {
InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream();
// inputStream.read(); // 服务端基于input流读客户端的请求,read也是阻塞的

BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream));
// 读取客户端发送的数据
while (true) { // 客户端和服务端一起发送
String line = reader.readLine();
System.out.println("服务端接收到来自客户端的数据:"+line);
// 服务端给客户端返回数据
writer.write("我是服务端,我收到您发的数据了\n");
writer.flush();
}
} catch (IOException e) {
// e.printStackTrace();
}
}
}
}
  • 客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class BioClient {
public static void main(String[] args) {
Socket socket = null;
BufferedReader reader = null;
BufferedWriter writer = null;
try {
socket = new Socket("127.0.0.1", 8888);
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
System.out.println("向服务端发送数据");
writer.write("你好,我是客户端\n"); // 这里加\n,服务端用的是readLine
writer.flush();
System.out.println("接收来自服务端的消息:"+reader.readLine());
} catch (IOException e) {
if (reader != null) {
try {
reader.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
if (writer != null) {
try {
writer.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
if (socket != null) {
try {
socket.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
}
  • 启动测试

image-20220115171536021

上面的java bio是 输入对输出。输出对输入,对冲。一对一的

image-20220115171758583

  • 弊端

线程开销:客户端的并发数与后端的线程数1:1的比例,线程的创建、销毁是非常消耗系统资源的,随着并发量增大,服务端性能将显著下降,甚至会发生线程堆栈溢出等错误

线程阻塞:当连接创建后,如果该线程没有操作时,会进行阻塞操作,这样极大的浪费了服务器资源

同步非阻塞(NIO)是什么

同步非阻塞 IO

NIO之所以是同步,是因为它的accept/read/write方法的内核I/O操作都会阻塞当前线程

在Java1.4之前的I/O系统中,提供的都是面向流(字节流和字符流)的I/O系统,系统一次一个字节地处理数据,一个输入流产生一个字节的数据,一个输出流消费一个字节的数据,面向流的I/O速度非常慢

为了弥补原来I/O的不足,在Java 1.4中推出了NIO,这是一个面向块的I/O系统,系统以块(block)的方式处理数据,每一个操作在一步中产生或者消费一个数据,按块处理要比按字节处理数据快的多。

NIO,称之为New IO 或是 non-block IO (非阻塞IO),这两种说法都可以,其实称之为非阻塞IO更恰当一些 。

NIO是在访问个数特别大的时候才使用的 , 比如流行的软件或者流行的游戏中会有高并发和大量连接.

(java中上层的)NIO的三大核心组件:Buffer(缓冲区)、Channel(通道)、Selector(选择器/多路复用器)

同步阻塞和同步非阻塞这里明面上的体现就是调用的API不同了,不会阻塞了

异步非阻塞(AIO)是什么

在NIO中,Selector多路复用器在做轮询时,如果没有事件发生,也会进行阻塞,如何优化?

在 Java 7 中,NIO 有了进一步的改进,也就是 NIO 2,引入了异步非阻塞 IO 方式,也有很多人叫它 AIO(Asynchronous IO)。异步 IO 操作基于事件和回调机制,可以简单理解为,应用操作直接返回,而不会阻塞在那里,当后台处理完成,操作系统会通知相应线程进行后续工作。

AIO是异步IO(asynchronous I/O)的缩写,该异步IO是需要依赖于操作系统底层的异步IO实现。 虽然NIO在网络操作中,提供了非阻塞的方法,但是NIO的IO行为还是同步的。对于NIO来说,我们的业务线程是在IO操作准备好时,得到通知,接着就由这个线程自行进行IO操作,IO操作本身是同步的。

但是对AIO来说,则更加进了一步,它不是在IO准备好时再通知线程,而是在IO操作已经完成后,再给线程发出通知。因此AIO是不会阻塞的,此时我们的业务逻辑将变成一个回调函数,等待IO操作完成后,由系统自动触发。

image-20211223205553581

AIO的基本流程:用户线程通过系统调用,告知kernel内核启动某个IO操作,用户线程返回。kernel内核在整个IO操作(包括数据准备、数据复制)完成后,通知用户程序,用户执行后续的业务操作。

与NIO不同,当进行读写操作时,只须直接调用API的read或write方法即可。这两种方法均为异步的,

  • 对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;
  • 对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。

即可以理解为,read/write方法都是异步的,完成后会主动调用回调函数。 在JDK1.7中,这部分内容被称作NIO.2—->AIO,主要在Java.nio.channels包下增加了下面四个异步通道:

  • AsynchronousSocketChannel
  • AsynchronousServerSocketChannel
  • AsynchronousFileChannel
  • AsynchronousDatagramChannel

在AIO socket编程中,服务端通道是AsynchronousServerSocketChannel,这个类提供了一个open()静态工厂,一个bind()方法用于绑定服务端IP地址(还有端口号),另外还提供了accept()用于接收用户连接请求。在客户端使用的通道是AsynchronousSocketChannel,这个通道处理提供open静态工厂方法外,还提供了readwrite方法。

在AIO编程中,发出一个事件(accept read write等)之后要指定事件处理类(回调函数),AIO中的事件处理类是CompletionHandler<V,A>,这个接口定义了如下两个方法,分别在异步操作成功和失败时被回调。

  • void completed(V result, A attachment);

  • void failed(Throwable exc, A attachment);

不足之处:

  • windows下实现成熟,但很少作为百万级以上或者说高并发应用的服务器操作系统来使用。
  • Linux 系统下,异步IO模型在2.6版本才引入,目前并不完善。所以 Linux 下,实现高并发网络编程时都是以 NIO 多路复用模型模式为主。

【案例1】AIO同步写法,读取客户端写过来的数据:

这里要补充一下客户端的代码,方便测试! ,直接用前面的NIO的客户端就行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static void main(String[] args) throws Exception {
//创建对象
AsynchronousServerSocketChannel assc = AsynchronousServerSocketChannel.open();

//绑定端口
assc.bind(new InetSocketAddress(8888));

//获取连接
//Future里面放的就是方法的结果
//********同步********
System.out.println("准备连接客户端");
Future<AsynchronousSocketChannel> future = assc.accept();
//Future方法需要调用get()方法获取真正的返回值
AsynchronousSocketChannel sc = future.get();
System.out.println("连接上了客户端");
//读取客户端发来的数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取
//以前返回的是读取到的个数,真正的个数就在Future里面放着
//********同步********
System.out.println("准备读取数据");
Future<Integer> future2 = sc.read(buffer);
//获取真正的返回值
Integer len = future2.get();
System.out.println("读取到了数据");
//打印
System.out.println(new String(buffer.array(),0,len));
}

【案例2】AIO异步写法,读取客户端写过来的数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//异步非阻塞连接和读取
public static void main(String[] args) throws IOException {
//创建对象
AsynchronousServerSocketChannel assc = AsynchronousServerSocketChannel.open();
//绑定端口
assc.bind(new InetSocketAddress(8000));
//异步非阻塞连接!!!!
//第一个参数是一个附件
System.out.println(1);
assc.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
// 异步操作成功时调用
@Override
public void completed(AsynchronousSocketChannel s, Object attachment) {
//如果连接客户端成功,应该获取客户端发来的数据
//completed()的第一个参数表示的是Socket对象.
System.out.println(5);
//创建数组
ByteBuffer buffer = ByteBuffer.allocate(1024);
//异步非阻塞读!!!!!
System.out.println(3);
s.read(buffer, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer len, Object attachment) {
//读取成功会自动调用这个方法
//completed()方法的第一个参数是read()读取到的实际个数
//打印数据
System.out.println(6);
System.out.println(new String(buffer.array(),0,len));
}

@Override
public void failed(Throwable exc, Object attachment) {

}
});
System.out.println(4);
}
// 异步操作失败时调用
@Override
public void failed(Throwable exc, Object attachment) {

}
});

System.out.println(2);

//让程序别结束写一个死循环
while(true){

}
}

异步没有阻塞的。

一张图解释3大场景

image-20200512230232797

为什么学NIO

互联网环境,分布式系统大行其道,分布式重要的是网络编程。网络编程中佼佼者是Netty,Netty底层是NIO.

NIO与多线程的关系

看起来NIO和多线程似乎关系不大,但是NIO虽然没有对多线程的控制与写作提出一些特别的观点,但是它改变了一个线程在应用层面的一些使用方式,也时解决了一些实际的困难,节省了成本。

Buffer

本质是个数组

buffer是什么

Buffer(缓冲区):Buffer是一个抽象类,包含一些要写入或者读出的数据,体现了与原I/O的一个重要区别,在面向流的I/O中,数据读写是直接进入到Stream中,而在NIO中,所有数据都是用缓冲区处理的,读数据直接从缓冲区读,写数据直接写入到缓冲区,Buffer是NIO读写数据的中转池缓冲区的本质是一个数组(做了些封装),通常使用一个字节数组(ByteBuffer),也可以使用其他类型,但缓冲区又不仅仅是一个数组,它还提供了对数据结构化访问以及维护读写位置等操作。

buffer 则用来缓冲读写数据,常见的 buffer 有

  • ByteBuffer
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer

image-20211223203348791

ByteBuffer

byteBuffer 简单使用

有一普通文本文件 data.txt,内容为

1
1234567890abcd

使用 FileChannel 来读取文件内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Slf4j
public class ChannelDemo1 {
public static void main(String[] args) {
try (RandomAccessFile file = new RandomAccessFile("helloword/data.txt", "rw")) {
FileChannel channel = file.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(10);
do {
// 向 buffer 写入
int len = channel.read(buffer); // 缓存区是会占用内存空间的,所以我们不能设置缓存区无限大,要分多次读
log.debug("读到字节数:{}", len);
if (len == -1) {
break;
}
// 切换 buffer 读模式
buffer.flip();
while(buffer.hasRemaining()) {
log.debug("{}", (char)buffer.get());
}
// 切换 buffer 写模式
buffer.clear();
} while (true);
} catch (IOException e) {
e.printStackTrace();
}
}
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 读到字节数:10
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 1
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 2
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 3
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 4
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 5
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 6
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 7
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 8
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 9
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 0
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 读到字节数:4
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - a
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - b
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - c
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - d
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 读到字节数:-1

可以看到文件中的14个字节分2次读完,读不到了返回-1

ByteBuffer 正确使用姿势

  1. 向 buffer 写入数据,例如调用 channel.read(buffer)
  2. 调用 flip() 切换至读模式
  3. 从 buffer 读取数据,例如调用 buffer.get()
  4. 调用 clear()compact() 切换至写模式
  5. 重复 1~4 步骤

ByteBuffer 结构

ByteBuffer 有以下重要属性

  • capacity
  • position
  • limit

一开始

image-20230822191939750

写模式下,position 是写入位置,limit 等于容量,下图表示写入了 4 个字节后的状态

image-20230822191822218

flip 动作发生后,position 切换为读取位置,limit 切换为读取限制

image-20230822191831790

读取 4 个字节后,状态

image-20230822191841017

clear 动作发生后,状态

image-20230822191851344

compact 方法,是把未读完的部分向前压缩,然后切换至写模式

image-20230822191900686

调试工具类💡

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
public class ByteBufferUtil {
private static final char[] BYTE2CHAR = new char[256];
private static final char[] HEXDUMP_TABLE = new char[256 * 4];
private static final String[] HEXPADDING = new String[16];
private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
private static final String[] BYTE2HEX = new String[256];
private static final String[] BYTEPADDING = new String[16];

static {
final char[] DIGITS = "0123456789abcdef".toCharArray();
for (int i = 0; i < 256; i++) {
HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
}

int i;

// Generate the lookup table for hex dump paddings
for (i = 0; i < HEXPADDING.length; i++) {
int padding = HEXPADDING.length - i;
StringBuilder buf = new StringBuilder(padding * 3);
for (int j = 0; j < padding; j++) {
buf.append(" ");
}
HEXPADDING[i] = buf.toString();
}

// Generate the lookup table for the start-offset header in each row (up to 64KiB).
for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
StringBuilder buf = new StringBuilder(12);
buf.append(NEWLINE);
buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
buf.setCharAt(buf.length() - 9, '|');
buf.append('|');
HEXDUMP_ROWPREFIXES[i] = buf.toString();
}

// Generate the lookup table for byte-to-hex-dump conversion
for (i = 0; i < BYTE2HEX.length; i++) {
BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
}

// Generate the lookup table for byte dump paddings
for (i = 0; i < BYTEPADDING.length; i++) {
int padding = BYTEPADDING.length - i;
StringBuilder buf = new StringBuilder(padding);
for (int j = 0; j < padding; j++) {
buf.append(' ');
}
BYTEPADDING[i] = buf.toString();
}

// Generate the lookup table for byte-to-char conversion
for (i = 0; i < BYTE2CHAR.length; i++) {
if (i <= 0x1f || i >= 0x7f) {
BYTE2CHAR[i] = '.';
} else {
BYTE2CHAR[i] = (char) i;
}
}
}

/**
* 打印所有内容
* @param buffer
*/
public static void debugAll(ByteBuffer buffer) {
int oldlimit = buffer.limit();
buffer.limit(buffer.capacity());
StringBuilder origin = new StringBuilder(256);
appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
System.out.println("+--------+-------------------- all ------------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
System.out.println(origin);
buffer.limit(oldlimit);
}

/**
* 打印可读取内容
* @param buffer
*/
public static void debugRead(ByteBuffer buffer) {
StringBuilder builder = new StringBuilder(256);
appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
System.out.println("+--------+-------------------- read -----------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
System.out.println(builder);
}

private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
if (isOutOfBounds(offset, length, buf.capacity())) {
throw new IndexOutOfBoundsException(
"expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
+ ") <= " + "buf.capacity(" + buf.capacity() + ')');
}
if (length == 0) {
return;
}
dump.append(
" +-------------------------------------------------+" +
NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +
NEWLINE + "+--------+-------------------------------------------------+----------------+");

final int startIndex = offset;
final int fullRows = length >>> 4;
final int remainder = length & 0xF;

// Dump the rows which have 16 bytes.
for (int row = 0; row < fullRows; row++) {
int rowStartIndex = (row << 4) + startIndex;

// Per-row prefix.
appendHexDumpRowPrefix(dump, row, rowStartIndex);

// Hex dump
int rowEndIndex = rowStartIndex + 16;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(" |");

// ASCII dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append('|');
}

// Dump the last row which has less than 16 bytes.
if (remainder != 0) {
int rowStartIndex = (fullRows << 4) + startIndex;
appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);

// Hex dump
int rowEndIndex = rowStartIndex + remainder;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(HEXPADDING[remainder]);
dump.append(" |");

// Ascii dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append(BYTEPADDING[remainder]);
dump.append('|');
}

dump.append(NEWLINE +
"+--------+-------------------------------------------------+----------------+");
}

private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
if (row < HEXDUMP_ROWPREFIXES.length) {
dump.append(HEXDUMP_ROWPREFIXES[row]);
} else {
dump.append(NEWLINE);
dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
dump.setCharAt(dump.length() - 9, '|');
dump.append('|');
}
}

public static short getUnsignedByte(ByteBuffer buffer, int index) {
return (short) (buffer.get(index) & 0xFF);
}
}

写入字节效果:

image-20220109160454921

flip的作用效果演示:

image-20220109160853036

compact的压缩效果:

image-20220109161419090

position从压缩的位置开始写入

ByteBuffer 常见方法

分配空间

可以使用 allocate() 方法为 ByteBuffer 分配空间,其它 buffer 类也有该方法

1
2
3
Bytebuffer buf = ByteBuffer.allocate(16); // java堆内存,读写效率低,收到GC影响

ByteBuffer buffer = ByteBuffer.allocateDirect(10); // 使用系统直接内存,读写效率高(少一次拷贝:零拷贝),不受GC影响,但是调用系统方法分配内存速度较慢,使用不当容易造成内存泄露,一定要合理释放

通过数组创建缓冲区:wrap(byte[] arr),此种方式创建的缓冲区为:间接缓冲区

1
2
3
4
public static void main(String[] args) {
byte[] byteArray = new byte[10];
ByteBuffer byteBuffer = ByteBuffer.wrap(byteArray);
}
  • 在堆中创建缓冲区称为:间接缓冲区

  • 在系统内存创建缓冲区称为:直接缓冲区

  • 间接缓冲区的创建和销毁效率要高于直接缓冲区

  • 间接缓冲区的工作效率要低于直接缓冲区

向 buffer 写入数据

有两种办法

  • 调用 channel 的 read() 方法 , 有点拗口,反正就是从channel取数据到buffer,对buffer来说就是写入了
  • 调用 buffer 自己的 put() 方法
1
int readBytes = channel.read(buf);

1
buf.put((byte)127);

更多例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Test_添加数据 {
public static void main(String[] args) {
ByteBuffer b1 = ByteBuffer.allocate(10);
// 添加数据
b1.put((byte)11);
b1.put((byte)12);
b1.put((byte)13);

// ByteBuffer转换为普通字节数组
byte[] bytes = b1.array();
System.out.println(Arrays.toString(bytes));
//打印结果: [11, 12, 13, 0, 0, 0, 0, 0, 0, 0]

System.out.println("=======================");
byte[] b2 = {14,15,16};
// 添加数据
b1.put(b2);
// ByteBuffer转换为普通字节数组
byte[] b = b1.array();
System.out.println(Arrays.toString(b));
//打印结果: [11, 12, 13, 14, 15, 16, 0, 0, 0, 0]
}
}

从 buffer 读取数据

同样有两种办法

  • 调用 channel 的 write() 方法 , 有点拗口,反正就是从buffer取数据到channel,对buffer来说就是读取了
  • 调用 buffer 自己的 get() 方法
1
int writeBytes = channel.write(buf);

1
byte b = buf.get();

get 方法会让 position 读指针向后走,如果想重复读取数据

  • 可以调用 rewind() 方法将 position 重新置为 0
  • 或者调用 get(int i) 方法获取索引 i 的内容,它不会移动读指针(position的位置不会变)

image-20220109163025857

容量-capacity

Buffer的容量(capacity)是指:Buffer所能够包含的元素的最大数量。定义了Buffer后,容量是不可变的。

示例代码:

1
2
3
4
5
6
7
8
public static void main(String[] args) {
ByteBuffer b1 = ByteBuffer.allocate(10);
System.out.println("容量:" + b1.capacity());//10。之后不可改变

byte[] byteArray = {97, 98, 99, 100};
ByteBuffer b2 = ByteBuffer.wrap(byteArray);
System.out.println("容量:" + b2.capacity());//4。之后不可改变
}

结果:

1
2
容量:10
容量:4

限制-limit

限制limit是指:第一个不应该读取或写入元素的index索引。缓冲区的限制(limit)不能为负,并且不能大于容量。

有两个相关方法:

  • public int limit():获取此缓冲区的限制。
  • public Buffer limit(int newLimit):设置此缓冲区的限制。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Test_添加数据 {
public static void main(String[] args) {
ByteBuffer b1 = ByteBuffer.allocate(10);
// 添加数据
b1.put((byte)10);

// 获取限制
int limit1 = b1.limit();
System.out.println("limit1:"+limit1);// 10

// 设置限制
b1.limit(3);

// 添加元素
b1.put((byte)20);
b1.put((byte)30);
// b1.put((byte)40);// 报异常,因为限制位置索引为3,所以再存40就会报异常:BufferOverflowException

}
}

图示:

image-20230822192039930

位置-position

位置position是指:当前可写入的索引。位置不能小于0,并且不能大于”限制”。

有两个相关方法:

  • public int position():获取当前可写入位置索引。
  • public Buffer position(int p):更改当前可写入位置索引。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Test_添加数据 {
public static void main(String[] args) {
ByteBuffer b1 = ByteBuffer.allocate(10);
// 添加数据
b1.put((byte)11);

// 获取当前位置索引
int position = b1.position();
System.out.println("position:"+position);// 1

// 设置当前位置索引
b1.position(5);

b1.put((byte)22);
b1.put((byte)33);
System.out.println("position:"+b1.position());// 7
System.out.println(Arrays.toString(b1.array()));
// 打印结果:[11, 0, 0, 0, 0, 22, 33, 0, 0, 0]
}
}

注意: 字节缓冲数组能操作的范围就是position位置到limit位置:[position,limit)

标记-mark 和 reset

标记mark是指:当调用缓冲区的reset()方法时,会将缓冲区的position位置重置为该索引。

相关方法:

  • public Buffer mark():设置此缓冲区的标记为当前的position位置。
  • public Buffer reset() : 将此缓冲区的位置重置为以前标记的位置。

mark 是在读取时,做一个标记,即使 position 改变,只要调用 reset 就能回到 mark 的位置

注意: rewind 和 flip 都会清除 mark 位置

image-20220109163257798

更独例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static void main(String[] args) {
ByteBuffer b1 = ByteBuffer.allocate(10);
// 添加数据
b1.put((byte)11);

// 获取当前位置索引
int position = b1.position();
System.out.println("position:"+position);// 1

// 标记当前位置索引
b1.mark();

// 添加元素
b1.put((byte)22);
b1.put((byte)33);

// 获取当前位置索引
System.out.println("position:"+b1.position());// 3
System.out.println(Arrays.toString(b1.array()));
// 打印结果:[11, 22, 33, 0, 0, 0, 0, 0, 0, 0]

// 重置当前位置索引
b1.reset();
// 获取当前位置索引
System.out.println("position:"+b1.position());// 1

// 添加元素
b1.put((byte)44);
System.out.println(Arrays.toString(b1.array()));
// 打印结果:[11, 44, 33, 0, 0, 0, 0, 0, 0, 0]

}

可以看到索引1的位置被覆盖了。说明指针是reset到之前mark标记的下标为1的位置。

字符串与 ByteBuffer 互转

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// ---------- str => byteBuffer
ByteBuffer buffer1 = ByteBuffer.allocate(6);
buffer1.put("hello".getBytes());
debugAll(buffer1);

// 直接编码转换,position的位置是0.
ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("hello");
debugAll(buffer2);
ByteBuffer buffer3 = Charset.defaultCharset().encode("hello");
debugAll(buffer3);
ByteBuffer buffer4 = ByteBuffer.wrap("hello".getBytes());
debugAll(buffer4);
ByteBuffer buffer5 = Charset.forName("utf-8").encode("你好");
debug(buffer5);

// ----------- byteBuffer => str
CharBuffer buffer3 = StandardCharsets.UTF_8.decode(buffer1);
System.out.println(buffer3.getClass());
System.out.println(buffer3.toString());

输出

1
2
3
4
5
6
7
8
9
10
11
12
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| e4 bd a0 e5 a5 bd |...... |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| e4 bd a0 e5 a5 bd |...... |
+--------+-------------------------------------------------+----------------+
class java.nio.HeapCharBuffer
你好

其他方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
- public int remaining():获取position与limit之间的元素数。
- public boolean isReadOnly():获取当前缓冲区是否只读。
- public boolean isDirect():获取当前缓冲区是否为直接缓冲区。
- public Buffer rewind():重绕此缓冲区。
- 将position位置设置为:0
- 限制limit不变。
- 丢弃标记。
- public Buffer clear():还原缓冲区的状态。
- 将position设置为:0
- 将限制limit设置为容量capacity;
- 丢弃标记mark。
- public Buffer flip():缩小limit的范围。
- 将limit设置为当前position位置;
- 将当前position位置设置为0
- 丢弃标记。

更多例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.itheima.demo11_ByteBuffer常用方法;

import java.nio.ByteBuffer;
import java.util.Arrays;

public class Test6_clear和flip {
public static void main(String[] args) {
// 创建ByteBuffer字节缓冲数组
ByteBuffer b = ByteBuffer.allocate(10);

// 容量:10,限制:10,位置:0
System.out.println("容量:" + b.capacity() + ",限制:" + b.limit() + ",位置:" + b.position());

// 往b添加数据
b.put((byte) 10);
b.put((byte) 20);
b.put((byte) 30);

// 容量:10,限制:10,位置:3
System.out.println("容量:" + b.capacity() + ",限制:" + b.limit() + ",位置:" + b.position());
System.out.println(Arrays.toString(b.array()));// [10, 20, 30, 0, 0, 0, 0, 0, 0, 0]

/*// 调用clear
b.clear();
// 容量:10,限制:10,位置:0
System.out.println("容量:"+b.capacity()+",限制:"+b.limit()+",位置:"+b.position());
System.out.println(Arrays.toString(b.array()));// [10, 20, 30, 0, 0, 0, 0, 0, 0, 0]
*/

// 调用flip()
b.flip();
// 容量:10,限制:3,位置:0
System.out.println("容量:"+b.capacity()+",限制:"+b.limit()+",位置:"+b.position());
System.out.println(Arrays.toString(b.array()));// [10, 20, 30, 0, 0, 0, 0, 0, 0, 0]

}
}

可以看到flip把limit设置成有数的最后一个下标,position置为0了。clear则position置为0,limit置为容量capacity的大小。

Buffer 的线程安全:warning:

Buffer 是非线程安全的

Scattering Reads

分散读取,有一个文本文件 3parts.txt

1
onetwothree

使用如下方式读取,可以将数据填充至多个 buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
try (RandomAccessFile file = new RandomAccessFile("helloword/3parts.txt", "rw")) {
FileChannel channel = file.getChannel();
ByteBuffer a = ByteBuffer.allocate(3);
ByteBuffer b = ByteBuffer.allocate(3);
ByteBuffer c = ByteBuffer.allocate(5);
channel.read(new ByteBuffer[]{a, b, c});
a.flip();
b.flip();
c.flip();
debug(a);
debug(b);
debug(c);
} catch (IOException e) {
e.printStackTrace();
}

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 6f 6e 65 |one |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 74 77 6f |two |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 74 68 72 65 65 |three |
+--------+-------------------------------------------------+----------------+

可以看到这个文件的内容被分成3段读取到了

Gathering Writes

使用如下方式写入,可以将多个 buffer 的数据填充至 channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
try (RandomAccessFile file = new RandomAccessFile("helloword/3parts.txt", "rw")) {
FileChannel channel = file.getChannel();
ByteBuffer d = ByteBuffer.allocate(4);
ByteBuffer e = ByteBuffer.allocate(4);
channel.position(11);

d.put(new byte[]{'f', 'o', 'u', 'r'});
e.put(new byte[]{'f', 'i', 'v', 'e'});
d.flip();
e.flip();
debug(d);
debug(e);
channel.write(new ByteBuffer[]{d, e});
} catch (IOException e) {
e.printStackTrace();
}

输出

1
2
3
4
5
6
7
8
9
10
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 6f 75 72 |four |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 69 76 65 |five |
+--------+-------------------------------------------------+----------------+

文件内容

1
onetwothreefourfive

可以看到,这个文本的内容后面跟了2个单词,分2次写入的

黏包处理案例

网络上有多条数据发送给服务端,数据之间使用 \n 进行分隔
但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为

  • Hello,world\n
  • I'm zhangsan\n
  • How are you?\n

变成了下面的两个 byteBuffer (黏包,半包)

  • Hello,world\nI'm zhangsan\nHo
  • w are you?\n

现在要求你编写程序,将错乱的数据恢复成原始的按 \n 分隔的数据(处理黏包,半包,后续netty底层会处理的更好)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void main(String[] args) {
ByteBuffer source = ByteBuffer.allocate(32);
// 11 24
source.put("Hello,world\nI'm zhangsan\nHo".getBytes());
split(source);

source.put("w are you?\nhaha!\n".getBytes());
split(source);
}

private static void split(ByteBuffer source) {
source.flip();
int oldLimit = source.limit();
for (int i = 0; i < oldLimit; i++) {
if (source.get(i) == '\n') {
System.out.println(i);
ByteBuffer target = ByteBuffer.allocate(i + 1 - source.position());
// 0 ~ limit
source.limit(i + 1);
target.put(source); // 从source 读,向 target 写
debugAll(target);
source.limit(oldLimit);
}
}
source.compact();
}

Channel

Channel是什么

Channel(通道):Channel 是一个通道,管道,接口,网络数据通过Channel读取和写入。可以把它看做是IO中的流,

Channel和流Stream的不同之处在于Channel是双向的,流只在一个方向上移(InputStream/OutputStream), 而Channel可以用于读写同时进行,即Channel是全双工的

image-20220530215218782

channel 有一点类似于 stream,它就是读写数据的双向通道,可以从 channel 将数据读入 buffer,也可以将 buffer 的数据写入 channel,而之前的 stream (是单向的)要么是输入,要么是输出,channel 比 stream 更为底层

1
2
3
graph LR
channel --> buffer
buffer --> channel

image-20211223204129848

Java NIO中常见的 Channel 有

  • FileChannel : 文件的数据传输通道
  • DatagramChannel :UDP数据传输通道
  • SocketChannel :TCP式数据传输通道,客户端和服务器都可以用
  • ServerSocketChannel :TCP式数据传输通道,服务器用,可以监听TCP连接

image-20211223204027897

发送的事件:

image-20211223204213047

注意,流是单向的,channel是读写双向的。而且。可以channel可以设置成非阻塞式的。

文件编程(FileChannel)

FileChannel 工作模式:warning:

FileChannel 只能工作在阻塞模式下 (同步阻塞=>BIO,异步没有阻塞)

获取Channel

不能直接打开 FileChannel,必须通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel 方法

  • 通过 FileInputStream 获取的 channel 只能读
  • 通过 FileOutputStream 获取的 channel 只能写
  • 通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式决定
1
2
3
4
5
FileInputStream fi=new FileInputStream(new File(src));
FileOutputStream fo=new FileOutputStream(new File(dst));
//获得传输通道channel
FileChannel inChannel=fi.getChannel();
FileChannel outChannel=fo.getChannel();

读取数据

会从 channel 读取数据填充 ByteBuffer,返回值表示读到了多少字节,-1 表示到达了文件的末尾

1
int readBytes = channel.read(buffer); // 缓存区是会占用内存空间的,所以我们不能设置缓存区无限大,要分多次读

写入数据

写入的正确姿势如下, 后面要学的SocketChannel(处理能力有限,需要先检测是否有剩余数据,在写入)

1
2
3
4
5
6
7
ByteBuffer buffer = ...;
buffer.put(...); // 存入数据
buffer.flip(); // 切换读模式

while(buffer.hasRemaining()) {
channel.write(buffer);
}

在 while 中调用 channel.write 是因为 write 方法并不能保证一次将 buffer 中的内容全部写入 channel

关闭Channel

channel 必须关闭,不过调用了 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 close 方法会间接地调用 channel 的 close 方法

Channel的位置

获取当前位置

1
long pos = channel.position();

设置当前位置

1
2
long newPos = ...;
channel.position(newPos);

设置当前位置时,如果设置为文件的末尾

  • 这时读取会返回 -1
  • 这时写入,会追加内容,但要注意如果 position 超过了文件末尾,再写入时在新内容和原末尾之间会有空洞(00)

Channel的大小

使用 size() 方法获取文件的大小

强制写入

操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘。可以调用 force(true) 方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘

文件复制

FileChannel+ByteBuffer

通过CopyFile这个示例让大家体会NIO的操作过程。CopyFile执行三个基本的操作:创建一个Buffer,然后从源文件读取数据到缓冲区,然后再将缓冲区写入目标文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public static void main(String[] args) throws Exception{
FileInputStream fis = new FileInputStream("day19\\aaa\\a.txt");
FileOutputStream fos = new FileOutputStream("day19\\aaa\\aCopy1.txt");
// 获得FileChannel管道对象
FileChannel c1 = fis.getChannel();
FileChannel c2 = fos.getChannel();

// 创建ByteBuffer数组
ByteBuffer b = ByteBuffer.allocate(1000);

// 循环读取数据
while ((c1.read(b)) != -1){// 读取的字节会填充postion到limit位置之间
// 重置 postion为0,limit为postion的位置
b.flip();
// 写出数据
c2.write(b);// 会把postion到limit之间的数据写出 (向channel写入,即从buffer读取)
// 还原
b.clear();// positon为:0 limit为: capacity 用于下次读取
}

// 释放资源
c2.close();
c1.close();
fos.close();
fis.close();

/*byte[] bys = new byte[8192];
int len;
while ((len = fis.read(bys)) != -1){
fos.write(bys,0,len);
}
fos.close();
fis.close();*/
}

结合MappedByteBuffer高效读写:复制2GB以下的文件

FileChannel+MappedByBuffer

  • 上例直接使用FileChannel结合ByteBuffer实现的管道读写,但并不能提高文件的读写效率。

  • ByteBuffer有个抽象子类:MappedByteBuffer,它可以将文件直接映射至内存,把硬盘中的读写变成内存中的读写, 所以可以提高大文件的读写效率。

  • 可以调用FileChannel的map()方法获取一个MappedByteBuffer,map()方法的原型:

    MappedByteBuffer map(MapMode mode, long position, long size);

    说明:将节点中从position开始的size个字节映射到返回的MappedByteBuffer中。

复制d:\\b.rar文件,此文件大概600多兆,复制完毕用时不到2秒。此例不能复制大于2G的文件,因为map的第三个参数被限制在Integer.MAX_VALUE(字节) = 2G

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static void main(String[] args) throws Exception{
//java.io.RandomAccessFile类,可以设置读、写模式的IO流类。
//"r"表示:只读--输入流,只读就可以。
RandomAccessFile r1 = new RandomAccessFile("day19\\aaa\\a.txt","r");
//"rw"表示:读、写--输出流,需要读、写。
RandomAccessFile r2 = new RandomAccessFile("day19\\aaa\\aCopy2.txt","rw");

// 获得FileChannel管道对象
FileChannel c1 = r1.getChannel();
FileChannel c2 = r2.getChannel();

// 获取文件的大小
long size = c1.size();

// 直接把硬盘中的文件映射到内存中
MappedByteBuffer b1 = c1.map(FileChannel.MapMode.READ_ONLY, 0, size);
MappedByteBuffer b2 = c2.map(FileChannel.MapMode.READ_WRITE, 0, size);

// 循环读取数据
for (long i = 0; i < size; i++) {
// 读取字节
byte b = b1.get();
// 保存到第二个数组中
b2.put(b);
}
// 释放资源
c2.close();
c1.close();
r2.close();
r1.close();
}
  • 代码说明:

  • map()方法的第一个参数mode:映射的三种模式,在这三种模式下得到的将是三种不同的MappedByteBuffer:三种模式都是Channel的内部类MapMode中定义的静态常量,这里以FileChannel举例:
    1). FileChannel.MapMode.READ_ONLY:得到的镜像只能读不能写(只能使用get之类的读取Buffer中的内容);

    2). FileChannel.MapMode.READ_WRITE:得到的镜像可读可写(既然可写了必然可读),对其写会直接更改到存储节点;

    3). FileChannel.MapMode.PRIVATE:得到一个私有的镜像,其实就是一个(position, size)区域的副本罢了,也是可读可写,只不过写不会影响到存储节点,就是一个普通的ByteBuffer了!!

  • 为什么使用RandomAccessFile?

    1). 使用InputStream获得的Channel可以映射,使用map时只能指定为READ_ONLY模式,不能指定为READ_WRITE和PRIVATE,否则会抛出运行时异常!

    2). 使用OutputStream得到的Channel不可以映射!并且OutputStream的Channel也只能write不能read!

    3). 只有RandomAccessFile获取的Channel才能开启任意的这三种模式!

下例使用循环,将文件分块,可以高效的复制大于2G的文件[更高效的写法]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public static void main(String[] args) throws Exception{
//java.io.RandomAccessFile类,可以设置读、写模式的IO流类。
//"r"表示:只读--输入流,只读就可以。
RandomAccessFile r1 = new RandomAccessFile("H:\\课堂资料.zip","r");
//"rw"表示:读、写--输出流,需要读、写。
RandomAccessFile r2 = new RandomAccessFile("H:\\课堂资料2.zip","rw");

// 获得FileChannel管道对象
FileChannel c1 = r1.getChannel();
FileChannel c2 = r2.getChannel();

// 获取文件的大小
long size = c1.size();

// 每次期望复制500M
int everySize = 1024*1024*500;

// 总共需要复制多少次
long count = size % everySize == 0 ? size/everySize : size/everySize+1;

// 开始复制
for (long i = 0; i < count; i++) {
// 每次开始复制的位置
long start = everySize*i;

// 每次复制的实际大小
long trueSize = size - start > everySize ? everySize : size - start;

// 直接把硬盘中的文件映射到内存中
MappedByteBuffer b1 = c1.map(FileChannel.MapMode.READ_ONLY, start, trueSize);
MappedByteBuffer b2 = c2.map(FileChannel.MapMode.READ_WRITE, start, trueSize);

// 循环读取数据
for (long j = 0; j < trueSize; j++) {
// 读取字节
byte b = b1.get();
// 保存到第二个数组中
b2.put(b);
}
}

// 释放资源
c2.close();
c1.close();
r2.close();
r1.close();

}

FileChannel复制2G以上的数据

FileChannel

1
2
3
4
5
6
7
8
9
10
11
12
String FROM = "helloword/data.txt";
String TO = "helloword/to.txt";
long start = System.nanoTime();
try (FileChannel from = new FileInputStream(FROM).getChannel();
FileChannel to = new FileOutputStream(TO).getChannel();
) {
from.transferTo(0, from.size(), to);
} catch (IOException e) {
e.printStackTrace();
}
long end = System.nanoTime();
System.out.println("transferTo 用时:" + (end - start) / 1000_000.0);

输出

1
transferTo 用时:8.2011

超过 2g 大小的文件传输:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TestFileChannelTransferTo {
public static void main(String[] args) {
try (
FileChannel from = new FileInputStream("data.txt").getChannel();
FileChannel to = new FileOutputStream("to.txt").getChannel();
) {
// 效率高,底层会利用操作系统的零拷贝进行优化
long size = from.size();
// left 变量代表还剩余多少字节
for (long left = size; left > 0; ) {
System.out.println("position:" + (size - left) + " left:" + left);
left -= from.transferTo((size - left), left, to);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

实际传输一个超大文件

1
2
3
4
position:0 left:7769948160
position:2147483647 left:5622464513
position:4294967294 left:3474980866
position:6442450941 left:1327497219

Paths工具类

Java Path是一个接口,位于java.nio.file包中,Java 7中引入到Java NIO中。

  • Path 用来表示文件路径(传统IO中是通过File来指向文件和路径的),大多数场景下是可以用Path来代替File的
  • Paths 是工具类,用来获取 Path 实例
1
2
3
4
5
6
7
Path source = Paths.get("1.txt"); // 相对路径 使用 user.dir 环境变量来定位 1.txt

Path source = Paths.get("d:\\1.txt"); // 绝对路径 代表了 d:\1.txt

Path source = Paths.get("d:/1.txt"); // 绝对路径 同样代表了 d:\1.txt

Path projects = Paths.get("d:\\data", "projects"); // 代表了 d:\data\projects
  • . 代表了当前路径
  • .. 代表了上一级路径

例如目录结构如下

1
2
3
4
5
d:
|- data
|- projects
|- a
|- b

代码

1
2
3
Path path = Paths.get("d:\\data\\projects\\a\\..\\b");
System.out.println(path);
System.out.println(path.normalize()); // 正常化路径

会输出

1
2
d:\data\projects\a\..\b
d:\data\projects\b

Files工具类

Java NIO Files类(java.nio.file.Files)提供了一些方法用来操作文件,其是和上面提到的Path一起配合使用的。

检查文件是否存在

1
2
Path path = Paths.get("helloword/data.txt");
System.out.println(Files.exists(path));

创建一级目录

1
2
Path path = Paths.get("helloword/d1");
Files.createDirectory(path);
  • 如果目录已存在,会抛异常 FileAlreadyExistsException
  • 不能一次创建多级目录,否则会抛异常 NoSuchFileException

创建多级目录用

1
2
Path path = Paths.get("helloword/d1/d2");
Files.createDirectories(path);

拷贝文件

1
2
3
4
Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/target.txt");

Files.copy(source, target);
  • 如果文件已存在,会抛异常 FileAlreadyExistsException

如果希望用 source 覆盖掉 target,需要用 StandardCopyOption 来控制

1
Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);

移动文件

1
2
3
4
Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/data.txt");

Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
  • StandardCopyOption.ATOMIC_MOVE 保证文件移动的原子性

删除文件

1
2
3
Path target = Paths.get("helloword/target.txt");

Files.delete(target);
  • 如果文件不存在,会抛异常 NoSuchFileException

删除目录

1
2
3
Path target = Paths.get("helloword/d1");

Files.delete(target);
  • 如果目录还有内容,会抛异常 DirectoryNotEmptyException

遍历目录文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) throws IOException {
Path path = Paths.get("C:\\Program Files\\Java\\jdk1.8.0_91");
AtomicInteger dirCount = new AtomicInteger();
AtomicInteger fileCount = new AtomicInteger();
Files.walkFileTree(path, new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
throws IOException {
System.out.println(dir);
dirCount.incrementAndGet();
return super.preVisitDirectory(dir, attrs);
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
throws IOException {
System.out.println(file);
fileCount.incrementAndGet();
return super.visitFile(file, attrs);
}
});
System.out.println(dirCount); // 133
System.out.println(fileCount); // 1479
}

统计 jar 的数目

1
2
3
4
5
6
7
8
9
10
11
12
13
Path path = Paths.get("C:\\Program Files\\Java\\jdk1.8.0_91");
AtomicInteger fileCount = new AtomicInteger();
Files.walkFileTree(path, new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
throws IOException {
if (file.toFile().getName().endsWith(".jar")) {
fileCount.incrementAndGet();
}
return super.visitFile(file, attrs);
}
});
System.out.println(fileCount); // 724

这里用到了AtomicInteger原子类

删除多级目录

:warning:删除是危险操作,确保要递归删除的文件夹没有重要内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Path path = Paths.get("d:\\a");
Files.walkFileTree(path, new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
throws IOException {
Files.delete(file); // 遍历文件时删除文件
return super.visitFile(file, attrs);
}

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc)
throws IOException {
Files.delete(dir); // 文件删除后,退出文件夹,删除文件夹
return super.postVisitDirectory(dir, exc);
}
});

拷贝多级目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
long start = System.currentTimeMillis();
String source = "D:\\Snipaste-1.16.2-x64";
String target = "D:\\Snipaste-1.16.2-x64aaa";

Files.walk(Paths.get(source)).forEach(path -> {
try {
String targetName = path.toString().replace(source, target);
// 是目录
if (Files.isDirectory(path)) {
Files.createDirectory(Paths.get(targetName));
}
// 是普通文件
else if (Files.isRegularFile(path)) {
Files.copy(path, Paths.get(targetName));
}
} catch (IOException e) {
e.printStackTrace();
}
});
long end = System.currentTimeMillis();
System.out.println(end - start);

网络编程(ServerSocketChannel)

NIO网络编程收发信息

使用ServerSocketChannel代替之前的ServerSocket,来完成网络编程的收发数据。

  • 服务器代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Server {
public static void main(String[] args) throws IOException{
//创建对象
//ServerSocket ss = new ServerSocket(8888);

//创建
ServerSocketChannel ssc = ServerSocketChannel.open();
//服务器绑定端口
ssc.bind(new InetSocketAddress(8888));

//连接上客户端
SocketChannel sc = ssc.accept();

//服务器端接受数据
//创建数组
ByteBuffer buffer = ByteBuffer.allocate(1024);
//接收数据
int len = sc.read(buffer);
//打印结构
System.out.println(new String(buffer.array(),0,len));

//关闭资源
sc.close();
}
}
  • 客户端代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Client {
public static void main(String[] args) {
//创建对象
//Socket s = new Socket("127.0.0.1",8888);

//创建对象
SocketChannel sc = SocketChannel.open();
//连接服务器
sc.connect(new InetSocketAddress("127.0.0.1",8888));

//客户端发数据
//创建数组
ByteBuffer buffer = ByteBuffer.allocate(1024);
//数组中添加数据
buffer.put("你好啊~".getBytes());
//切换
buffer.flip();
//发出数据
sc.write(buffer);

//关流
sc.close();
}
}

阻塞

  • 阻塞模式下,相关方法都会导致线程暂停
    • ServerSocketChannel.accept 会在没有连接建立时让线程暂停
    • SocketChannel.read 会在没有数据可读时让线程暂停
    • 阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置
  • 单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持
  • 但多线程下,有新的问题,体现在以下方面
    • 32 位 jvm 一个线程 320k,64 位 jvm 一个线程 1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
    • 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接
  • 服务器端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 使用 nio 来理解阻塞模式, 单线程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
// ServerSocketChannel类用于连接的服务器端,它相当于:ServerSocket。
ServerSocketChannel ssc = ServerSocketChannel.open();

// 2. 绑定本机监听端口,准备接受连接。
ssc.bind(new InetSocketAddress(8080)); // 调用ServerSocketChannel的静态方法open()就可以获得ServerSocketChannel对象, 但并没有指定端口号, 必须通过其套接字的bind方法将其绑定到特定地址,才能接受连接。

// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
log.debug("connecting...");
// 调用ServerSocketChannel的实例方法accept():等待连接。
SocketChannel sc = ssc.accept(); // 阻塞方法,线程停止运行
log.debug("connected... {},后续代码......", sc);
channels.add(sc);
for (SocketChannel channel : channels) {
// 5. 接收客户端发送的数据
log.debug("before read... {}", channel);
channel.read(buffer); // 阻塞方法,线程停止运行
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read...{}", channel);
}
}
  • 客户端
1
2
3
4
5
6
// SocketChannel类用于连接的客户端,它相当于:Socket
// 先调用SocketChannel的open()方法打开通道:
SocketChannel sc = SocketChannel.open();
// 调用SocketChannel的实例方法connect(SocketAddress add)连接服务器:
sc.connect(new InetSocketAddress("localhost", 8080)); // SocketAddress代表一个Socket地址。可以用它的 子类java.net.InetSocketAddress
System.out.println("waiting..."); // 在这里打断点,debug启动,客户端建立连接后不断开,可以观察效果

image-20220109174731304

还可以观察到服务端接收到消息,打印后,继续在等待新的连接。

非阻塞

我们可以通过ServerSocketChannel的configureBlocking(boolean b)方法设置accept()是否阻塞

  • 非阻塞模式下,相关方法都会不会让线程暂停
    • 在 ServerSocketChannel.accept 在没有连接建立时,会返回 null,继续运行
    • SocketChannel.read 在没有数据可读时,会返回 0,但线程不必阻塞,可以去执行其它 SocketChannel 的 read 或是去执行 ServerSocketChannel.accept
    • 写数据时,线程只是等待数据写入 Channel 即可,无需等 Channel 通过网络把数据发送出去
  • 但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了 cpu
  • 数据复制过程中,线程实际还是阻塞的(AIO 改进的地方)

服务器端,客户端代码不变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 使用 nio 来理解非阻塞模式, 单线程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // (服务器socket)非阻塞模式,写成false叫非阻塞, 写成true叫阻塞
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
SocketChannel sc = ssc.accept(); // 非阻塞,线程还会继续运行,如果没有连接建立,但sc是null
if (sc != null) { //不等于null说明连接上了客户端
log.debug("connected... {}", sc);
sc.configureBlocking(false); // (客户端socket)非阻塞模式
channels.add(sc);
}
//没连接上客户端
for (SocketChannel channel : channels) {
// 5. 接收客户端发送的数据
int read = channel.read(buffer);// 非阻塞,线程仍然会继续运行,如果没有读到数据,read 返回 0
if (read > 0) {
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read...{}", channel);
}
}
}

Selector

Selector是什么

上面的BIO和cannel,都是每次来个连接都会创建一个新的线程/channel。所以需要多路复用器

selector 单从字面意思不好理解,需要结合服务器的设计演化来理解它的用途。**”多路”是指:服务器端同时监听多个“端口”的情况。每个端口都要监听多个客户端的连接(端口)。**

选择器Selector是NIO中的重要技术之一。它与SelectableChannel联合使用实现了非阻塞的多路复用。使用它可以节省CPU资源,提高程序的运行效率。

Selector(选择器/多路复用器):Selector会不断轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,即该Channel处于就绪状态,它就会被Selector轮询出来,然后通过selectedKeys可以获取就绪Channel的集合,进行后续的I/O操作。

概述: Selector被称为:选择器,也被称为:多路复用器,可以把多个Channel注册到一个Selector选择器上, 那么就可以实现利用一个线程来处理这多个Channel上发生的事件,并且能够根据事件情况决定Channel读写。这样,通过一个线程管理多个Channel,就可以处理大量网络连接了, 减少系统负担, 提高效率。因为线程之间的切换对操作系统来说代价是很高的,并且每个线程也会占用一定的系统资源。所以,对系统来说使用的线程越少越好。

作用: 一个Selector可以监听多个Channel发生的事件, 减少系统负担 , 提高程序执行效率 .

image-20211223204253550

channel的关闭是在程序中如果有异常什么的自己关闭的。

多线程版设计:x:

如果不使用“多路复用”,服务器端需要开很多线程处理每个端口的请求。如果在高并发环境下,造成系统性能下降。

1
2
3
4
5
6
graph TD
subgraph 多线程版
t1(thread) --> s1(socket1)
t2(thread) --> s2(socket2)
t3(thread) --> s3(socket3)
end

image-20230822192056325

多线程版缺点:warning:

  • 内存占用高 (假设1个线程内存占1M,1000个连接.刚内存就占1G了)
  • 线程上下文切换成本高(线程数越多越好的前提是CPU要跟得上。比如CPU是16C,能同时跑的线程也就16个,其他线程要等待.讲来有空余的CPU了,就要改变等待的状态,也就是切换的成本)
  • 只适合连接数少的场景

线程池版设计:x:

1
2
3
4
5
6
7
graph TD
subgraph 线程池版
t4(thread) --> s4(socket1)
t5(thread) --> s5(socket2)
t4(thread) -.-> s6(socket3)
t5(thread) -.-> s7(socket4)
end

:warning:线程池版缺点

  • 阻塞模式下,线程仅能处理一个 socket 连接(socket1连接的时候,socket3只能等待,即使socket连接后不做任何读写,这个线程也只能等待它断开后再去处理其他的socket)
  • 仅适合短连接场景

selector版设计:heavy_check_mark:

使用了多路复用,只需要一个线程就可以处理多个通道,降低内存占用率,减少CPU切换时间,在高并发、高频段业务环境下有非常重要的优势

selector 的作用就是配合一个线程来管理多个 channel,获取这些 channel 上发生的事件,这些 channel 工作在非阻塞模式下,不会让线程吊死在一个 channel 上。适合连接数特别多,但流量低的场景(low traffic)

1
2
3
4
5
6
7
graph TD
subgraph selector 版
thread --> selector
selector --> c1(channel)
selector --> c2(channel)
selector --> c3(channel)
end

image-20230822192111746

调用 selector 的 select() 会阻塞直到 channel 发生了读写就绪事件,这些事件发生,select 方法就会返回这些事件交给 thread 来处理

多路复用案例

  • 服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public class NioServer2 {

public static void main(String[] args) throws Exception{
//打开一个serversocketchannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//绑定端口监听,设置非阻塞
serverSocketChannel.socket().bind(new InetSocketAddress(8888));
serverSocketChannel.configureBlocking(false);

//打开一个selector
Selector selector = Selector.open();

//将serversocketchannel注册到selector上,监听连接事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//创建单reactor线程 让selector执行多路复用程序
new Thread(new SingleReactor(selector) ).start();
}

static class SingleReactor implements Runnable {

private final Selector selector;

public SingleReactor(Selector selector) {
this.selector = selector;
}

@Override
public void run() {
//轮询selector 处理事件即可
while (true) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
processSelectionKey(selectionKey);
iterator.remove();
}
} catch (Exception e) {

}
}
}

private void processSelectionKey(SelectionKey selectionKey) throws Exception {
if (selectionKey.isValid()) {
//根据事件类型分发处理
if (selectionKey.isAcceptable()) { // 连接事件准备就绪了
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
//接收一个连接
SocketChannel socketChannel = serverSocketChannel.accept();
//配置非阻塞模式
socketChannel.configureBlocking(false);
//将其注册到selector上,监听可读事件
socketChannel.register(selector,SelectionKey.OP_READ);
}

if (selectionKey.isReadable()) { //通道可读事件准备就绪,可以读取数据了
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//从通道内读数据即可
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
socketChannel.read(byteBuffer);

//从byteBuffer中拿出数据 转换模式
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);

//将数据转换成string
String msg = new String(bytes,Charset.defaultCharset());
System.out.println("服务端收到来自客户端的数据:"+msg);

//像客户端写数据 可以重新创建一个buffer
byteBuffer.clear();
byteBuffer.put("hello nioclient,i am nioserver".getBytes(StandardCharsets.UTF_8));
byteBuffer.flip();
socketChannel.write(byteBuffer);
}

}
}
}
}

  • 客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
public class NioClient {

public static void main(String[] args) {
try {
//1、窗口客户端SocketChannel,绑定客户端本地地址(不选默认随机分配一个可用地址)
SocketChannel socketChannel = SocketChannel.open();
//2、设置非阻塞模式,
socketChannel.configureBlocking(false);
//3、创建Selector
Selector selector = Selector.open();
//3、创建Reactor线程
new Thread(new SingleReactorClient(socketChannel,selector)).start();
} catch (IOException e) {
e.printStackTrace();
}
}
}

class SingleReactorClient implements Runnable{
private final SocketChannel socketChannel;
private final Selector selector;

public SingleReactorClient(SocketChannel socketChannel, Selector selector) {
this.socketChannel = socketChannel;
this.selector = selector;
}

public void run() {
try {
//连接服务端
doConnect(socketChannel,selector);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}

//5、多路复用器执行多路复用程序
while (true) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
processKey(selectionKey);
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

private void doConnect(SocketChannel sc, Selector selector) throws IOException {
System.out.println("客户端成功启动,开始连接服务端");
//3、连接服务端
boolean connect = sc.connect(new InetSocketAddress("127.0.0.1", 8888));
//4、将socketChannel注册到selector并判断是否连接成功,连接成功监听读事件,没有继续监听连接事件
System.out.println("connect="+connect);
if (connect) {
sc.register(selector, SelectionKey.OP_READ);
System.out.println("客户端成功连上服务端,准备发送数据");
//开始进行业务处理,向服务端发送数据
doService(sc);
}else {
sc.register(selector,SelectionKey.OP_CONNECT);
}
}

private void processKey(SelectionKey key) throws IOException {
if (key.isValid()) {
//6、根据准备就绪的事件类型分别处理
if (key.isConnectable()) {//服务端可连接事件准备就绪
SocketChannel sc = (SocketChannel) key.channel();
if (sc.finishConnect()) {
//6.1、向selector注册可读事件(接收来自服务端的数据)
sc.register(selector,SelectionKey.OP_READ);
//6.2、处理业务 向服务端发送数据
doService(sc);
}else {
//连接失败,退出
System.exit(1);
}
}

if (key.isReadable()) {//读事件准备继续
//6.1、读服务端返回的数据
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBufer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBufer);
//前面设置过socketChannel是非阻塞的,故要通过返回值判断读取到的字节数
if (readBytes > 0) {
readBufer.flip();//读写模式切换
byte[] bytes = new byte[readBufer.remaining()];
readBufer.get(bytes);
String msg = new String(bytes,"utf-8");
//接收到服务端返回的数据后进行相关操作
doService(msg);
}else if (readBytes < 0) {
//值为-1表示链路通道已经关闭
key.cancel();
sc.close();
}else {
//没读取到数据,忽略
}
}
}
}
private static void doService(SocketChannel socketChannel) throws IOException {
System.out.println("客户端开始向服务端发送数据:");
//向服务端发送数据
byte[] bytes = "hello nioServer,i am nioClient !".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
socketChannel.write(writeBuffer);
}

private String doService(String msg) {
System.out.println("成功接收来自服务端响应的数据:"+msg);
return "";
}
}

上面将了3大组件的接口,下面说下3大组件具体的常用类

Selector常用方法

单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用

  • 多路复用仅针对网络 IO,普通文件 IO 没法利用多路复用
  • 如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证
    • 有可连接事件时才去连接
    • 有可读事件才去读取
    • 有可写事件才去写入
      • 限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件
1
2
3
4
5
6
7
graph TD
subgraph selector 版
thread --> selector
selector --> c1(channel)
selector --> c2(channel)
selector --> c3(channel)
end

好处

  • 一个线程配合 selector 就可以监控多个 channel 的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功
  • 让这个线程能够被充分利用
  • 节约了线程的数量
  • 减少了线程上下文切换

创建

1
Selector selector = Selector.open();

绑定 Channel 事件

也称之为注册事件,绑定的事件 selector 才会关心

1
2
channel.configureBlocking(false);// 设置非阻塞
SelectionKey key = channel.register(selector, 绑定事件);
  • channel 必须工作在非阻塞模式
  • FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用
  • 绑定的事件类型可以有
    • connect - 客户端连接成功时触发
    • accept - 服务器端成功接受连接时触发
    • read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
    • write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况

register()方法的第二个参数:是一个int值,意思是在通过Selector监听Channel时对什么事件感兴趣。可以监听四种不同类型的事件,而且可以使用SelectionKey的四个常量表示:

  1. 连接就绪–常量:SelectionKey.OP_CONNECT

  2. 接收就绪–常量:SelectionKey.OP_ACCEPT (ServerSocketChannel在注册时只能使用此项)

  3. 读就绪–常量:SelectionKey.OP_READ

  4. 写就绪–常量:SelectionKey.OP_WRITE

    注意:对于ServerSocketChannel在注册时,只能使用OP_ACCEPT,否则抛出异常。

【案例】:监听一个通道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Test1 {
public static void main(String[] args) throws Exception{
/*
- Selector选择器的概述和作用
概述: Selector被称为:选择器,也被称为:多路复用器,可以把多个Channel注册到一个Selector选择器上,
那么就可以实现利用一个线程来处理这多个Channel上发生的事件,并且能够根据事件情况决定Channel读写。
作用: 一个Selector可以监听多个Channel发生的事件, 减少系统负担 , 提高程序执行效率 .

- Selector选择器的获取
通过Selector.open()来获取Selector选择器对象
- 注册Channel到Selector
通过Channel的register(Selector sel, int ops)方法把Channel注册到指定的选择器上
参数1: 表示选择器
参数2: 选择器要监听Channel的什么事件
注意:
1.对于ServerSocketChannel在注册时,只能使用OP_ACCEPT,否则抛出异常。
2.ServerSocketChannel要设置成非阻塞
*/
// 获取ServerSocketChannel服务器通道对象
ServerSocketChannel ssc1 = ServerSocketChannel.open();
// 绑定端口号
ssc1.bind(new InetSocketAddress(7777));

// 设置非阻塞
ssc1.configureBlocking(false);

// 获取Selector选择器对象
Selector selector = Selector.open();

// 把服务器通道的accept()交给选择器来处理
// 注册Channel到Selector选择器上
ssc1.register(selector, SelectionKey.OP_ACCEPT);
}
}

【案例】:服务器创建3个通道,同时监听3个端口,并将3个通道注册到一个选择器中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Test2 {
public static void main(String[] args) throws Exception{
/*
把多个Channel注册到一个选择器上
*/
// 获取ServerSocketChannel服务器通道对象
ServerSocketChannel ssc1 = ServerSocketChannel.open();
// 绑定端口号
ssc1.bind(new InetSocketAddress(7777));

// 获取ServerSocketChannel服务器通道对象
ServerSocketChannel ssc2 = ServerSocketChannel.open();
// 绑定端口号
ssc2.bind(new InetSocketAddress(8888));


// 获取ServerSocketChannel服务器通道对象
ServerSocketChannel ssc3 = ServerSocketChannel.open();
// 绑定端口号
ssc3.bind(new InetSocketAddress(9999));


// 设置非阻塞
ssc1.configureBlocking(false);
ssc2.configureBlocking(false);
ssc3.configureBlocking(false);

// 获取Selector选择器对象
Selector selector = Selector.open();

// 把服务器通道的accept()交给选择器来处理
// 注册Channel到Selector选择器上
ssc1.register(selector, SelectionKey.OP_ACCEPT);
ssc2.register(selector,SelectionKey.OP_ACCEPT);
ssc3.register(selector,SelectionKey.OP_ACCEPT);
}
}

接下来,就可以通过选择器selector操作三个通道了。

监听 Channel 事件

可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少 channel 发生了事件

方法1,阻塞直到绑定事件发生

1
int count = selector.select();

方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)

1
int count = selector.select(long timeout);

方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件

1
int count = selector.selectNow();

💡 select 何时不阻塞

  • 事件发生时
    • 客户端发起连接请求,会触发 accept 事件
    • 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件
    • channel 可写,会触发 write 事件
    • 在 linux 下 nio bug 发生时
  • 调用 selector.wakeup()
  • 调用 selector.close()
  • selector 所在线程 interrupt

案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class Server1 {
public static void main(String[] args) throws Exception {
/*
- Selector的select()方法
作用:服务器等待客户端连接的方法
阻塞:
1.在没有客户端连接之前该方法会一直阻塞
2.当连接到客户端后没有被处理,该方法就会进入不阻塞状态
3.当连接到客户端后有被处理,该方法就会进入阻塞状态
*/
// 获取ServerSocketChannel服务器通道对象
ServerSocketChannel ssc1 = ServerSocketChannel.open();
// 绑定端口号
ssc1.bind(new InetSocketAddress(7777));

// 获取ServerSocketChannel服务器通道对象
ServerSocketChannel ssc2 = ServerSocketChannel.open();
// 绑定端口号
ssc2.bind(new InetSocketAddress(8888));


// 获取ServerSocketChannel服务器通道对象
ServerSocketChannel ssc3 = ServerSocketChannel.open();
// 绑定端口号
ssc3.bind(new InetSocketAddress(9999));


// 设置非阻塞
ssc1.configureBlocking(false);
ssc2.configureBlocking(false);
ssc3.configureBlocking(false);

// 获取Selector选择器对象
Selector selector = Selector.open();

// 把服务器通道的accept()交给选择器来处理
// 注册Channel到Selector选择器上
ssc1.register(selector, SelectionKey.OP_ACCEPT);
ssc2.register(selector, SelectionKey.OP_ACCEPT);
ssc3.register(selector, SelectionKey.OP_ACCEPT);

// 死循环一直接受客户端的连接请求
while (true) {
System.out.println(1);
// 服务器等待客户端的连接
selector.select();// 阻塞
System.out.println(2);

// 处理客户端请求的代码--->暂时看不懂,先放着
Set<SelectionKey> keySet = selector.selectedKeys();// 存储所有被连接的服务器Channel对象
for (SelectionKey key : keySet) {
ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
SocketChannel sc = ssc.accept();
System.out.println("...开始处理,接受数据,代码省略...");
//...
}
}

}
}

SelectedKeys()方法

获取已连接的所有通道集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class Server2 {
public static void main(String[] args) throws Exception {
/*
- Selector的selectedKeys()方法
作用: 获取所有被连接的服务器Channel对象的Set集合
该Set集合中的元素类型是SelectionKey,该SelectionKey类其实就是对Channel的一个封装
如何获取被连接的服务器Channel对象:
遍历所有被连接的服务器Channel对象的Set集合
获取该集合中的SelectionKey对象
根据SelectionKey对象调用channel方法,获得服务器Channel对象
- Selector的keys()方法
*/
// 获取ServerSocketChannel服务器通道对象
ServerSocketChannel ssc1 = ServerSocketChannel.open();
// 绑定端口号
ssc1.bind(new InetSocketAddress(7777));

// 获取ServerSocketChannel服务器通道对象
ServerSocketChannel ssc2 = ServerSocketChannel.open();
// 绑定端口号
ssc2.bind(new InetSocketAddress(8888));


// 获取ServerSocketChannel服务器通道对象
ServerSocketChannel ssc3 = ServerSocketChannel.open();
// 绑定端口号
ssc3.bind(new InetSocketAddress(9999));


// 设置非阻塞
ssc1.configureBlocking(false);
ssc2.configureBlocking(false);
ssc3.configureBlocking(false);

// 获取Selector选择器对象
Selector selector = Selector.open();

// 把服务器通道的accept()交给选择器来处理
// 注册Channel到Selector选择器上
ssc1.register(selector, SelectionKey.OP_ACCEPT);
ssc2.register(selector, SelectionKey.OP_ACCEPT);
ssc3.register(selector, SelectionKey.OP_ACCEPT);

// 获取所有被连接的服务器Channel对象的Set集合
// 该Set集合中的元素类型是SelectionKey,该SelectionKey类其实就是对Channel的一个封装
Set<SelectionKey> keySet = selector.selectedKeys();
System.out.println("被连接的服务器对象有多少个:"+keySet.size());// 0

// 死循环一直接受客户端的连接请求
while (true) {
System.out.println(1);
// 服务器等待客户端的连接
selector.select();// 阻塞
System.out.println(2);
System.out.println("被连接的服务器对象个数:"+keySet.size());// 有多少个客户端连接服务器成功,就打印几

// 处理客户端请求的代码--->暂时看不懂,先放着
// 获取所有被连接的服务器Channel对象的集合
/*Set<SelectionKey> keySet = selector.selectedKeys();
// 遍历所有被连接的服务器Channel对象,拿到每一个SelectionKey
for (SelectionKey key : keySet) {
// 根据SelectionKey获取服务器Channel对象
ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
// 获得客户端Channel对象
SocketChannel sc = ssc.accept();
// 处理
System.out.println("...开始处理,接受数据,代码省略...");
//...
}*/
}

}
}

案例:使用Selector进行多路复用,监听3个服务器端口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class Server1 {
public static void main(String[] args) throws Exception {
/*
需求: 使用Selector进行多路复用,监听3个服务器端口
分析:
1.创建3个服务器Channel对象,并绑定端口号
2.把3个服务器Channel对象设置成非阻塞
3.获得Selector选择器
4.把3个个服务器Channel对象对象注册到同一个Selector选择器上,指定监听事件
5.死循环去等待客户端的连接
6.获取所有被连接的服务器Channel对象的Set集合
7.循环遍历所有被连接的服务器Channel对象
8.处理客户端的请求
*/
// 1.创建3个服务器Channel对象,并绑定端口号
ServerSocketChannel ssc1 = ServerSocketChannel.open();
ssc1.bind(new InetSocketAddress(7777));

ServerSocketChannel ssc2 = ServerSocketChannel.open();
ssc2.bind(new InetSocketAddress(8888));

ServerSocketChannel ssc3 = ServerSocketChannel.open();
ssc3.bind(new InetSocketAddress(9999));

// 2.把3个服务器Channel对象设置成非阻塞
ssc1.configureBlocking(false);
ssc2.configureBlocking(false);
ssc3.configureBlocking(false);

// 3.获得Selector选择器
Selector selector = Selector.open();

// 4.把3个个服务器Channel对象对象注册到同一个Selector选择器上,指定监听事件
ssc1.register(selector, SelectionKey.OP_ACCEPT);
ssc2.register(selector, SelectionKey.OP_ACCEPT);
ssc3.register(selector, SelectionKey.OP_ACCEPT);

// 5.死循环去等待客户端的连接
while (true) {
// 服务器等待客户端连接
System.out.println(1);
selector.select();

// 6.获取所有被连接的服务器Channel对象的Set集合
Set<SelectionKey> keySet = selector.selectedKeys(); // 2

// 7.循环遍历所有被连接的服务器Channel对象,获取每一个被连接的服务器Channel对象
for (SelectionKey key : keySet) {// 遍历出7777端口 8888端口
// 8.由于SelectionKey是对Channel的封装,所以我们得根据key获取被连接的服务器Channel对象
ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
// 9.处理客户端的请求
// 9.1 获取连接的客户端对象
SocketChannel sc = ssc.accept();
// 9.2 创建ByteBuffer缓冲数组
ByteBuffer b = ByteBuffer.allocate(1024);
// 9.3 读取数据
int len = sc.read(b);// 把读取到的字节数据存储到b缓冲数组中,返回读取到的字节个数
// 9.4 打印输出
System.out.println(new String(b.array(), 0, len));
// 10. 释放资源
sc.close();
}
}
/*
- 问题: Selector把所有被连接的服务器对象放在了一个Set集合中,但是使用完后并没有删除,
导致在遍历集合时,遍历到已经没用的对象,出现了异常
- 解决办法: 使用完了,应该从集合中删除,由于遍历的同时不能删除,所以使用迭代器进行遍历

*/
}
}

  • 问题: Selector把所有被连接的服务器对象放在了一个Set集合中,但是使用完后并没有删除,导致在遍历集合时,遍历到已经没用的对象,出现了异常

  • 解决办法: 使用完了,应该从集合中删除,由于遍历的同时不能删除,所以使用迭代器进行遍历

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class Server2 {
public static void main(String[] args) throws Exception {
/*
需求: 使用Selector进行多路复用,监听3个服务器端口
分析:
1.创建3个服务器Channel对象,并绑定端口号
2.把3个服务器Channel对象设置成非阻塞
3.获得Selector选择器
4.把3个个服务器Channel对象对象注册到同一个Selector选择器上,指定监听事件
5.死循环去等待客户端的连接
6.获取所有被连接的服务器Channel对象的Set集合
7.循环遍历所有被连接的服务器Channel对象
8.处理客户端的请求

- 问题: Selector把所有被连接的服务器对象放在了一个Set集合中,但是使用完后并没有删除,
导致在遍历集合时,遍历到已经没用的对象,出现了异常
- 解决办法: 使用完了,应该从集合中删除,由于遍历的同时不能删除,所以使用迭代器进行遍历
*/
// 1.创建3个服务器Channel对象,并绑定端口号
ServerSocketChannel ssc1 = ServerSocketChannel.open();
ssc1.bind(new InetSocketAddress(7777));

ServerSocketChannel ssc2 = ServerSocketChannel.open();
ssc2.bind(new InetSocketAddress(8888));

ServerSocketChannel ssc3 = ServerSocketChannel.open();
ssc3.bind(new InetSocketAddress(9999));

// 2.把3个服务器Channel对象设置成非阻塞
ssc1.configureBlocking(false);
ssc2.configureBlocking(false);
ssc3.configureBlocking(false);

// 3.获得Selector选择器
Selector selector = Selector.open();

// 4.把3个个服务器Channel对象对象注册到同一个Selector选择器上,指定监听事件
ssc1.register(selector, SelectionKey.OP_ACCEPT);
ssc2.register(selector, SelectionKey.OP_ACCEPT);
ssc3.register(selector, SelectionKey.OP_ACCEPT);

// 5.死循环去等待客户端的连接
while (true) {
// 服务器等待客户端连接
System.out.println(1);
selector.select();

// 6.获取所有被连接的服务器Channel对象的Set集合
Set<SelectionKey> keySet = selector.selectedKeys();

// 7.循环遍历所有被连接的服务器Channel对象,获取每一个被连接的服务器Channel对象
Iterator<SelectionKey> it = keySet.iterator();
// 迭代器的快捷键: itit
while (it.hasNext()){

// 遍历出来的SelectionKey
SelectionKey key = it.next();

// 8.由于SelectionKey是对Channel的封装,所以我们得根据key获取被连接的服务器Channel对象
ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
// 9.处理客户端的请求
// 9.1 获取连接的客户端对象
SocketChannel sc = ssc.accept();
// 9.2 创建ByteBuffer缓冲数组
ByteBuffer b = ByteBuffer.allocate(1024);
// 9.3 读取数据
int len = sc.read(b);// 把读取到的字节数据存储到b缓冲数组中,返回读取到的字节个数
// 9.4 打印输出
System.out.println(new String(b.array(), 0, len));
// 10. 释放资源
sc.close();

// 用完了就得删除
it.remove();
}
}
}
}

Keys()方法

获取已注册的所有通道集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class Server3 {
public static void main(String[] args) throws Exception {
/*
- Selector的keys()方法
获取所有被注册的服务器Channel对象的Set集合
该Set集合中的元素类型是SelectionKey,该SelectionKey类其实就是对Channel的一个封装
*/
// 获取ServerSocketChannel服务器通道对象
ServerSocketChannel ssc1 = ServerSocketChannel.open();
// 绑定端口号
ssc1.bind(new InetSocketAddress(7777));

// 获取ServerSocketChannel服务器通道对象
ServerSocketChannel ssc2 = ServerSocketChannel.open();
// 绑定端口号
ssc2.bind(new InetSocketAddress(8888));


// 获取ServerSocketChannel服务器通道对象
ServerSocketChannel ssc3 = ServerSocketChannel.open();
// 绑定端口号
ssc3.bind(new InetSocketAddress(9999));


// 设置非阻塞
ssc1.configureBlocking(false);
ssc2.configureBlocking(false);
ssc3.configureBlocking(false);

// 获取Selector选择器对象
Selector selector = Selector.open();

// 把服务器通道的accept()交给选择器来处理
// 注册Channel到Selector选择器上
ssc1.register(selector, SelectionKey.OP_ACCEPT);
ssc2.register(selector, SelectionKey.OP_ACCEPT);
ssc3.register(selector, SelectionKey.OP_ACCEPT);

// 获取所有被连接的服务器Channel对象的Set集合
// 该Set集合中的元素类型是SelectionKey,该SelectionKey类其实就是对Channel的一个封装
Set<SelectionKey> keySet = selector.selectedKeys();
System.out.println("被连接的服务器对象有多少个:"+keySet.size());// 0

// 获取所有被注册的服务器Channel对象的Set集合
// 该Set集合中的元素类型是SelectionKey,该SelectionKey类其实就是对Channel的一个封装
Set<SelectionKey> keys = selector.keys();
System.out.println("被注册的服务器对象有多少个:"+keys.size()); // 3


// 死循环一直接受客户端的连接请求
while (true) {
System.out.println(1);
// 服务器等待客户端的连接
selector.select();// 阻塞
System.out.println(2);
System.out.println("被连接的服务器对象个数:"+keySet.size());// 有多少个客户端连接服务器成功,就打印几
System.out.println("被注册的服务器对象个数:"+keys.size());// 选择器上注册了多少个服务器Channel,就打印几
}

}
}

处理 accept 事件

客户端代码为

1
2
3
4
5
6
7
8
9
10
11
public class Client {
public static void main(String[] args) {
try (Socket socket = new Socket("localhost", 8080)) {
System.out.println(socket);
socket.getOutputStream().write("world".getBytes());
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}

服务器端代码为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Slf4j
public class ChannelDemo6 {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
// 没有事件发生会线程阻塞,有事件才会恢复运行(事件未处理并不会阻塞,所以要处理事件(取消事件也算处理))
int count = selector.select();
// int count = selector.selectNow();
log.debug("select count: {}", count);
// if(count <= 0) {
// continue;
// }

// 获取所有事件
Set<SelectionKey> keys = selector.selectedKeys();

// 遍历所有事件,逐一处理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判断事件类型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必须处理
SocketChannel sc = c.accept();
log.debug("{}", sc);
}
// 处理完毕,必须将事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

💡 事件发生后能否不处理

事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发

处理 read 事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Slf4j
public class ChannelDemo6 {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
int count = selector.select();
// int count = selector.selectNow();
log.debug("select count: {}", count);
// if(count <= 0) {
// continue;
// }

// 获取所有事件
Set<SelectionKey> keys = selector.selectedKeys();

// 遍历所有事件,逐一处理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判断事件类型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必须处理
SocketChannel sc = c.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
log.debug("连接已建立: {}", sc);
} else if (key.isReadable()) {
try {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
int read = sc.read(buffer);
if (read == -1) {
key.cancel(); // 正常断开,要取消key
sc.close();
} else {
buffer.flip();
debugRead(buffer);
}
} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 异常断开,也要取消key
}
}
// 处理完毕,必须将事件移除。不然再拿这个key去做操作会报空指针
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

开启两个客户端,修改一下发送文字,输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8080]
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - 连接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60367]
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - 连接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60378]
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 77 6f 72 6c 64 |world |
+--------+-------------------------------------------------+----------------+

💡 为何要 iter.remove()

因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如

  • 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey
  • 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常

💡 cancel 的作用

cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件

不处理边界的问题:warning:

以前有同学写过这样的代码,思考注释中两个问题,以 bio 为例,其实 nio 道理是一样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Server {
public static void main(String[] args) throws IOException {
ServerSocket ss=new ServerSocket(9000);
while (true) {
Socket s = ss.accept();
InputStream in = s.getInputStream();
// 这里这么写,有没有问题
byte[] arr = new byte[4];
while(true) {
int read = in.read(arr);
// 这里这么写,有没有问题
if(read == -1) {
break;
}
System.out.println(new String(arr, 0, read));
}
}
}
}

客户端

1
2
3
4
5
6
7
8
9
10
public class Client {
public static void main(String[] args) throws IOException {
Socket max = new Socket("localhost", 9000);
OutputStream out = max.getOutputStream();
out.write("hello".getBytes());
out.write("world".getBytes());
out.write("你好".getBytes());
max.close();
}
}

输出

1
2
3
4
5
hell
owor
ld�
�好

为什么?

处理消息的边界

image-20230822192021069

  • 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽

  • 另一种思路是按分隔符拆分,缺点是效率低

  • TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量

    • Http 1.1 是 TLV 格式
    • Http 2.0 是 LTV 格式

image-20220110212320291

1
2
3
4
5
6
7
8
9
10
11
sequenceDiagram 
participant c1 as 客户端1
participant s as 服务器
participant b1 as ByteBuffer1
participant b2 as ByteBuffer2
c1 ->> s: 发送 01234567890abcdef3333\r
s ->> b1: 第一次 read 存入 01234567890abcdef
s ->> b2: 扩容
b1 ->> b2: 拷贝 01234567890abcdef
s ->> b2: 第二次 read 存入 3333\r
b2 ->> b2: 01234567890abcdef3333\r

服务器端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
private static void split(ByteBuffer source) {
source.flip();
for (int i = 0; i < source.limit(); i++) {
// 找到一条完整消息
if (source.get(i) == '\n') {
int length = i + 1 - source.position();
// 把这条完整消息存入新的 ByteBuffer
ByteBuffer target = ByteBuffer.allocate(length);
// 从 source 读,向 target 写
for (int j = 0; j < length; j++) {
target.put(source.get());
}
debugAll(target);
}
}
source.compact(); // 0123456789abcdef position 16 limit 16
}

public static void main(String[] args) throws IOException {
// 1. 创建 selector, 管理多个 channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 2. 建立 selector 和 channel 的联系(注册)
// SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key 只关注 accept 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("sscKey:{}", sscKey);
ssc.bind(new InetSocketAddress(8080));
while (true) {
// 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
// select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理
selector.select();
// 4. 处理事件, selectedKeys 内部包含了所有发生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题
iter.remove();
log.debug("key: {}", key);
// 5. 区分事件类型
if (key.isAcceptable()) { // 如果是 accept
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
// 将一个 byteBuffer 作为附件关联到 selectionKey 上
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}", sc);
log.debug("scKey:{}", scKey);
} else if (key.isReadable()) { // 如果是 read
try {
SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
// 获取 selectionKey 上关联的附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1
if(read == -1) {
key.cancel();
} else {
split(buffer);
// 需要扩容
if (buffer.position() == buffer.limit()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer); // 0123456789abcdef3333\n
key.attach(newBuffer);
}
}

} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
}
}
}
}
}

客户端

1
2
3
4
5
6
7
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
SocketAddress address = sc.getLocalAddress();
// sc.write(Charset.defaultCharset().encode("hello\nworld\n"));
sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
System.in.read();

ByteBuffer 大小分配

  • 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer
  • ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer
    • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 http://tutorials.jenkov.com/java-performance/resizable-array.html
    • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

处理 write 事件

一次无法写完例子

  • 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)
  • 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略
    • 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
    • selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册
    • 如果不取消,会每次可写均会触发 write 事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class WriteServer {

public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));

Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);

while(true) {
selector.select();

Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept(); // 这里和key.channel强转拿到的是同一个serverSocketChannel
sc.configureBlocking(false);
SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
// 1. 向客户端发送内容
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
int write = sc.write(buffer); // 并不能保证一次把所有消息都写给客户端
// 3. write 表示实际写了多少字节
System.out.println("实际写入字节:" + write);
// 4. 如果有剩余未读字节,才需要关注写事件
if (buffer.hasRemaining()) {
// read 1 write 4
// 在原有关注事件的基础上,多关注 写事件
sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
// 把 buffer 作为附件加入 sckey
sckey.attach(buffer);
}
} else if (key.isWritable()) { // 可写事件,缓冲区有可能会写满,写不进去.可写的时候写入.不能写的时候,就让它去做读事件。
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
System.out.println("实际写入字节:" + write);
if (!buffer.hasRemaining()) { // 写完了
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
key.attach(null);
}
}
}
}
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class WriteClient {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
sc.connect(new InetSocketAddress("localhost", 8080));
int count = 0;
while (true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isConnectable()) {
System.out.println(sc.finishConnect());
} else if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
buffer.clear();
System.out.println(count);
}
}
}
}
}

write为何要取消💡

只要向 channel 发送数据时,socket 缓冲可写,这个事件会频繁触发,因此应当只在 socket 缓冲区写不下时再关注可写事件,数据写完之后再取消关注

更进一步

利用多线程优化💡

现在都是多核 cpu,设计时要充分考虑别让 cpu 的力量被白白浪费

前面的代码只有单线程配合一个selector选择器来管理多个channel上的事件,没有充分利用多核 cpu,如何改进呢?

分两组选择器

  • 单线程配一个选择器,专门处理 accept 事件
  • 创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件

image-20220110220243167

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
public class ChannelDemo7 {
public static void main(String[] args) throws IOException {
new BossEventLoop().register();
}


@Slf4j
static class BossEventLoop implements Runnable {
private Selector boss;
private WorkerEventLoop[] workers;
private volatile boolean start = false;
AtomicInteger index = new AtomicInteger();

public void register() throws IOException {
if (!start) {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.configureBlocking(false);
boss = Selector.open();
SelectionKey ssckey = ssc.register(boss, 0, null);
ssckey.interestOps(SelectionKey.OP_ACCEPT);
workers = initEventLoops();
new Thread(this, "boss").start();
log.debug("boss start...");
start = true;
}
}

public WorkerEventLoop[] initEventLoops() {
// 充分利用CPU的话,比如4核就设置4个
// EventLoop[] eventLoops = new EventLoop[Runtime.getRuntime().availableProcessors()];
WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[2];
for (int i = 0; i < workerEventLoops.length; i++) {
workerEventLoops[i] = new WorkerEventLoop(i);
}
return workerEventLoops;
}

@Override
public void run() {
while (true) {
try {
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
SocketChannel sc = c.accept();
sc.configureBlocking(false);
log.debug("{} connected", sc.getRemoteAddress());
workers[index.getAndIncrement() % workers.length].register(sc);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

@Slf4j
static class WorkerEventLoop implements Runnable {
private Selector worker;
private volatile boolean start = false; // 用来保证初始化的只执行了一遍
private int index;

private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();

public WorkerEventLoop(int index) {
this.index = index;
}

public void register(SocketChannel sc) throws IOException {
if (!start) {
worker = Selector.open();
new Thread(this, "worker-" + index).start();
start = true;
}
tasks.add(() -> {
try {
SelectionKey sckey = sc.register(worker, 0, null);
sckey.interestOps(SelectionKey.OP_READ);
worker.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
});
worker.wakeup();
}

@Override
public void run() {
while (true) {
try {
worker.select();
Runnable task = tasks.poll();
if (task != null) {
task.run();
}
Set<SelectionKey> keys = worker.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isReadable()) { // 这里的selectkey是work的,只关注读写事件就行了
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
try {
int read = sc.read(buffer);
if (read == -1) {
key.cancel();
sc.close();
} else {
buffer.flip();
log.debug("{} message:", sc.getRemoteAddress());
debugAll(buffer);
}
} catch (IOException e) {
e.printStackTrace();
key.cancel();
sc.close();
}
}
iter.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

如何拿到 cpu 个数💡

  • Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数
  • 这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启

UDP

  • UDP 是无连接的,client 发送数据不会管 server 是否开启
  • server 这边的 receive 方法会将接收到的数据存入 byte buffer,但如果数据报文超过 buffer 大小,多出来的数据会被默默抛弃

首先启动服务器端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class UdpServer {
public static void main(String[] args) {
try (DatagramChannel channel = DatagramChannel.open()) {
channel.socket().bind(new InetSocketAddress(9999));
System.out.println("waiting...");
ByteBuffer buffer = ByteBuffer.allocate(32);
channel.receive(buffer);
buffer.flip();
debug(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}

输出

1
waiting...

运行客户端

1
2
3
4
5
6
7
8
9
10
11
public class UdpClient {
public static void main(String[] args) {
try (DatagramChannel channel = DatagramChannel.open()) {
ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");
InetSocketAddress address = new InetSocketAddress("localhost", 9999);
channel.send(buffer, address);
} catch (Exception e) {
e.printStackTrace();
}
}
}

接下来服务器端输出

1
2
3
4
5
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+

多路复用监听多个端口

使用Selector进行多路复用,监听3个服务器端口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class Server1 {
public static void main(String[] args) throws Exception {
/*
需求: 使用Selector进行多路复用,监听3个服务器端口
分析:
1.创建3个服务器Channel对象,并绑定端口号
2.把3个服务器Channel对象设置成非阻塞
3.获得Selector选择器
4.把3个个服务器Channel对象对象注册到同一个Selector选择器上,指定监听事件
5.死循环去等待客户端的连接
6.获取所有被连接的服务器Channel对象的Set集合
7.循环遍历所有被连接的服务器Channel对象
8.处理客户端的请求
*/
// 1.创建3个服务器Channel对象,并绑定端口号
ServerSocketChannel ssc1 = ServerSocketChannel.open();
ssc1.bind(new InetSocketAddress(7777));

ServerSocketChannel ssc2 = ServerSocketChannel.open();
ssc2.bind(new InetSocketAddress(8888));

ServerSocketChannel ssc3 = ServerSocketChannel.open();
ssc3.bind(new InetSocketAddress(9999));

// 2.把3个服务器Channel对象设置成非阻塞
ssc1.configureBlocking(false);
ssc2.configureBlocking(false);
ssc3.configureBlocking(false);

// 3.获得Selector选择器
Selector selector = Selector.open();

// 4.把3个个服务器Channel对象对象注册到同一个Selector选择器上,指定监听事件
ssc1.register(selector, SelectionKey.OP_ACCEPT);
ssc2.register(selector, SelectionKey.OP_ACCEPT);
ssc3.register(selector, SelectionKey.OP_ACCEPT);

// 5.死循环去等待客户端的连接
while (true) {
// 服务器等待客户端连接
System.out.println(1);
selector.select();

// 6.获取所有被连接的服务器Channel对象的Set集合
Set<SelectionKey> keySet = selector.selectedKeys(); // 2

// 7.循环遍历所有被连接的服务器Channel对象,获取每一个被连接的服务器Channel对象
for (SelectionKey key : keySet) {// 遍历出7777端口 8888端口
// 8.由于SelectionKey是对Channel的封装,所以我们得根据key获取被连接的服务器Channel对象
ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
// 9.处理客户端的请求
// 9.1 获取连接的客户端对象
SocketChannel sc = ssc.accept();
// 9.2 创建ByteBuffer缓冲数组
ByteBuffer b = ByteBuffer.allocate(1024);
// 9.3 读取数据
int len = sc.read(b);// 把读取到的字节数据存储到b缓冲数组中,返回读取到的字节个数
// 9.4 打印输出
System.out.println(new String(b.array(), 0, len));
// 10. 释放资源
sc.close();
}
}
}
}

  • 问题: Selector把所有被连接的服务器对象放在了一个Set集合中,但是使用完后并没有删除,导致在遍历集合时,遍历到已经没用的对象,出现了异常

  • 解决办法: 使用完了,应该从集合中删除,由于遍历的同时不能删除,所以使用迭代器进行遍历

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class Server2 {
public static void main(String[] args) throws Exception {
/*
需求: 使用Selector进行多路复用,监听3个服务器端口
分析:
1.创建3个服务器Channel对象,并绑定端口号
2.把3个服务器Channel对象设置成非阻塞
3.获得Selector选择器
4.把3个个服务器Channel对象对象注册到同一个Selector选择器上,指定监听事件
5.死循环去等待客户端的连接
6.获取所有被连接的服务器Channel对象的Set集合
7.循环遍历所有被连接的服务器Channel对象
8.处理客户端的请求

- 问题: Selector把所有被连接的服务器对象放在了一个Set集合中,但是使用完后并没有删除,
导致在遍历集合时,遍历到已经没用的对象,出现了异常
- 解决办法: 使用完了,应该从集合中删除,由于遍历的同时不能删除,所以使用迭代器进行遍历
*/
// 1.创建3个服务器Channel对象,并绑定端口号
ServerSocketChannel ssc1 = ServerSocketChannel.open();
ssc1.bind(new InetSocketAddress(7777));

ServerSocketChannel ssc2 = ServerSocketChannel.open();
ssc2.bind(new InetSocketAddress(8888));

ServerSocketChannel ssc3 = ServerSocketChannel.open();
ssc3.bind(new InetSocketAddress(9999));

// 2.把3个服务器Channel对象设置成非阻塞
ssc1.configureBlocking(false);
ssc2.configureBlocking(false);
ssc3.configureBlocking(false);

// 3.获得Selector选择器
Selector selector = Selector.open();

// 4.把3个个服务器Channel对象对象注册到同一个Selector选择器上,指定监听事件
ssc1.register(selector, SelectionKey.OP_ACCEPT);
ssc2.register(selector, SelectionKey.OP_ACCEPT);
ssc3.register(selector, SelectionKey.OP_ACCEPT);

// 5.死循环去等待客户端的连接
while (true) {
// 服务器等待客户端连接
System.out.println(1);
selector.select();

// 6.获取所有被连接的服务器Channel对象的Set集合
Set<SelectionKey> keySet = selector.selectedKeys();

// 7.循环遍历所有被连接的服务器Channel对象,获取每一个被连接的服务器Channel对象
Iterator<SelectionKey> it = keySet.iterator();
// 迭代器的快捷键: itit
while (it.hasNext()){

// 遍历出来的SelectionKey
SelectionKey key = it.next();

// 8.由于SelectionKey是对Channel的封装,所以我们得根据key获取被连接的服务器Channel对象
ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
// 9.处理客户端的请求
// 9.1 获取连接的客户端对象
SocketChannel sc = ssc.accept();
// 9.2 创建ByteBuffer缓冲数组
ByteBuffer b = ByteBuffer.allocate(1024);
// 9.3 读取数据
int len = sc.read(b);// 把读取到的字节数据存储到b缓冲数组中,返回读取到的字节个数
// 9.4 打印输出
System.out.println(new String(b.array(), 0, len));
// 10. 释放资源
sc.close();

// 用完了就得删除
it.remove();
}
}
}
}

NIO vs BIO

stream vs channel

  • stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
  • stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用
  • 二者均为全双工,即读写可以同时进行

IO 模型

同步阻塞、同步非阻塞、同步多路复用、异步阻塞(没有此情况)、异步非阻塞

  • 同步:线程自己去获取结果(一个线程)
  • 异步:线程自己不去获取结果,而是由其它线程送结果(至少两个线程)
  • 阻塞 IO

当调用一次 channel.read 或 stream.read 后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:等待数据阶段、复制数据阶段

image-20230822192202975

1
2
3
4
5
6
7
8
9
10
网络编程中读取操作
用户程序:java程序,从网络中读取数据,并不是java程序真正干得活.得由操作系统去做。
用户空间 => 切换到 => 操作系统内核空间
数据读取:
- 等待数据:等待网络端把数据发送过来
- 复制数据:将数据从网卡复制到内存

=> 阻塞IO,用户线程在等待期间什么是事情都做不了

=> 阻塞IO => 线程自己去获取结果 => 同步(阻塞IO)

image-20230822192211625

  • 非阻塞 IO

    image-20230822192218711

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    比如while(true)去读,
    发现数据还没传输过来,读到0.
    继续循环
    用户线程始终在运行。没有阻塞。
    然后某次读取,读到数据了。到复制数据。
    复制数据还是会阻塞。
    等复制完毕,返回。线程就可以继续执行了。

    非阻塞IO => 有多次的用户空间和内存空间的切换。影响系统的性能

    => 非阻塞IO => 线程自己去获取结果 => 同步(非阻塞IO)
  • 多路复用

    image-20230822192227599

    1
    2
    3
    4
    5
    6
    一开始去看是否有事件,如果可读
    用户线程就去read读。
    然后复制数据,阻塞。等待数据复制完毕
    用户线程继续操作

    => 多路复用 => 线程自己去获取结果(read) => 同步(多路复用)
  • 信号驱动

  • 异步 IO

    image-20230822192234589

1
2
异步没有阻塞的说法
异步非阻塞
  • 阻塞 IO vs 多路复用

从图上看阻塞IO和多路复用。好像差别不大.而且多路复用还多了2次空间切换。那多路复用的优势在哪?

image-20230822192243236

1
2
3
因为如果阻塞IO,channel1复制完数据,channel2建立连接。
accept阻塞,channel1如果要去read读取数据,只能等channel2连接建立完。
做一件事的时候,做不了另一件事,这是阻塞IO的一个问题。

image-20230822192250261

1
2
selector最大的好处就是一个selector可以监测多个channel的不同事件。
而且,也不用在等待连接,等待数据。监听到事件肯定是有可连接,可读取的。直接建立连接,复制数据。要等待的时间在select等待事件中花费了。

零拷贝

操作系统中的零拷贝:

在 OS 层面上的 Zero-copy 通常指避免在 用户态(User-space) 与 内核态(Kernel-space) 之间来回拷贝数据. 例如 Linux 提供的 mmap 系统调用, 它可以将一段用户空间内存映射到内核空间, 当映射成功后, 用户对这段内存区域的修改可以直接反映到内核空间; 同样地, 内核空间对这段区域的修改也直接反映用户空间. 正因为有这样的映射关系, 我们就不需要在 用户态(User-space) 与 内核态(Kernel-space) 之间拷贝数据, 提高了数据传输的效率

image-20220116123617958

明显的图中有两步是多余的数据拷贝,通过java的FileChannel.transferTo方法(底层基于NIO),可以避免上面两次多余的拷贝(当然这需要底层操作系统的支持)

传统 IO 问题

传统的 IO 将一个文件通过 socket 写出

1
2
3
4
5
6
7
8
File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");

byte[] buf = new byte[(int)f.length()];
file.read(buf);

Socket socket = ...;
socket.getOutputStream().write(buf);

内部工作流程是这样的:

image-20230822192126442

  1. java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据(文件data.txt)(从磁盘)读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 cpu

    DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO

  2. 内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA

  3. 调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,cpu 会参与拷贝

  4. 接下来要向网卡写数据,这项能力 java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的

  • 用户态与内核态的切换发生了 3 次,这个操作比较重量级
  • 数据拷贝了共 4 次

NIO 优化

通过 DirectByteBuf

  • ByteBuffer.allocate(10) HeapByteBuffer 使用的还是 java 内存
  • ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存

image-20230822192133498

大部分步骤与优化前相同,不再赘述。唯有一点:java 可以使用 DirectByteBuf 将堆外内存映射到 jvm 内存中来直接访问使用

  • 这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
  • java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
    • DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列
    • 通过专门线程访问引用队列,根据虚引用释放堆外内存
  • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少

进一步优化(底层采用了 linux 2.1 后提供的 sendFile 方法),java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据

image-20230822192142001

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
  2. 数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
  3. 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

可以看到

  • 只发生了一次用户态与内核态的切换
  • 数据拷贝了 3 次

进一步优化(linux 2.4)

image-20230822191806539

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
  2. 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
  3. 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu

整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到 jvm 内存中,零拷贝的优点有

  • 更少的用户态与内核态的切换
  • 不利用 cpu 计算,减少 cpu 缓存伪共享
  • 零拷贝适合小文件传输

AIO

网络操作比如说,分为准备连接和读取数据2大步。NIO的selector解决的是把数据准备好了再通知我,而AIO优化的是读完了再通知我

  • AIO不会加快IO,只是读完通知,使用回调函数,进行业务处理

AIO 用来解决数据复制阶段的阻塞问题

  • 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
  • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统(Kernel)提供支持

  • Windows 系统通过 IOCP 实现了真正的异步 IO
  • Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势

文件 AIO

先来看看 AsynchronousFileChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
public class AioDemo1 {
public static void main(String[] args) throws IOException {
try{
AsynchronousFileChannel s =
AsynchronousFileChannel.open(
Paths.get("1.txt"), StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(2);
log.debug("begin...");
s.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
log.debug("read completed...{}", result);
buffer.flip();
debug(buffer);
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
log.debug("read failed...");
}
});

} catch (IOException e) {
e.printStackTrace();
}
log.debug("do other things...");
System.in.read();
}
}

输出

1
2
3
4
5
6
7
8
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - begin...
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - do other things...
13:44:56 [DEBUG] [Thread-5] c.i.aio.AioDemo1 - read completed...2
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 0d |a. |
+--------+-------------------------------------------------+----------------+

可以看到

  • 响应文件读取成功的是另一个线程 Thread-5
  • 主线程并没有 IO 操作阻塞

守护线程💡

默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read() 以避免守护线程意外结束

网络 AIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public class AioServer {
public static void main(String[] args) throws IOException {
AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.accept(null, new AcceptHandler(ssc));
System.in.read();
}

private static void closeChannel(AsynchronousSocketChannel sc) {
try {
System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
sc.close();
} catch (IOException e) {
e.printStackTrace();
}
}

private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;

public ReadHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
try {
if (result == -1) {
closeChannel(sc);
return;
}
System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
attachment.flip();
System.out.println(Charset.defaultCharset().decode(attachment));
attachment.clear();
// 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
sc.read(attachment, attachment, this);
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
closeChannel(sc);
exc.printStackTrace();
}
}

private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;

private WriteHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
// 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
if (attachment.hasRemaining()) {
sc.write(attachment);
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
closeChannel(sc);
}
}

private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
private final AsynchronousServerSocketChannel ssc;

public AcceptHandler(AsynchronousServerSocketChannel ssc) {
this.ssc = ssc;
}

@Override
public void completed(AsynchronousSocketChannel sc, Object attachment) {
try {
System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
} catch (IOException e) {
e.printStackTrace();
}
ByteBuffer buffer = ByteBuffer.allocate(16);
// 读事件由 ReadHandler 处理
sc.read(buffer, buffer, new ReadHandler(sc));
// 写事件由 WriteHandler 处理
sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
// 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
ssc.accept(null, this);
}

@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
}
}

🔖 参考

书籍:UNIX 网络编程 - 卷 I