likes
comments
collection
share

Rust:实现一个实时聊天系统(server)

作者站长头像
站长
· 阅读数 10

作者:Ahmad Rosid

原文链接:blog.logrocket.com/real-time-c…

实时聊天应用简介

实时聊天应用允许用户通过文本、语音或视频实时相互通信。这类应用相比电子邮件或即时消息等其他通信方式,允许更即时的消息传递。

聊天应用必须实时工作的几个原因:

  • 提高性能:更即时的通信允许更自然的对话
  • 更高的响应性:实时功能结果在改善用户体验
  • 更高的可靠性:有了实时功能,消息丢失或延迟的机会更少

WebSockets简介

WebSockets使得客户端和服务器在实时聊天应用中能够进行双向通信。使用Rust构建WebSocket服务器将使服务器能够处理大量连接而不会变慢。这得益于Rust的速度和可靠性。

现在我们对WebSockets有了更好的理解,让我们开始构建我们的实时聊天应用吧!

入门

首先,让我们回顾一下一些先决条件:

Rust:确保你的电脑上安装了Rust。如果没有,请使用以下命令安装:

curl --proto '=https' --tlsv1.2 -sSf [https://sh.rustup.rs](https://sh.rustup.rs/) | sh 
// 如果你在Windows上,请在这里查看更多安装方法 <https://forge.rust-lang.org/infra/other-installation-methods.html>

接下来,运行以下命令以验证一切是否安装并正常工作:

rustc --version
cargo --version

设计实时聊天应用架构

让我们为我们的实时聊天应用创建一些设计架构。我们将构建一个简单的服务器;我们应用的架构将涵盖以下功能:

  • 聊天:通过直接消息在两个用户之间
  • 打字指示器:当用户开始向他们发送聊天时通知接收者
  • 用户状态:指示用户是在线还是离线

Rust:实现一个实时聊天系统(server) 这个架构非常简单,易于遵循。它由几个组件组成:

  • WebSocket服务器:这是我们应用中最重要的组件;它处理客户端和房间之间的所有通信
  • 房间管理器:这个组件负责管理我们应用中的所有房间。它将创建、更新和删除房间。这个组件将在HTTP服务器上
  • 用户管理器:这个组件负责管理我们应用中的所有用户。它将创建、更新和删除用户。这个组件也将在HTTP服务器上
  • 消息管理器:这个组件负责管理我们应用中的所有消息。它将创建、更新和删除消息。这个组件将在WebSocket服务器和HTTP服务器上。它将用于存储从WebSockets接收的传入消息,并在用户通过Rest API打开聊天室时检索数据库中已有的所有消息

用Rust构建WebSocket服务器

我们可以使用许多包在Rust中编写WebSocket服务器。对于本教程,我们将使用Actix Web;它是一个成熟的包,易于使用。

首先,使用以下命令创建一个Rust项目:

cargo new rust-react-chat

接下来,在Cargo.toml文件中添加这个包:

[package]
name = "rust-react-chat"
version = "0.1.0"
edition = "2021"

[dependencies]
actix = "0.13.0"
actix-files = "0.6.2"
actix-web = "4.2.1"
actix-web-actors = "4.1.0"
rand = "0.8.5"
serde = "1.0.147"
serde_json = "1.0.88"

现在,安装diesel_cli;我们将使用它作为我们的ORM:

cargo install diesel_cli --no-default-features --features sqlite

以下是项目结构应该的样子:

.
├── Cargo.lock
├── Cargo.toml
├── README.md
├── chat.db
├── .env
└── src
    ├── db.rs
    ├── main.rs
    ├── models.rs
    ├── routes.rs
    ├── schema.rs
    ├── server.rs
    └── session.rs
└── static
└── ui

现在,这里有一些关于文件夹的信息:

  • src:这个文件夹包含我们所有的Rust代码
  • static:这个文件夹包含我们所有的静态资产,HTML文件,JavaScript文件和图片
  • ui:这个文件夹包含我们的React代码;我们稍后将其编译为静态文件并导出到static文件夹

接下来,让我们编写我们WebSocket服务器的入口点:

// src/main.rs
#[macro_use]
extern crate diesel;
use actix::*;
use actix_cors::Cors;
use actix_files::Files;
use actix_web::{web, http, App, HttpServer};
use diesel::{
    prelude::*,
    r2d2::{self, ConnectionManager},
};
mod db;
mod models;
mod routes;
mod schema;
mod server;
mod session;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let server = server::ChatServer::new().start();
    let conn_spec = "chat.db";
    let manager = ConnectionManager::<SqliteConnection>::new(conn_spec);
    let pool = r2d2::Pool::builder().build(manager).expect("Failed to create pool.");
    let server_addr = "127.0.0.1";
    let server_port = 8080;
    let app = HttpServer::new(move || {
        let cors = Cors::default()
            .allowed_origin("http://localhost:3000")
            .allowed_origin("http://localhost:8080")
            .allowed_methods(vec!["GET", "POST"])
            .allowed_headers(vec![http::header::AUTHORIZATION, http::header::ACCEPT])
            .allowed_header(http::header::CONTENT_TYPE)
            .max_age(3600);
        App::new()
            .app_data(web::Data::new(server.clone()))
            .app_data(web::Data::new(pool.clone()))
            .wrap(cors)
            .service(web::resource("/").to(routes::index))
            .route("/ws", web::get().to(routes::chat_server))
            .service(routes::create_user)
            .service(routes::get_user_by_id)
            .service(routes::get_user_by_phone)
            .service(routes::get_conversation_by_id)
            .service(routes::get_rooms)
            .service(Files::new("/", "./static"))
    })
    .workers(2)
    .bind((server_addr, server_port))?
    .run();
    println!("Server running at http://{server_addr}:{server_port}/");
    app.await
}

这里有一些我们正在使用的包的信息:

  • actix_cors:将用于调试UI;我们将接受来自localhost:3000或localhost:8080的POST和GET请求
  • actix_web:用于Actix Web包中所有HTTP相关功能
  • actix_files:用于将静态文件嵌入到我们的某个路由中
  • diesel:将用于从我们的SQLite数据库查询数据。如果你愿意,你可以将其更改为Postgres或MySQL
  • serde_json:将用于解析我们将发送给React应用的JSON数据

创建路由

现在,让我们为我们的服务器创建路由。由于我们将使用REST HTTP和WebSocket服务器,我们可以轻松地将所有内容放在一个文件中。

首先,添加我们需要的所有包:

// src/routes.rs
use std::time::Instant;
use actix::*;
use actix_files::NamedFile;
use actix_web::{get, post, web, Error, HttpRequest, HttpResponse, Responder};
use actix_web_actors::ws;
use diesel::{
    prelude::*,
    r2d2::{self, ConnectionManager},
};
use serde_json::json;
use uuid::Uuid;
use crate::db;
use crate::models;
use crate::server;
use crate::session;
type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;

然后,添加一个路由,将首页嵌入到根URL:

// src/routes.rs
pub async fn index() -> impl Responder {
    NamedFile::open_async("./static/index.html").await.unwrap()
}

这是我们WebSocket服务器的入口点。现在它位于/ws路由上,但你可以根据喜好更改为任何路由名称。由于我们已经在main.rs文件中注册了所有需要的依赖项,我们可以直接将依赖项作为函数参数传递,如下所示:

// src/routes.rs
pub async fn chat_server(
    req: HttpRequest,
    stream: web::Payload,
    pool: web::Data<DbPool>,
    srv: web::Data<Addr<server::ChatServer>>,
) -> Result<HttpResponse, Error> {
    ws::start(
        session::WsChatSession {
            id: 0,
            hb: Instant::now(),
            room: "main".to_string(),
            name: None,
            addr: srv.get_ref().clone(),
            db_pool: pool,
        },
        &req,
        stream
    )
}

接下来,我们需要在我们的路由中添加一个REST API,以获取使我们的聊天工作所需的数据:

// src/routes.rs
#[post("/users/create")]
pub async fn create_user(
    pool: web::Data<DbPool>,
    form: web::Json<models::NewUser>,
) -> Result<HttpResponse, Error> {
    let user = web::block(move || {
        let mut conn = pool.get()?;
        db::insert_new_user(&mut conn, &form.username, &form.phone)
    })
    .await?
    .map_err(actix_web::error::ErrorUnprocessableEntity)?;
    Ok(HttpResponse::Ok().json(user))
}
#[get("/users/{user_id}")]
pub async fn get_user_by_id(
    pool: web::Data<DbPool>,
    id: web::Path<Uuid>,
) -> Result<HttpResponse, Error> {
    let user_id = id.to_owned();
    let user = web::block(move || {
        let mut conn = pool.get()?;
        db::find_user_by_uid(&mut conn, user_id)
    })
    .await?
    .map_err(actix_web::error::ErrorInternalServerError)?;
    if let Some(user) = user {
        Ok(HttpResponse::Ok().json(user))
    } else {
        let res = HttpResponse::NotFound().body(
            json!({
                "error": 404,
                "message": format!("No user found with phone: {id}")
            })
            .to_string(),
        );
        Ok(res)
    }
}
#[get("/conversations/{uid}")]
pub async fn get_conversation_by_id(
    pool: web::Data<DbPool>,
    uid: web::Path<Uuid>,
) -> Result<HttpResponse, Error> {
    let room_id = uid.to_owned();
    let conversations = web::block(move || {
        let mut conn = pool.get()?;
        db::get_conversation_by_room_uid(&mut conn, room_id)
    })
    .await?
    .map_err(actix_web::error::ErrorInternalServerError)?;
    if let Some(data) = conversations {
        Ok(HttpResponse::Ok().json(data))
    } else {
        let res = HttpResponse::NotFound().body(
            json!({
                "error": 404,
                "message": format!("No conversation with room_id: {room_id}")
            })
            .to_string(),
        );
        Ok(res)
    }
}
#[get("/users/phone/{user_phone}")]
pub async fn get_user_by_phone(
    pool: web::Data<DbPool>,
    phone: web::Path<String>,
) -> Result<HttpResponse, Error> {
    let user_phone = phone.to_string();
    let user = web::block(move || {
        let mut conn = pool.get()?;
        db::find_user_by_phone(&mut conn, user_phone)
    })
    .await?
    .map_err(actix_web::error::ErrorInternalServerError)?;
    if let Some(user) = user {
        Ok(HttpResponse::Ok().json(user))
    } else {
        let res = HttpResponse::NotFound().body(
            json!({
                "error": 404,
                "message": format!("No user found with phone: {}", phone.to_string())
            })
            .to_string(),
        );
        Ok(res)
    }
}
#[get("/rooms")]
pub async fn get_rooms(
    pool: web::Data<DbPool>,
) -> Result<HttpResponse, Error> {
    let rooms = web::block(move || {
        let mut conn = pool.get()?;
        db::get_all_rooms(&mut conn)
    })
    .await?
    .map_err(actix_web::error::ErrorInternalServerError)?;
    if !rooms.is_empty() {
        Ok(HttpResponse::Ok().json(rooms))
    } else {
        let res = HttpResponse::NotFound().body(
            json!({
                "error": 404,
                "message": "No rooms available at the moment.",
            })
            .to_string(),
        );
        Ok(res)
    }
}

现在,让我们处理WebSocket连接。首先,让我们再次导入所有需要的包:

// src/server.rs
use std::collections::{HashMap, HashSet};
use serde_json::json;
use actix::prelude::*;
use rand::{self, rngs::ThreadRng, Rng};
use crate::session;
#[derive(Message)]
#[rtype(result = "()")]
pub struct Message(pub String);
#[derive(Message)]
#[rtype(usize)]
pub struct Connect {
    pub addr: Recipient<Message>,
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct Disconnect {
    pub id: usize,
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct ClientMessage {
    pub id: usize,
    pub msg: String,
    pub room: String,
}
pub struct ListRooms;
impl actix::Message for ListRooms {
    type Result = Vec<String>;
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct Join {
    pub id: usize,
    pub name: String,
}

接下来,让我们实现一个trait来管理WebSocket连接。这段代码将处理来自用户的所有消息,并将它们发送回聊天室中的参与者:

// src/server.rs
#[derive(Debug)]
pub struct ChatServer {
    sessions: HashMap<usize, Recipient<Message>>,
    rooms: HashMap<String, HashSet<usize>>,
    rng: ThreadRng,
}
impl ChatServer {
    pub fn new() -> ChatServer {
        let mut rooms = HashMap::new();
        rooms.insert("main".to_string(), HashSet::new());
        Self {
            sessions: HashMap::new(),
            rooms,
            rng: rand::thread_rng()
        }
    }
    fn send_message(&self, room: &str, message: &str, skip_id: usize) {
        if let Some(sessions) = self.rooms.get(room) {
            for id in sessions {
                if *id != skip_id {
                    if let Some(addr) = self.sessions.get(id) {
                        addr.do_send(Message(message.to_owned()));
                    }
                }
            }
        }
    }
}
impl Actor for ChatServer {
    type Context = Context<Self>;
}
impl Handler<Connect> for ChatServer {
    type Result = usize;
    fn handle(&mut self, msg: Connect, _: &mut Context<Self>) -> Self::Result {
        let id = self.rng.gen::<usize>();
        self.sessions.insert(id, msg.addr);
        self.rooms
            .entry("main".to_string())
            .or_insert_with(HashSet::new)
            .insert(id);
        self.send_message("main", &json!({
            "value": vec![format!("{}", id)],
            "chat_type": session::ChatType::CONNECT
        }).to_string(), 0);
        id
    }
}
impl Handler<Disconnect> for ChatServer {
    type Result = ();
    fn handle(&mut self, msg: Disconnect, _: &mut Self::Context) -> Self::Result {
        let mut rooms: Vec<String> = vec![];
        if self.sessions.remove(&msg.id).is_some() {
            for (name, sessions) in &mut self.rooms {
                if sessions.remove(&msg.id) {
                    rooms.push(name.to_owned());
                }
            }
        }
        for room in rooms {
            self.send_message("main", &json!({
                "room": room,
                "value": vec![format!("Someone disconnect!")],
                "chat_type": session::ChatType::DISCONNECT
            }).to_string(), 0);
        }
    }
}
impl Handler<ClientMessage> for ChatServer {
    type Result = ();
    fn handle(&mut self, msg: ClientMessage, _: &mut Self::Context) -> Self::Result {
        self.send_message(&msg.room, &msg.msg, msg.id);
    }
}
impl Handler<ListRooms> for ChatServer {
    type Result = MessageResult<ListRooms>;
    fn handle(&mut self, _: ListRooms, _: &mut Self::Context) -> Self::Result {
        let mut rooms = vec![];
        for key in self.rooms.keys() {
            rooms.push(key.to_owned());
        }
        MessageResult(rooms)
    }
}
impl Handler<Join> for ChatServer {
    type Result = ();
    fn handle(&mut self, msg: Join, _: &mut Self::Context) -> Self::Result {
        let Join {id, name} = msg;
        let mut rooms = vec![];
        for (n, sessions) in &mut self.rooms {
            if sessions.remove(&id) {
                rooms.push(n.to_owned());
            }
        }
        for room in rooms {
            self.send_message(&room, &json!({
                "room": room,
                "value": vec![format!("Someone disconnect!")],
                "chat_type": session::ChatType::DISCONNECT
            }).to_string(), 0);
        }
        self.rooms
            .entry(name.clone())
            .or_insert_with(HashSet::new)
            .insert(id);
    }
}

处理用户会话

现在,让我们来处理用户会话。在这里,我们将接收一条消息,将其保存到数据库中,然后将其发送回聊天室中的参与者。

首先,导入所有包:

// src/session.rs
use std::time::{Duration, Instant};
use actix::prelude::*;
use actix_web::web;
use actix_web_actors::ws;
use serde::{Deserialize, Serialize};
use diesel::{
    prelude::*,
    r2d2::{self, ConnectionManager},
};
use crate::db;
use crate::models::NewConversation;
use crate::server;

你可以在这里更改与WebSocket的连接持续时间。因此,HEARTBEAT是保持与客户端连接活跃的持续时间。CLIENT_TIMEOUT是检查客户端是否仍然连接的持续时间:

// src/session.rs
const HEARBEET: Duration = Duration::from_secs(5);
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;

现在让我们创建一些结构体来存储我们需要的所有数据:

// src/session.rs
#[derive(Debug)]
pub struct WsChatSession {
    pub id: usize,
    pub hb: Instant,
    pub room: String,
    pub name: Option<String>,
    pub addr: Addr<server::ChatServer>,
    pub db_pool: web::Data<DbPool>,
}
#[derive(PartialEq, Serialize, Deserialize)]
pub enum ChatType {
    TYPING,
    TEXT,
    CONNECT,
    DISCONNECT,
}
#[derive(Serialize, Deserialize)]
struct ChatMessage {
    pub chat_type: ChatType,
    pub value: Vec<String>,
    pub room_id: String,
    pub user_id: String,
    pub id: usize,
}

这个结构体将用于以下目的:

  • WsChatSession:为Actix Web actor制作自定义实现
  • ChatMessage:定义将发送给用户和从用户接收的对象
  • 现在,让我们实现我们会话的Actor和流处理器:
// src/session.rs
impl Actor for WsChatSession {
    type Context = ws::WebsocketContext<Self>;
    fn started(&mut self, ctx: &mut Self::Context) {
        self.hb(ctx);
        let addr = ctx.address();
        self.addr
            .send(server::Connect {
                addr: addr.recipient(),
            })
            .into_actor(self)
            .then(|res, act, ctx| {
                match res {
                    Ok(res) => act.id = res,
                    _ => ctx.stop(),
                }
                fut::ready(())
            })
            .wait(ctx);
    }
    fn stopping(&mut self, _: &mut Self::Context) -> Running {
        self.addr.do_send(server::Disconnect { id: self.id });
        Running::Stop
    }
}
impl Handler<server::Message> for WsChatSession {
    type Result = ();
    fn handle(&mut self, msg: server::Message, ctx: &mut Self::Context) -> Self::Result {
        ctx.text(msg.0);
    }
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsChatSession {
    fn handle(&mut self, item: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        let msg = match item {
            Err(_) => {
                ctx.stop();
                return;
            }
            Ok(msg) => msg,
        };
        match msg {
            ws::Message::Ping(msg) => {
                self.hb = Instant::now();
                ctx.pong(&msg);
            }
            ws::Message::Pong(_) => {
                self.hb = Instant::now();
            }
            ws::Message::Text(text) => {
                let data_json = serde_json::from_str::<ChatMessage>(&text.to_string());
                if let Err(err) = data_json {
                    println!("{err}");
                    println!("Failed to parse message: {text}");
                    return;
                }
                let input = data_json.as_ref().unwrap();
                match &input.chat_type {
                    ChatType::TYPING => {
                        let chat_msg = ChatMessage {
                            chat_type: ChatType::TYPING,
                            value: input.value.to_vec(),
                            id: self.id,
                            room_id: input.room_id.to_string(),
                            user_id: input.user_id.to_string(),
                        };
                        let msg = serde_json::to_string(&chat_msg).unwrap();
                        self.addr.do_send(server::ClientMessage {
                            id: self.id,
                            msg,
                            room: self.room.clone(),
                        })
                    }
                    ChatType::TEXT => {
                        let input = data_json.as_ref().unwrap();
                        let chat_msg = ChatMessage {
                            chat_type: ChatType::TEXT,
                            value: input.value.to_vec(),
                            id: self.id,
                            room_id: input.room_id.to_string(),
                            user_id: input.user_id.to_string(),
                        };
                        let mut conn = self.db_pool.get().unwrap();
                        let new_conversation = NewConversation {
                            user_id: input.user_id.to_string(),
                            room_id: input.room_id.to_string(),
                            message: input.value.join(""),
                        };
                        let _ = db::insert_new_conversation(&mut conn, new_conversation);
                        let msg = serde_json::to_string(&chat_msg).unwrap();
                        self.addr.do_send(server::ClientMessage {
                            id: self.id,
                            msg,
                            room: self.room.clone(),
                        })
                    }
                    _ => {}
                }
            }
            ws::Message::Binary(_) => println!("Unsupported binary"),
            ws::Message::Close(reason) => {
                ctx.close(reason);
                ctx.stop();
            }
            ws::Message::Continuation(_) => {
                ctx.stop();
            }
            ws::Message::Nop => (),
        }
    }
}
impl WsChatSession {
    fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
        ctx.run_interval(HEARBEET, |act, ctx| {
            if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
                act.addr.do_send(server::Disconnect { id: act.id });
                ctx.stop();
                return;
            }
            ctx.ping(b"");
        });
    }
}

准备数据库

接下来,让我们准备数据库。我们将使用SQLite来让其变得简单。看起来是这样:

Rust:实现一个实时聊天系统(server) 该表将用于以下目的:

  • users:存储用户数据。由于我们这次不实现完整的认证系统,我们暂时只保存用户名和电话号码

  • rooms:存储所有聊天室的列表

  • conversations:列出我们数据库中存储的所有消息

    接下来,让我们为我们的模式生成数据库迁移:

// shell
diesel migration generate create_users
diesel migration generate create_rooms
diesel migration generate create_conversations

这是迁移SQL:

-- migrations/2022-11-21-101206_create_users/up.sql
CREATE TABLE users (
 id TEXT PRIMARY KEY NOT NULL,
 username VARCHAR NOT NULL,
 phone VARCHAR NOT NULL,
 created_at TEXT NOT NULL,
 unique(phone)
)

-- migrations/2022-11-21-101215_create_rooms/up.sql
CREATE TABLE rooms (
 id TEXT PRIMARY KEY NOT NULL,
 name VARCHAR NOT NULL,
 last_message TEXT NOT NULL,
 participant_ids TEXT NOT NULL,
 created_at TEXT NOT NULL
)

-- migrations/2022-11-21-101223_create_conversations/up.sql
CREATE TABLE conversations (
 id TEXT PRIMARY KEY NOT NULL,
 room_id TEXT NOT NULL,
 user_id TEXT NOT NULL,
 content VARCHAR NOT NULL,
 created_at TEXT NOT NULL
)

我们还需要添加一些虚拟数据,只是为了稍后初次渲染给客户端时有一些示例:

diesel migration generate dummy_data

数据看起来这样的:

-- migrations/2022-11-24-034153_generate_dummy_data/up.sql
INSERT INTO users(id, username, phone, created_at) 
VALUES
("4fbd288c-d3b2-4f78-adcf-def976902d50","Ahmad Rosid","123","2022-11-23T07:56:30.214162+00:00"),
("1e9a12c1-e98c-4a83-a55a-32cc548a169d","Ashley Young","345","2022-11-23T07:56:30.214162+00:00"),
("1bc833808-05ed-455a-9d26-64fe1d96d62d","Charles Edward","678","2022-12-23T07:56:30.214162+00:00");
INSERT INTO rooms(id, name, last_message, participant_ids, created_at)
VALUES
("f061383b-0393-4ce8-9a85-f31d03762263", "Charles Edward", "Hi, how are you?", "1e9a12c1-e98c-4a83-a55a-32cc548a169d,1bc833808-05ed-455a-9d26-64fe1d96d62d", "2022-12-23T07:56:30.214162+00:00"),
("008e9dc4-f01d-4429-ba31-986d7e63cce8", "Ahmad Rosid", "Hi... are free today?", "1e9a12c1-e98c-4a83-a55a-32cc548a169d,1bc833808-05ed-455a-9d26-64fe1d96d62d", "2022-12-23T07:56:30.214162+00:00");
INSERT INTO conversations(id, user_id, room_id, content, created_at)
VALUES
("9aeab1a7-e063-40d1-a120-1f7585fa47d6", "1bc833808-05ed-455a-9d26-64fe1d96d62d", "f061383b-0393-4ce8-9a85-f31d03762263", "Hello", "2022-12-23T07:56:30.214162+00:00"),
("f4e54e70-736b-4a79-a622-3659b0b555e8", "1e9a12c1-e98c-4a83-a55a-32cc548a169d", "f061383b-0393-4ce8-9a85-f31d03762263", "Hi, how are you?", "2022-12-23T07:56:30.214162+00:00"),
("d3ea6e39-ed58-4613-8922-b78f14a2676a", "1bc833808-05ed-455a-9d26-64fe1d96d62d", "008e9dc4-f01d-4429-ba31-986d7e63cce8", "Hi... are free today?", "2022-12-23T07:56:30.214162+00:00");

生成模式

现在我们生成模式并运行迁移:

diesel database setup
diesel migration run

CLI自动生成的模式:

// src/schema.rs
// 由Diesel CLI自动生成。
diesel::table! {
    conversations (id) {
        id -> Text,
        room_id -> Text,
        user_id -> Text,
        content -> Text,
        created_at -> Text,
    }
}
diesel::table! {
    rooms (id) {
        id -> Text,
        name -> Text,
        last_message -> Nullable<Text>,
        participant_ids -> Text,
        created_at -> Text,
    }
}
diesel::table! {
    users (id) {
        id -> Text,
        username -> Text,
        phone -> Text,
        created_at -> Text,
    }
}
diesel::allow_tables_to_appear_in_same_query!(
    conversations,
    rooms,
    users,
);

上述代码是自动生成的,所以不要对这个文件进行任何更改。

创建结构体

让我们创建一些结构体来存储所有表。需要记住的一点是,结构体中属性的顺序应该与模式文件中的顺序相同。如果顺序不匹配,你会得到错误的数据。

// src/model.rs
use serde::{Deserialize, Serialize};
use crate::schema::*;
#[derive(Debug, Clone, Serialize, Deserialize, Queryable, Insertable)]
pub struct User {
    pub id: String,
    pub username: String,
    pub phone: String,
    pub created_at: String,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Queryable, Insertable)]
pub struct Conversation {
    pub id: String,
    pub room_id: String,
    pub user_id: String,
    pub content: String,
    pub created_at: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, Queryable, Insertable)]
pub struct Room {
    pub id: String,
    pub name: String,
    pub last_message: String,
    pub participant_ids: String,
    pub created_at: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewUser {
    pub username: String,
    pub phone: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewConversation {
    pub user_id: String,
    pub room_id: String,
    pub message: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoomResponse {
    pub room: Room,
    pub users: Vec<User>,
}

设置查询

现在,让我们从数据库中获取数据。

首先,导入依赖项:

// src/db.rs
use chrono::{DateTime, Utc};
use diesel::prelude::*;
use std::{
    collections::{HashMap, HashSet},
    time::SystemTime,
};
use uuid::Uuid;
use crate::models::{Conversation, NewConversation, Room, RoomResponse, User};
type DbError = Box<dyn std::error::Error + Send + Sync>;

由于SQLite没有内建的日期功能,我们将创建一个:

// src/db.rs
fn iso_date() -> String {
    let now = SystemTime::now();
    let now: DateTime<Utc> = now.into();
    return now.to_rfc3339();
}

通过电话号码查找用户 在这里,我们将设置一个查询,实现一个简单的登录功能,并使我们能够通过电话号码找到用户。我们仅将此登录方法作为示例。在生产中,你会想使用一种可以轻松验证和调试的方法:

// src/db.rs
pub fn find_user_by_phone(
    conn: &mut SqliteConnection,
    user_phone: String,
) -> Result<Option<User>, DbError> {
    use crate::schema::users::dsl::*;
    let user = users
        .filter(phone.eq(user_phone))
        .first::<User>(conn)
        .optional()?;
    Ok(user)
}

添加新用户 这是一个存储为我们的应用注册的新用户的查询。这也是我们认证系统的一部分。同样,请不要在你的生产应用中使用这种方法:

// src/db.rs
pub fn insert_new_user(conn: &mut SqliteConnection, nm: &str, pn: &str) -> Result<User, DbError> {
    use crate::schema::users::dsl::*;
    let new_user = NewUser {
        id: Uuid::new_v4().to_string(),
        username: nm.to_owned(),
        phone: pn.to_owned(),
        created_at: iso_date(),
    };
    diesel::insert_into(users).values(&new_user).execute(conn)?;
    Ok(new_user)
}

添加了新用户后,我们现在插入新的对话:

// src/db.rs
pub fn insert_new_conversation(
    conn: &mut SqliteConnection,
    new: NewConversation,
) -> Result<Conversation, DbError> {
    use crate::schema::conversations::dsl::*;
    let new_conversation = Conversation {
        id: Uuid::new_v4().to_string(),
        user_id: new.user_id,
        room_id: new.room_id,
        content: new.message,
        created_at: iso_date(),
    };
    diesel::insert_into(conversations)
        .values(&new_conversation)
        .execute(conn)?;
    Ok(new_conversation)
}

查找聊天室和参与者

接下来,让我们设置一个查询,从数据库中获取所有聊天室和参与者:

// src/db.rs
pub fn get_all_rooms(conn: &mut SqliteConnection) -> Result<Vec<RoomResponse>, DbError> {
    use crate::schema::rooms;
    use crate::schema::users;
    let rooms_data: Vec<Room> = rooms::table.load::<Room>(conn)?;
    let mut ids = HashSet::new();
    let mut rooms_map = HashMap::new();
    for room in &rooms_data {
        let user_ids = room
            .participant_ids
            .split(",")
            .collect::<Vec<_>>();
        for id in &user_ids {
            ids.insert(id.to_string());
        }
        rooms_map.insert(room.id.clone(), user_ids);
    }
    let ids: Vec<String> = ids.into_iter().collect();
    let users_data: Vec<User> = users::table
        .filter(users::id.eq_any(&ids))
        .load::<User>(conn)?;
    let users_map: HashMap<String, User> = users_data
        .into_iter()
        .map(|user| (user.id.clone(), user))
        .collect();
    let response_rooms = rooms_data.into_iter().map(|room| {
        let user_ids = rooms_map.get(&room.id).unwrap();
        let users = user_ids
            .iter()
            .map(|id| users_map.get(id.to_owned()).unwrap().clone())
            .collect::<Vec<_>>();
        return RoomResponse{ room, users };
    }).collect::<Vec<_>>();
    Ok(response_rooms)
}

ok,至此为止,完成server部分,剩余部分为使用React构建客户端,感兴趣可以参考原文