阻塞模型与非阻塞模型 | 张扎瓦的博客

阻塞模型与非阻塞模型

Java NIO 中的阻塞模型与非阻塞模型的实现


阻塞与非阻塞

  传统的IO都是阻塞式的。也就是说,当一个线程调用read()或write()方法时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不能执行其他任务。因此,在网络通信进行IO操作时,由于线程会阻塞,所以服务端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量客户端时,性能急剧下降。

阻塞模型

  Java NIO 是非阻塞模型的。当线程从某通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞IO的空闲时间用于在其他通道上执行IO操作,所以单独的线程可以管理多个输入和输出通道。因此,NIO可以让服务器端使用一个或者有限几个线程来同时处理连接到服务器端的所有客户端。

非阻塞模型

Selector

  选择器,用于监听客户端与服务端的连接。调用通道的register(Selector sel, int ops)方法,将通道注册到选择器上,参数ops是常量,用于设置选择器的监听事件。可以监听的事件类型列表:

事件名称 功能
SelectionKey.OP_READ
SelectionKey.OP_WRITE
SelectionKey.OP_CONNECT 连接
SelectionKey.OP_ACCEPT 接收

当监听多个事件时,可以使用|操作符连接

SelectionKey.OP_READ | SelectionKey.OP_WRITE

演示代码

阻塞方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void client() throws IOException {
try (
// 1.获取通道
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 10000));
FileChannel fileChannel = FileChannel.open(Paths.get("a.txt"), StandardOpenOption.READ);
) {
// 2.分配缓存
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while (fileChannel.read(byteBuffer) != -1) {
byteBuffer.flip();
// 3.发送数据到服务端
socketChannel.write(byteBuffer);
byteBuffer.clear();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
public void server() {
try (
// 1.获取通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
FileChannel fileChannel = FileChannel.open(Paths.get("c.txt"),
StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
) {
// 2.绑定端口
serverSocketChannel.bind(new InetSocketAddress(10000));

// 3.获取客户端连接
SocketChannel socketChannel = serverSocketChannel.accept();

// 4.获取客户端的数据,保存到本地
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while (socketChannel.read(byteBuffer) != -1) {
byteBuffer.flip();
fileChannel.write(byteBuffer);
byteBuffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}

非阻塞模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Client {
public static void main(String[] args) {
try (
// 打开通道
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 8888));
) {
// 设置非阻塞模式
socketChannel.configureBlocking(false);
// 创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 发送数据
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String msg = scanner.next();
byteBuffer.put(msg.getBytes());
byteBuffer.flip();
socketChannel.write(byteBuffer);
byteBuffer.clear();
}

} catch (IOException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class Server {
public static void main(String[] args) {
try (
// 打开通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
) {
// 绑定端口
serverSocketChannel.bind(new InetSocketAddress(8888));
//设置非阻塞模式
serverSocketChannel.configureBlocking(false);
// 获取选择器
Selector selector = Selector.open();
// 注册连接事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

while (selector.select() > 0) {
// 遍历注册事件,进行业务处理
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
if (key.isAcceptable()) { // 如果有客户端连接
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
// 注册读取事件,读取客户端发送的数据
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) { // 获取客户端的数据
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int len = 0;
while ((len = socketChannel.read(byteBuffer)) > 0) {
byteBuffer.flip();
System.out.println("[客户端]:"+new String(byteBuffer.array(), 0, len));
byteBuffer.clear();
}
}

// 清除已处理的事件
it.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
如果我的文章对您有所帮助,不妨打赏一杯豆浆以资鼓励(○` 3′○)