likes
comments
collection
share

🚀 Rust 构建简易实时聊天系统

作者站长头像
站长
· 阅读数 46

文章所有代码均在:github.com/rustify-org…

这篇文章将介绍如何用 Rust 编写一个简易的聊天系统。包括一个服务器端和一个客户端,服务器端负责监听和转发消息,客户端负责发送和接收消息。通过这篇文章,你将了解 Rust 中如何使用 TCP 连接来进行网络通信。

🧱 项目结构

🚀 Rust 构建简易实时聊天系统

🎉 最终效果

🚀 Rust 构建简易实时聊天系统

📦️ 初始化项目

mkdir chat
cd chat
mkdir crates
cd crates
cargo new client # 创建一个新的客户端项目
cargo new server # 创建一个新的服务端项目

在 Rust 中,使用 cargo new 命令可以创建一个新的项目。 cargo new xx

  • 创建一个新的 二进制(binary)项目
  • 生成的项目结构适用于编写一个可以直接运行的可执行程序。
  • src 目录下会有一个 main.rs 文件,这是程序的入口点,包含一个 fn main() {} 函数

cargo new xx --lib

  • 创建一个新的 库(library)项目
  • 生成的项目结构适用于编写一个库,可以被其他项目依赖。
  • src 目录下会有一个 lib.rs 文件,这是库的入口点,通常定义公共 API。

总结来说,cargo new xx 创建的是一个二进制项目,适用于开发可执行程序;cargo new xx --lib 创建的是一个库项目,适用于开发可以被其他项目依赖和使用的库。

🌱 编写服务端程序

导入相关库和定义基本常量

use std::io::{ErrorKind, Read, Write}; // 导入输入输出相关的库
use std::net::TcpListener; // 导入TCP监听器
use std::sync::mpsc; // 导入多线程消息传递
use std::time::Duration; // 导入时间处理
use std::{thread, vec}; // 导入线程和向量

const LOCAL_SERVER_PORT: &str = "127.0.0.1:8888"; // 定义服务器地址
const MSG_SIZE: usize = 1024; // 定义消息大小

创建 TcpListener

TcpListener 是一个用于监听 TCP 连接的对象。

绑定到指定的地址和端口

let server = TcpListener::bind(LOCAL_SERVER_PORT).expect("Failed to bind");
  • TcpListener::bind

TcpListener::bind 是一个关联函数(类似于静态方法),用于绑定到指定的地址和端口。服务器将在本地的 8888 端口上监听连接。如果绑定成功,它会返回一个 TcpListener 实例。

  • .expect

expect 方法用于处理 Result 类型的错误。 如果 TcpListener::bind 成功,它会返回 Ok(TcpListener),并将 TcpListener 实例赋值给 server 变量。 如果 TcpListener::bind 失败,它会返回 Err,expect 会打印错误消息 "Failed to bind" 并导致程序崩溃。

设置为非阻塞模式

server
    .set_nonblocking(true)
    .expect("Failed to set non-blocking");

set_nonblocking 方法用于设置 TcpListener 的阻塞模式。 当设置为 true 时,TcpListener 将处于非阻塞模式,这意味着调用诸如 accept 方法不会阻塞当前线程。 在非阻塞模式下,如果没有新连接可用,accept 方法会立即返回一个错误 WouldBlock,而不是阻塞等待。

为什么需要非阻塞模式 在这个聊天系统中,服务器需要同时处理多个客户端的连接和消息。如果使用阻塞模式,服务器在等待新连接时将无法处理已经连接的客户端消息,这会导致系统性能低下并无法实时响应多个客户端的请求。

通过将 TcpListener 设置为非阻塞模式,服务器可以在每次循环中快速检查是否有新连接或新消息,从而高效地处理多个客户端。

use std::net::TcpListener;

const LOCAL_SERVER_PORT: &str = "127.0.0.1:8888";

fn main() {
    // 绑定到指定的地址和端口
    let server = TcpListener::bind(LOCAL_SERVER_PORT).expect("Failed to bind");

    // 设置为非阻塞模式
    server
        .set_nonblocking(true)
        .expect("Failed to set non-blocking");

    // 主循环,用于处理客户端连接和消息
    loop {
        // 处理新连接和现有连接的消息
        // ...
    }
}

创建一个 vec 存储客户端

let mut clients: Vec<_> = vec![]; // 创建一个vec用于存储客户端

clients: Vec<_> :声明了一个类型为 Vec (向量)的变量 clients 。Vec 是 Rust 标准库提供的动态数组类型,可以方便地动态调整大小。

创建用于消息传递的多线程通道

mpsc::channel

mpsc(multiple producer, single consumer)模块提供了多生产者,单消费者的通道(channel),用于线程之间的通信。let (tx, rx) = mpsc::channel::(); 这句代码用于创建一个这样的通道,其中可以传递 String 类型的消息。

  1. mpsc::channel::()
  • mpsc::channel 是一个函数,用于创建一个新的通道。
  • 通道有两个端:发送端(sender)和接收端(receiver)。
  • 指定了这个通道将传递 String 类型的消息。
  1. let (tx, rx):
  • 这是一个模式匹配(pattern matching)语法,用于解构元组(tuple)。
  • tx 是发送端(sender),可以用来发送消息。
  • rx 是接收端(receiver),可以用来接收消息。
use std::sync::mpsc;
use std::thread;

fn main() {
    // 创建一个通道
    let (tx, rx) = mpsc::channel::<String>();

    // 创建一个新线程
    thread::spawn(move || {
        // 发送消息到通道
        tx.send(String::from("Hello from thread")).expect("Failed to send message");
    });

    // 接收并打印消息
    let received = rx.recv().expect("Failed to receive message");
    println!("Received: {}", received);
}

在这个示例中:

  1. mpsc::channel 创建了一个通道,用于在线程之间传递 String 类型的消息。
  2. 在主线程中,let (tx, rx) 创建了发送端 tx 和接收端 rx。
  3. thread::spawn 创建了一个新的线程,该线程使用 tx.send 发送消息到通道。
  4. 主线程使用 rx.recv 接收消息并打印出来。

在聊天系统中的应用:服务器使用 mpsc::channel 来实现多个客户端线程之间的消息传递。服务器接收每个客户端发送的消息,并通过通道将消息转发给所有连接的客户端。

use std::sync::mpsc;

const MSG_SIZE: usize = 1024;

fn main() {
    // 创建一个通道,用于在线程之间传递 String 类型的消息
    let (tx, rx) = mpsc::channel::<String>();

    loop {
        // 监听新的客户端连接
        if let Ok((mut socket, addr)) = server.accept() {
            println!("{} connected", addr);
            clients.push(socket.try_clone().expect("Failed to clone"));
            let tx = tx.clone(); // 克隆发送端,使得每个线程都有自己的发送端

            // 创建一个新线程处理客户端消息
            thread::spawn(move || loop {
                let mut buff = vec![0; MSG_SIZE];
                match socket.read_exact(&mut buff) {
                    Ok(_) => {
                        let msg: Vec<u8> = buff.into_iter().take_while(|&x| x != 0).collect::<Vec<_>>();
                        let msg_string = String::from_utf8(msg).expect("Failed to convert");
                        println!("{}: {:?}", addr, msg_string);
                        tx.send(msg_string).expect("Failed to send"); // 发送消息到通道
                    }
                    Err(ref err) if err.kind() == ErrorKind::WouldBlock => (),
                    Err(_) => {
                        println!("{} disconnected", addr);
                        break;
                    }
                }
                thread::sleep(Duration::from_millis(100));
            });
        }

        // 从通道接收消息并转发给所有客户端
        if let Ok(msg) = rx.try_recv() {
            clients = clients.into_iter().filter_map(|mut client| {
                let mut buff = msg.clone().into_bytes();
                buff.resize(MSG_SIZE, 0);
                client.write_all(&buff).map(|_| client).ok()
            }).collect::<Vec<_>>();
        }
        thread::sleep(Duration::from_micros(100));
    }
}

在上述代码中:

  • 服务器使用 mpsc::channel 创建了一个通道 tx 和 rx。
  • 每当有新的客户端连接时,服务器会克隆 tx,并在新的线程中使用它。
  • 客户端发送的消息被读取后,通过 tx.send 发送到通道。
  • 主线程通过 rx.try_recv 接收消息,并将其转发给所有连接的客户端。

socket.try_clone()

socket.try_clone() 创建一个当前 TcpStream 的副本。这个副本可以在不同的线程中使用,而不影响原来的 TcpStream 对象。

为什么需要 try_clone

在多线程编程中,多个线程可能需要同时访问同一个 TcpStream 对象。直接在多个线程中使用同一个对象是不安全的,因为 TcpStream 是不可共享的。 使用 try_clone 方法可以创建一个独立的副本,使得每个线程可以使用自己的 TcpStream 对象,从而避免数据竞争。

try_clone 的工作原理

try_clone 方法调用底层操作系统的 dup(在 Unix 上)或 DuplicateHandle(在 Windows 上)来创建一个新的文件描述符,该描述符指向同一个底层套接字。 这样,原始的 TcpStream 和它的副本可以独立操作,但是它们共享同一个网络连接。 以下是一个简单的例子,展示了如何使用 try_clone 来在多个线程中使用同一个 TcpStream:

use std::io::{self, Write, Read};
use std::net::TcpStream;
use std::thread;

fn main() -> io::Result<()> {
    let stream = TcpStream::connect("127.0.0.1:8888")?;

    // 创建一个 TcpStream 的副本
    let mut stream_clone = stream.try_clone()?;

    // 在主线程中使用原始的 TcpStream
    thread::spawn(move || {
        let mut buffer = [0; 1024];
        stream_clone.read(&mut buffer).expect("Failed to read from stream");
        println!("Read from clone: {:?}", &buffer[..]);
    });

    // 在主线程中使用克隆的 TcpStream
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).expect("Failed to read from stream");
    println!("Read from original: {:?}", &buffer[..]);

    Ok(())
}

上述代码中,stream 被克隆为 stream_clone,然后在不同的线程中使用它们。每个线程都有自己的 TcpStream 副本,因此可以独立地进行读写操作,而不会发生数据竞争。 在聊天系统中的应用:服务器需要处理多个客户端连接。每个客户端连接都在一个独立的线程中进行读写操作。使用 try_clone 可以确保每个线程都有自己的 TcpStream 副本,从而安全地处理每个客户端的通信。

if let Ok((mut socket, addr)) = server.accept() {
    println!("{} connected", addr); // 打印连接的客户端地址
    clients.push(socket.try_clone().expect("Failed to clone")); // 克隆socket并存储到客户端向量中
    let tx = tx.clone(); // 克隆发送端
                            // 创建一个新线程处理客户端消息
    thread::spawn(move || loop {
        let mut buff = vec![0; MSG_SIZE]; // 创建一个缓冲区
        match socket.read_exact(&mut buff) {
            Ok(_) => {
                // 接收消息并转换为字符串
                let msg: Vec<u8> =
                    buff.into_iter().take_while(|&x| x != 0).collect::<Vec<_>>();
                let msg_string = String::from_utf8(msg).expect("Failed to convert");
                println!("{}: {:?}", addr, msg_string); // 打印消息
                tx.send(msg_string).expect("Failed to send"); // 发送消息到通道
            }
            Err(ref err) if err.kind() == ErrorKind::WouldBlock => (),
            Err(_) => {
                // 处理断开连接
                println!("{} disconnected", addr);
                break;
            }
        }
        thread::sleep(Duration::from_millis(100)); // 休眠100毫秒
    });
}

在上述代码中,每当有新的客户端连接时,socket.try_clone() 创建一个新的 TcpStream 副本,并将其推入 clients 向量中。然后,每个客户端连接都在一个独立的线程中处理,使得服务器能够同时处理多个客户端的请求。

thread::sleep

为什么需要休眠一段时间呢?一句话概括:短暂的休眠有助于优化程序的性能、资源利用和稳定性。

  1. 避免过度频繁的执行:如果没有这个休眠,代码会以极高的频率不断尝试执行 server.accept() 和处理接收消息的逻辑,这可能会导致不必要的 CPU 资源消耗,而且在实际情况中,频繁的检查可能也不会立即产生新的结果。
  2. 给系统和其他线程一些执行时间:在多线程环境中,适当的休眠可以让其他线程有机会执行,以实现更平衡的资源利用和更流畅的并发操作。
  3. 减少竞争和错误:过于频繁的操作可能会增加竞争条件的风险,适当的休眠可以降低这种可能性,提高程序的稳定性和可靠性。

💫 编写客户端程序

客户端和服务器端处理逻辑类似,总结来说客户端需要连接到服务器,处理接受和发送消息和用户输入消息处理

导入所需的库和定义常量

use std::io::{self, ErrorKind, Read, Write}; // 导入输入输出相关的库
use std::net::TcpStream; // 导入TCP流
use std::sync::mpsc::{self, TryRecvError}; // 导入多线程消息传递
use std::time::Duration; // 导入时间处理
use std::{thread, vec}; // 导入线程和向量

const LOCAL_SERVER: &str = "127.0.0.1:8888"; // 定义服务器地址
const MSG_SIZE: usize = 1024; // 定义消息大小

连接到服务器

fn main() {
    // 连接到服务器
    let mut client = TcpStream::connect(LOCAL_SERVER).expect("Failed to connect");
    client
        .set_nonblocking(true)
        .expect("Failed to set non-blocking");
}

创建一个新线程处理接收和发送消息

读取服务器消息

在新线程的循环中,创建了一个固定大小的缓冲区 buff 。通过 client.read_exact(&mut buff) 尝试从服务器读取数据。

// 创建一个新线程处理接收和发送消息
std::thread::spawn(move || loop {
    let mut buff = vec![0; MSG_SIZE]; // 创建一个缓冲区
    // 读取服务器消息
    match client.read_exact(&mut buff) {
        // ...
    }
});

根据读取结果进行处理。如果读取成功,将缓冲区中的有效数据转换为字符串并打印。如果遇到 ErrorKind::WouldBlock 错误,表示当前没有数据可读取,不做处理。如果是其他错误,则认为客户端已断开连接,退出循环。

match client.read_exact(&mut buff) {
    Ok(_) => {
        let msg = buff.into_iter().take_while(|&x| x != 0).collect::<Vec<_>>();
        let msg_string = String::from_utf8(msg.clone()).unwrap();
        println!("{}", msg_string); // 打印消息
    }
    Err(ref err) if err.kind() == ErrorKind::WouldBlock => (),
    Err(_) => {
        // 处理断开连接
        println!(">>Client closed");
        break;
    }
}

发送消息到服务器

// 发送消息到服务器
match rx.try_recv() {
    Ok(msg) => {
        let mut buff = msg.clone().into_bytes(); // 转换消息为字节数组
        buff.resize(MSG_SIZE, 0); // 调整缓冲区大小
        client.write_all(&buff).expect("Failed to send"); // 发送消息
        println!(">>{}", msg)
    }
    Err(TryRecvError::Empty) => (),
    Err(TryRecvError::Disconnected) => break,
}

通过 rx.try_recv 尝试从多线程通道接收消息。如果成功接收到消息,将其转换为字节数组,调整缓冲区大小,并发送到服务器,同时打印发送的消息。根据接收结果的不同错误类型进行相应的处理,如果是通道断开连接,则退出循环。

用户输入与消息处理

在主循环中,创建一个字符串缓冲区用于存储用户输入。通过 io::stdin().read_line(&mut buff) 读取用户输入,并去除多余的空格。

println!(">>Enter your message");
loop {
    let mut buff = String::new(); // 创建一个缓冲区用于存储用户输入
    io::stdin().read_line(&mut buff).expect("Failed to read"); // 读取用户输入
    let msg = buff.trim().to_string(); // 去除多余的空格
    // ...
}

根据用户输入的内容进行判断,如果是 "quit" 或者发送消息到通道失败,则退出循环。

if msg == "quit" || tx.send(msg).is_err() {
    // 如果输入"quit"或通道关闭,则退出循环
    break;
}

至此,我们运用 Rust 的特性和多线程机制,实现了客户端与服务器之间的消息接收和发送,以及对用户输入的处理,实现了非常简易的实时聊天系统。

文章所有代码均在:github.com/rustify-org…

「❤️ 感谢大家」

如果你觉得这篇内容对你挺有有帮助的话: 点赞支持下吧,让更多的人也能看到这篇内容(收藏不点赞,都是耍流氓 -_-)欢迎在留言区与我分享你的想法,也欢迎你在留言区记录你的思考过程。觉得不错的话,也可以阅读 Sunny 近期梳理的文章(感谢掘友的鼓励与支持 🌹🌹🌹):

我的博客:

Github:https://github.com/sunny-117/

前端八股文题库:sunny-117.github.io/blog/

前端面试手写题库:github.com/Sunny-117/j…

手写前端库源码教程:sunny-117.github.io/mini-anythi…

热门文章

专栏

转载自:https://juejin.cn/post/7389952004792434688
评论
请登录