🚀 Rust 构建简易实时聊天系统
文章所有代码均在:github.com/rustify-org…
这篇文章将介绍如何用 Rust 编写一个简易的聊天系统。包括一个服务器端和一个客户端,服务器端负责监听和转发消息,客户端负责发送和接收消息。通过这篇文章,你将了解 Rust 中如何使用 TCP 连接来进行网络通信。
🧱 项目结构
🎉 最终效果
📦️ 初始化项目
mkdir chat
cd chat
mkdir crates
cd crates
cargo new client # 创建一个新的客户端项目
cargo new server # 创建一个新的服务端项目
在 Rust 中,使用 cargo new 命令可以创建一个新的项目。 cargo new xx:
- 创建一个新的 二进制(binary)项目。
- 生成的项目结构适用于编写一个可以直接运行的可执行程序。
- src 目录下会有一个 main.rs 文件,这是程序的入口点,包含一个 fn main() {} 函数
cargo new xx --lib:
- 创建一个新的 库(library)项目。
- 生成的项目结构适用于编写一个库,可以被其他项目依赖。
- src 目录下会有一个 lib.rs 文件,这是库的入口点,通常定义公共 API。
总结来说,cargo new xx 创建的是一个二进制项目,适用于开发可执行程序;cargo new xx --lib 创建的是一个库项目,适用于开发可以被其他项目依赖和使用的库。
🌱 编写服务端程序
导入相关库和定义基本常量
use std::io::{ErrorKind, Read, Write}; // 导入输入输出相关的库
use std::net::TcpListener; // 导入TCP监听器
use std::sync::mpsc; // 导入多线程消息传递
use std::time::Duration; // 导入时间处理
use std::{thread, vec}; // 导入线程和向量
const LOCAL_SERVER_PORT: &str = "127.0.0.1:8888"; // 定义服务器地址
const MSG_SIZE: usize = 1024; // 定义消息大小
创建 TcpListener
TcpListener 是一个用于监听 TCP 连接的对象。
绑定到指定的地址和端口
let server = TcpListener::bind(LOCAL_SERVER_PORT).expect("Failed to bind");
- TcpListener::bind
TcpListener::bind 是一个关联函数(类似于静态方法),用于绑定到指定的地址和端口。服务器将在本地的 8888 端口上监听连接。如果绑定成功,它会返回一个 TcpListener 实例。
- .expect
expect 方法用于处理 Result 类型的错误。 如果 TcpListener::bind 成功,它会返回 Ok(TcpListener),并将 TcpListener 实例赋值给 server 变量。 如果 TcpListener::bind 失败,它会返回 Err,expect 会打印错误消息 "Failed to bind" 并导致程序崩溃。
设置为非阻塞模式
server
.set_nonblocking(true)
.expect("Failed to set non-blocking");
set_nonblocking 方法用于设置 TcpListener 的阻塞模式。 当设置为 true 时,TcpListener 将处于非阻塞模式,这意味着调用诸如 accept 方法不会阻塞当前线程。 在非阻塞模式下,如果没有新连接可用,accept 方法会立即返回一个错误 WouldBlock,而不是阻塞等待。
为什么需要非阻塞模式 在这个聊天系统中,服务器需要同时处理多个客户端的连接和消息。如果使用阻塞模式,服务器在等待新连接时将无法处理已经连接的客户端消息,这会导致系统性能低下并无法实时响应多个客户端的请求。
通过将 TcpListener 设置为非阻塞模式,服务器可以在每次循环中快速检查是否有新连接或新消息,从而高效地处理多个客户端。
use std::net::TcpListener;
const LOCAL_SERVER_PORT: &str = "127.0.0.1:8888";
fn main() {
// 绑定到指定的地址和端口
let server = TcpListener::bind(LOCAL_SERVER_PORT).expect("Failed to bind");
// 设置为非阻塞模式
server
.set_nonblocking(true)
.expect("Failed to set non-blocking");
// 主循环,用于处理客户端连接和消息
loop {
// 处理新连接和现有连接的消息
// ...
}
}
创建一个 vec 存储客户端
let mut clients: Vec<_> = vec![]; // 创建一个vec用于存储客户端
clients: Vec<_> :声明了一个类型为 Vec (向量)的变量 clients 。Vec 是 Rust 标准库提供的动态数组类型,可以方便地动态调整大小。
创建用于消息传递的多线程通道
mpsc::channel
mpsc(multiple producer, single consumer)模块提供了多生产者,单消费者的通道(channel),用于线程之间的通信。let (tx, rx) = mpsc::channel::(); 这句代码用于创建一个这样的通道,其中可以传递 String 类型的消息。
- mpsc::channel::()
- mpsc::channel 是一个函数,用于创建一个新的通道。
- 通道有两个端:发送端(sender)和接收端(receiver)。
- 指定了这个通道将传递 String 类型的消息。
- let (tx, rx):
- 这是一个模式匹配(pattern matching)语法,用于解构元组(tuple)。
- tx 是发送端(sender),可以用来发送消息。
- rx 是接收端(receiver),可以用来接收消息。
use std::sync::mpsc;
use std::thread;
fn main() {
// 创建一个通道
let (tx, rx) = mpsc::channel::<String>();
// 创建一个新线程
thread::spawn(move || {
// 发送消息到通道
tx.send(String::from("Hello from thread")).expect("Failed to send message");
});
// 接收并打印消息
let received = rx.recv().expect("Failed to receive message");
println!("Received: {}", received);
}
在这个示例中:
- mpsc::channel 创建了一个通道,用于在线程之间传递 String 类型的消息。
- 在主线程中,let (tx, rx) 创建了发送端 tx 和接收端 rx。
- thread::spawn 创建了一个新的线程,该线程使用 tx.send 发送消息到通道。
- 主线程使用 rx.recv 接收消息并打印出来。
在聊天系统中的应用:服务器使用 mpsc::channel 来实现多个客户端线程之间的消息传递。服务器接收每个客户端发送的消息,并通过通道将消息转发给所有连接的客户端。
use std::sync::mpsc;
const MSG_SIZE: usize = 1024;
fn main() {
// 创建一个通道,用于在线程之间传递 String 类型的消息
let (tx, rx) = mpsc::channel::<String>();
loop {
// 监听新的客户端连接
if let Ok((mut socket, addr)) = server.accept() {
println!("{} connected", addr);
clients.push(socket.try_clone().expect("Failed to clone"));
let tx = tx.clone(); // 克隆发送端,使得每个线程都有自己的发送端
// 创建一个新线程处理客户端消息
thread::spawn(move || loop {
let mut buff = vec![0; MSG_SIZE];
match socket.read_exact(&mut buff) {
Ok(_) => {
let msg: Vec<u8> = buff.into_iter().take_while(|&x| x != 0).collect::<Vec<_>>();
let msg_string = String::from_utf8(msg).expect("Failed to convert");
println!("{}: {:?}", addr, msg_string);
tx.send(msg_string).expect("Failed to send"); // 发送消息到通道
}
Err(ref err) if err.kind() == ErrorKind::WouldBlock => (),
Err(_) => {
println!("{} disconnected", addr);
break;
}
}
thread::sleep(Duration::from_millis(100));
});
}
// 从通道接收消息并转发给所有客户端
if let Ok(msg) = rx.try_recv() {
clients = clients.into_iter().filter_map(|mut client| {
let mut buff = msg.clone().into_bytes();
buff.resize(MSG_SIZE, 0);
client.write_all(&buff).map(|_| client).ok()
}).collect::<Vec<_>>();
}
thread::sleep(Duration::from_micros(100));
}
}
在上述代码中:
- 服务器使用 mpsc::channel 创建了一个通道 tx 和 rx。
- 每当有新的客户端连接时,服务器会克隆 tx,并在新的线程中使用它。
- 客户端发送的消息被读取后,通过 tx.send 发送到通道。
- 主线程通过 rx.try_recv 接收消息,并将其转发给所有连接的客户端。
socket.try_clone()
socket.try_clone() 创建一个当前 TcpStream 的副本。这个副本可以在不同的线程中使用,而不影响原来的 TcpStream 对象。
为什么需要 try_clone
在多线程编程中,多个线程可能需要同时访问同一个 TcpStream 对象。直接在多个线程中使用同一个对象是不安全的,因为 TcpStream 是不可共享的。 使用 try_clone 方法可以创建一个独立的副本,使得每个线程可以使用自己的 TcpStream 对象,从而避免数据竞争。
try_clone 的工作原理
try_clone 方法调用底层操作系统的 dup(在 Unix 上)或 DuplicateHandle(在 Windows 上)来创建一个新的文件描述符,该描述符指向同一个底层套接字。 这样,原始的 TcpStream 和它的副本可以独立操作,但是它们共享同一个网络连接。 以下是一个简单的例子,展示了如何使用 try_clone 来在多个线程中使用同一个 TcpStream:
use std::io::{self, Write, Read};
use std::net::TcpStream;
use std::thread;
fn main() -> io::Result<()> {
let stream = TcpStream::connect("127.0.0.1:8888")?;
// 创建一个 TcpStream 的副本
let mut stream_clone = stream.try_clone()?;
// 在主线程中使用原始的 TcpStream
thread::spawn(move || {
let mut buffer = [0; 1024];
stream_clone.read(&mut buffer).expect("Failed to read from stream");
println!("Read from clone: {:?}", &buffer[..]);
});
// 在主线程中使用克隆的 TcpStream
let mut buffer = [0; 1024];
stream.read(&mut buffer).expect("Failed to read from stream");
println!("Read from original: {:?}", &buffer[..]);
Ok(())
}
上述代码中,stream 被克隆为 stream_clone,然后在不同的线程中使用它们。每个线程都有自己的 TcpStream 副本,因此可以独立地进行读写操作,而不会发生数据竞争。 在聊天系统中的应用:服务器需要处理多个客户端连接。每个客户端连接都在一个独立的线程中进行读写操作。使用 try_clone 可以确保每个线程都有自己的 TcpStream 副本,从而安全地处理每个客户端的通信。
if let Ok((mut socket, addr)) = server.accept() {
println!("{} connected", addr); // 打印连接的客户端地址
clients.push(socket.try_clone().expect("Failed to clone")); // 克隆socket并存储到客户端向量中
let tx = tx.clone(); // 克隆发送端
// 创建一个新线程处理客户端消息
thread::spawn(move || loop {
let mut buff = vec![0; MSG_SIZE]; // 创建一个缓冲区
match socket.read_exact(&mut buff) {
Ok(_) => {
// 接收消息并转换为字符串
let msg: Vec<u8> =
buff.into_iter().take_while(|&x| x != 0).collect::<Vec<_>>();
let msg_string = String::from_utf8(msg).expect("Failed to convert");
println!("{}: {:?}", addr, msg_string); // 打印消息
tx.send(msg_string).expect("Failed to send"); // 发送消息到通道
}
Err(ref err) if err.kind() == ErrorKind::WouldBlock => (),
Err(_) => {
// 处理断开连接
println!("{} disconnected", addr);
break;
}
}
thread::sleep(Duration::from_millis(100)); // 休眠100毫秒
});
}
在上述代码中,每当有新的客户端连接时,socket.try_clone() 创建一个新的 TcpStream 副本,并将其推入 clients 向量中。然后,每个客户端连接都在一个独立的线程中处理,使得服务器能够同时处理多个客户端的请求。
thread::sleep
为什么需要休眠一段时间呢?一句话概括:短暂的休眠有助于优化程序的性能、资源利用和稳定性。
- 避免过度频繁的执行:如果没有这个休眠,代码会以极高的频率不断尝试执行 server.accept() 和处理接收消息的逻辑,这可能会导致不必要的 CPU 资源消耗,而且在实际情况中,频繁的检查可能也不会立即产生新的结果。
- 给系统和其他线程一些执行时间:在多线程环境中,适当的休眠可以让其他线程有机会执行,以实现更平衡的资源利用和更流畅的并发操作。
- 减少竞争和错误:过于频繁的操作可能会增加竞争条件的风险,适当的休眠可以降低这种可能性,提高程序的稳定性和可靠性。
💫 编写客户端程序
客户端和服务器端处理逻辑类似,总结来说客户端需要连接到服务器,处理接受和发送消息和用户输入消息处理
导入所需的库和定义常量
use std::io::{self, ErrorKind, Read, Write}; // 导入输入输出相关的库
use std::net::TcpStream; // 导入TCP流
use std::sync::mpsc::{self, TryRecvError}; // 导入多线程消息传递
use std::time::Duration; // 导入时间处理
use std::{thread, vec}; // 导入线程和向量
const LOCAL_SERVER: &str = "127.0.0.1:8888"; // 定义服务器地址
const MSG_SIZE: usize = 1024; // 定义消息大小
连接到服务器
fn main() {
// 连接到服务器
let mut client = TcpStream::connect(LOCAL_SERVER).expect("Failed to connect");
client
.set_nonblocking(true)
.expect("Failed to set non-blocking");
}
创建一个新线程处理接收和发送消息
读取服务器消息
在新线程的循环中,创建了一个固定大小的缓冲区 buff 。通过 client.read_exact(&mut buff) 尝试从服务器读取数据。
// 创建一个新线程处理接收和发送消息
std::thread::spawn(move || loop {
let mut buff = vec![0; MSG_SIZE]; // 创建一个缓冲区
// 读取服务器消息
match client.read_exact(&mut buff) {
// ...
}
});
根据读取结果进行处理。如果读取成功,将缓冲区中的有效数据转换为字符串并打印。如果遇到 ErrorKind::WouldBlock 错误,表示当前没有数据可读取,不做处理。如果是其他错误,则认为客户端已断开连接,退出循环。
match client.read_exact(&mut buff) {
Ok(_) => {
let msg = buff.into_iter().take_while(|&x| x != 0).collect::<Vec<_>>();
let msg_string = String::from_utf8(msg.clone()).unwrap();
println!("{}", msg_string); // 打印消息
}
Err(ref err) if err.kind() == ErrorKind::WouldBlock => (),
Err(_) => {
// 处理断开连接
println!(">>Client closed");
break;
}
}
发送消息到服务器
// 发送消息到服务器
match rx.try_recv() {
Ok(msg) => {
let mut buff = msg.clone().into_bytes(); // 转换消息为字节数组
buff.resize(MSG_SIZE, 0); // 调整缓冲区大小
client.write_all(&buff).expect("Failed to send"); // 发送消息
println!(">>{}", msg)
}
Err(TryRecvError::Empty) => (),
Err(TryRecvError::Disconnected) => break,
}
通过 rx.try_recv 尝试从多线程通道接收消息。如果成功接收到消息,将其转换为字节数组,调整缓冲区大小,并发送到服务器,同时打印发送的消息。根据接收结果的不同错误类型进行相应的处理,如果是通道断开连接,则退出循环。
用户输入与消息处理
在主循环中,创建一个字符串缓冲区用于存储用户输入。通过 io::stdin().read_line(&mut buff) 读取用户输入,并去除多余的空格。
println!(">>Enter your message");
loop {
let mut buff = String::new(); // 创建一个缓冲区用于存储用户输入
io::stdin().read_line(&mut buff).expect("Failed to read"); // 读取用户输入
let msg = buff.trim().to_string(); // 去除多余的空格
// ...
}
根据用户输入的内容进行判断,如果是 "quit" 或者发送消息到通道失败,则退出循环。
if msg == "quit" || tx.send(msg).is_err() {
// 如果输入"quit"或通道关闭,则退出循环
break;
}
至此,我们运用 Rust 的特性和多线程机制,实现了客户端与服务器之间的消息接收和发送,以及对用户输入的处理,实现了非常简易的实时聊天系统。
文章所有代码均在:github.com/rustify-org…
「❤️ 感谢大家」
如果你觉得这篇内容对你挺有有帮助的话: 点赞支持下吧,让更多的人也能看到这篇内容(收藏不点赞,都是耍流氓 -_-)欢迎在留言区与我分享你的想法,也欢迎你在留言区记录你的思考过程。觉得不错的话,也可以阅读 Sunny 近期梳理的文章(感谢掘友的鼓励与支持 🌹🌹🌹):
我的博客:
Github:https://github.com/sunny-117/
前端八股文题库:sunny-117.github.io/blog/
前端面试手写题库:github.com/Sunny-117/j…
手写前端库源码教程:sunny-117.github.io/mini-anythi…
热门文章
专栏
转载自:https://juejin.cn/post/7389952004792434688