likes
comments
collection
share

Rust:Redis bb8的使用,优化rust与mysql数据库连接

作者站长头像
站长
· 阅读数 12

为什么要使用Redis?

  • 提升性能:对于需要快速响应的应用,Redis可以大幅减少数据处理时间。
  • 处理高并发:在面对大量并发请求时,Redis能有效分担数据库的压力,提高整体系统的吞吐量。
  • 降低成本:相比于不断扩展数据库硬件来应对增长的读取负载,使用Redis作为缓存层是一种更经济的解决方案。
  • 数据可靠性:通过其持久化和备份机制,Redis确保即使在系统故障的情况下数据也不会丢失。

为什么不直接使用程序内存,而是使用redis?

1. 持久性和可靠性

  • 程序内存:当程序重启或崩溃时,存储在程序内存中的所有数据都会丢失。这对于需要持久存储的数据来说是不可接受的。
  • Redis:尽管Redis是基于内存的存储系统,但它提供了数据持久化的选项,可以将内存中的数据定期保存到磁盘,确保数据的安全和持久性。

2. 数据共享和扩展性

  • 程序内存:数据仅限于单个进程,这限制了应用的扩展能力。在分布式系统或多进程环境中,不同的进程无法直接访问彼此的内存。
  • Redis:作为一个独立的服务,Redis允许多个进程、多个应用甚至多个服务器之间共享和访问同一数据集,从而支持高可扩展性和分布式系统的构建。

3. 数据结构和管理

  • 程序内存:数据结构和管理逻辑需要自己实现,这增加了开发的复杂性和出错的可能性。
  • Redis:提供了丰富的数据结构(如字符串、列表、集合等)和原子操作,简化了数据管理,同时提高了效率和可靠性。

4. 内存管理和效率

  • 程序内存:直接使用程序内存可能导致内存管理的问题,如内存泄漏,特别是在长时间运行的应用中。
  • Redis:拥有优化的内存管理机制,可以有效地处理大量数据,并在内存受限的环境中运行得很好。

5. 可用性和故障恢复

  • 程序内存:一旦应用停止,所有数据都需要从数据库或其他持久存储中重新加载,这可能是一个缓慢的过程。
  • Redis:提供了高可用性选项,如主从复制和故障转移,以及快速的数据恢复能力。

Redis集成前的准备工作

首先,我们需要在开发环境中安装并配置Redis。然后,在Rust项目的Cargo.toml中添加redis库作为依赖。

创建redis连接池

1. 添加依赖

首先,在 Cargo.toml 中添加 bb8bb8-redistokio 的依赖(如果您还没有添加 tokio)。

[dependencies]
bb8 = "0.7"
bb8-redis = "0.7"
redis = "0.21"
tokio = { version = "1", features = ["full"] }

2. 设置静态连接池

在 Rust 文件中,定义一个静态的 OnceCell 来存储 Redis 连接池。

use bb8::{Pool, ManageConnection};
use bb8_redis::{bb8, redis, RedisConnectionManager};
use tokio::sync::OnceCell;

pub static REDIS_POOL: OnceCell<Pool<RedisConnectionManager>> = OnceCell::const_new();

3. 初始化连接池

创建一个异步函数来初始化 Redis 连接池。这个函数会在应用启动时调用。

pub async fn init_redis_pool() {
    let manager = RedisConnectionManager::new("redis://127.0.0.1/").unwrap(); // 替换为您的 Redis 服务器 URL
    let pool = Pool::builder().build(manager).await.expect("Failed to create pool.");

    REDIS_POOL.get_or_init(|| async { pool }).await;
}

4. 使用连接池

当需要与 Redis 交互时,从连接池中获取一个连接并执行操作。

pub async fn some_redis_operation() {
    let pool = REDIS_POOL.get().expect("Redis pool not initialized");
    let mut conn = pool.get().await.expect("Failed to get Redis connection");
    
    // 执行 Redis 操作,例如设置一个键值对
    let _: () = redis::cmd("SET").arg("key").arg("value").query_async(&mut *conn).await.expect("Redis command failed");
}

实现细节

  • 缓存策略:确定哪些数据适合缓存(如用户认证信息),以及缓存的生命周期。
  • 数据一致性:确保Redis和MySQL数据库之间的数据一致性,特别是在更新和删除操作中。
  • 错误处理:实现健壮的错误处理逻辑,以处理Redis服务不可用的情况。
  • 性能监控:使用合适的工具来监控性能指标,确保优化达到预期效果。

通过将Redis集成到现有的Rust和MySQL应用中, 可以显著提高应用的响应速度和处理更大的用户访问量。


随着用户量和数据量的增长,数据库性能变得至关重要。本文将探讨如何在Rust和MySQL的现有架构上,通过集成Redis来提升性能和可扩展性。

我们的应用使用Rust进行编程,搭配MySQL数据库处理数据。Redis,作为一个高性能的键值存储系统,可以为我们的应用提供快速的数据访问和缓存能力。

优化前的代码

这是一个简单的用户相关的增删改查服务,以此为例来说明如何使用redis来优化我们原有的直接使用rust与mysql交互的代码:

use crate::{
    app_response::AppResult,
    db::DB,
    dtos::user::{
        UserAddRequest, UserLoginRequest, UserLoginResponse, UserResponse, UserUpdateRequest,
    },
    entities::user::User,
    middleware::jwt::get_token,
    utils::rand_utils::{self},
};
use uuid::Uuid;
pub async fn add_user(req: UserAddRequest) -> AppResult<UserResponse> {
    let db = DB.get().ok_or(anyhow::anyhow!(""))?;
    let id = Uuid::new_v4().to_string();
    let hash_password = rand_utils::hash_password(req.password).await?;
    let _ = sqlx::query!(
        r#"
            INSERT INTO users (id, username, password)
            VALUES (?, ?, ?)
            "#,
        id,
        req.username,
        hash_password,
    )
    .execute(db)
    .await?;

    Ok(UserResponse {
        id,
        username: req.username,
    })
}

pub async fn login(req: UserLoginRequest) -> AppResult<UserLoginResponse> {
    let db = DB.get().ok_or(anyhow::anyhow!(""))?;
    let user = sqlx::query_as!(
        User,
        r#"
            SELECT id, username, password FROM users
            WHERE username = ?
            "#,
        req.username
    )
    .fetch_optional(db)
    .await?;
    if user.is_none() {
        return Err(anyhow::anyhow!("").into());
    }
    let user = user.unwrap();
    if rand_utils::verify_password(req.password, user.password)
        .await
        .is_err()
    {
        return Err(anyhow::anyhow!("Incorrect password.").into());
    }
    let (token, exp) = get_token(user.username.clone(), user.id.clone())?;
    let res = UserLoginResponse {
        id: user.id,
        username: user.username,
        token,
        exp,
    };
    Ok(res)
}

pub async fn update_user(user_id: String, req: UserUpdateRequest) -> AppResult<UserResponse> {
    let db = DB.get().ok_or(anyhow::anyhow!(""))?;
    let hash_password = rand_utils::hash_password(req.password).await?;
    let _ = sqlx::query!(
        r#"
            UPDATE users
            SET username = ?, password = ?
            WHERE id = ?
            "#,
        req.username,
        hash_password,
        user_id,
    )
    .execute(db)
    .await?;

    Ok(UserResponse {
        id: user_id,
        username: req.username,
    })
}

pub async fn delete_user(id: String) -> AppResult<()> {
    let db = DB.get().ok_or(anyhow::anyhow!(""))?;
    let result = sqlx::query!(
        r#"
            DELETE FROM users
            WHERE id = ?
            "#,
        id,
    )
    .execute(db)
    .await?;

    if result.rows_affected() == 0 {
        // No rows are affected and a 404 error is returned
        Err(anyhow::anyhow!("The user to be deleted does not exist").into())
    } else {
        // Successfully deleted
        Ok(())
    }
}

pub async fn users() -> AppResult<Vec<UserResponse>> {
    let db = DB.get().ok_or(anyhow::anyhow!(""))?;
    let users = sqlx::query_as!(
        User,
        r#"
            SELECT id, username, password FROM users
            "#,
    )
    .fetch_all(db)
    .await?;
    let res = users
        .into_iter()
        .map(|user| UserResponse {
            id: user.id,
            username: user.username,
        })
        .collect::<Vec<_>>();
    Ok(res)
}

1. login 函数优化

在用户登录时,首先从 Redis 检查是否有缓存的用户信息。如果有,则直接返回,否则查询数据库并将结果缓存。

pub async fn login(req: UserLoginRequest) -> AppResult<UserLoginResponse> {
    let redis_pool = REDIS.get().ok_or(anyhow::anyhow!("Redis pool not initialized"))?;
    let mut redis_conn = redis_pool.get().await?;

    // 尝试从 Redis 获取缓存的用户信息
    if let Ok(cached) = redis_conn.get::<_, String>(&req.username).await {
        if let Ok(login_response) = serde_json::from_str::<UserLoginResponse>(&cached) {
            return Ok(login_response);
        }
    }

    // 如果没有缓存,从数据库查询
    let db = DB.get().ok_or(anyhow::anyhow!("Database not available"))?;
    // ... (数据库查询和密码验证代码)

    // 缓存用户登录信息到 Redis
    let cache_value = serde_json::to_string(&res)?;
    let _: () = redis_conn.set_ex(&req.username, cache_value, 3600).await?; // 缓存 1 小时

    Ok(res)
}

2. users 函数优化

缓存用户列表,减少每次请求都要查询数据库的情况。

pub async fn users() -> AppResult<Vec<UserResponse>> {
    let redis_pool = REDIS.get().ok_or(anyhow::anyhow!("Redis pool not initialized"))?;
    let mut redis_conn = redis_pool.get().await?;

    // 尝试从 Redis 获取缓存的用户列表
    if let Ok(cached) = redis_conn.get::<_, String>("user_list").await {
        if let Ok(user_list) = serde_json::from_str::<Vec<UserResponse>>(&cached) {
            return Ok(user_list);
        }
    }

    let db = DB.get().ok_or(anyhow::anyhow!("Database not available"))?;
    // ... (数据库查询代码)

    // 缓存用户列表到 Redis
    let cache_value = serde_json::to_string(&res)?;
    let _: () = redis_conn.set_ex("user_list", cache_value, 3600).await?; // 缓存 1 小时

    Ok(res)
}

3. 在修改用户信息后更新 Redis 缓存

add_userupdate_userdelete_user 函数中,进行相应的数据库操作后,更新 Redis 中的相关缓存。

add_user 为例:

pub async fn add_user(req: UserAddRequest) -> AppResult<UserResponse> {
    // ... (原有的添加用户代码)

    // 更新 Redis 缓存
    let redis_pool = REDIS.get().ok_or(anyhow::anyhow!("Redis pool not initialized"))?;
    let mut redis_conn = redis_pool.get().await?;
    let _: () = redis_conn.del("user_list").await?; // 删除缓存的用户列表

    Ok(UserResponse {
        id,
        username: req.username,
    })
}

对于 update_userdelete_user,采用类似的方式来删除或更新 Redis 中的缓存数据。

为什么进行这些优化

  • 提高性能:缓存可以大大减少数据库查询次数,特别是对于频繁读取但不经常变更的数据。
  • 减少数据库负载:通过减少对数据库的直接访问,可以降低数据库的负载,特别是在高流量情况下。
  • 提升用户体验:使用缓存可以减少响应时间,从而提升最终用户的体验。

update_user 函数优化

当用户信息更新时,我们需要更新 Redis 中的相关缓存。由于用户信息可能已缓存在登录信息和用户列表中,我们需要更新这两个缓存。

pub async fn update_user(user_id: String, req: UserUpdateRequest) -> AppResult<UserResponse> {
    let db = DB.get().ok_or(anyhow::anyhow!("Database not available"))?;
    let hash_password = rand_utils::hash_password(req.password).await?;
    let _ = sqlx::query!(
        r#"
        UPDATE users
        SET username = ?, password = ?
        WHERE id = ?
        "#,
        req.username,
        hash_password,
        user_id,
    )
    .execute(db)
    .await?;

    // 更新 Redis 缓存
    let redis_pool = REDIS.get().ok_or(anyhow::anyhow!("Redis pool not initialized"))?;
    let mut redis_conn = redis_pool.get().await?;
    let _: () = redis_conn.del("user_list").await?; // 删除用户列表缓存
    let _: () = redis_conn.del(&req.username).await?; // 删除特定用户的登录信息缓存

    Ok(UserResponse {
        id: user_id,
        username: req.username,
    })
}

delete_user 函数优化

在删除用户时,我们需要从 Redis 缓存中移除与该用户相关的所有信息。

pub async fn delete_user(id: String) -> AppResult<()> {
    let db = DB.get().ok_or(anyhow::anyhow!("Database not available"))?;
    let result = sqlx::query!(
        r#"
        DELETE FROM users
        WHERE id = ?
        "#,
        id,
    )
    .execute(db)
    .await?;

    if result.rows_affected() == 0 {
        // 没有找到用户,返回错误
        Err(anyhow::anyhow!("The user to be deleted does not exist").into())
    } else {
        // 成功删除用户,更新 Redis 缓存
        let redis_pool = REDIS.get().ok_or(anyhow::anyhow!("Redis pool not initialized"))?;
        let mut redis_conn = redis_pool.get().await?;
        let _: () = redis_conn.del("user_list").await?; // 删除用户列表缓存

        // 你可能还需要删除与该用户相关的其他缓存,例如登录信息等
        // let _: () = redis_conn.del(&user_username).await?; // 假设你有用户名来删除特定的缓存

        Ok(())
    }
}

优化说明

  • update_user 中,我们更新了用户的信息后,需要删除或更新缓存中的用户信息,以保证数据一致性。
  • delete_user 中,删除用户后,我们也需要从缓存中删除该用户的相关信息。
  • 这些操作确保了数据库和缓存之间的数据一致性,同时也利用了 Redis 的高效性能来提高整体应用性能。

通过集成Redis,我们的Rust和MySQL应用在性能和可扩展性方面获得显著提升。未来,我们可以探索更多优化方向,如使用Redis集群来进一步增强应用的高可用性和负载均衡能力。