likes
comments
collection
share

Rust 中使用原生 SQL 与 SQLx

作者站长头像
站长
· 阅读数 10

Rust 中使用原生 SQL 与 SQLx

Rust 中使用原生 SQL 与 SQLx

当谈到使用 SQL 时,Rust 生态系统让我们有太多的选择。 SQLx 是一个纯粹的异步、与运行时无关的 Rust SQL 包,它允许您在没有 DSL 的情况下使用编译时类型检查的查询。作为 Rust 中使用 SQL 的最流行方法之一,它具有以下优点:

  • 它与您最喜欢的所有 SQL 风格(MySQL、SQLite、Postgres)兼容
  • 编译时检查查询确保类型和查询的有效性
  • 支持 Postgres 监听/通知等额外功能
  • 构建和使用查询的多种不同方法
  • 您还可以使用 SQLx 创建自己的查询生成器!

让我们看看 SQLx 的实际应用!

入门

首先,您需要将 sqlx 添加到 Rust 程序中:

cargo add sqlx

您还需要安装官方 SQLx CLI sqlx-cli ,它可以帮助您更轻松地管理迁移等。您可以通过运行以下命令来安装它:

cargo install sqlx-cli

迁移

第一步:迁移。如果您愿意,您可以自己手动创建表 - 但这会花费大量时间和精力......并且您需要记住您做了什么!值得庆幸的是,我们可以编写 .sql 文件来表示我们的迁移,然后通过 sqlx-cli 或使用 sqlx::execute 命令将它们迁移到我们正在使用的任何数据库。一个简单的 SQL 模式可能如下所示:

-- this only creates a table if it doesn't exist, avoiding the issue of tables being wiped
CREATE TABLE IF NOT EXISTS foo (
  id SERIAL PRIMARY KEY,
  message TEXT
);

只要它是有效的 SQL,无论您决定使用哪种方法都会成功,并将在数据库中创建一个 _sqlx_migrations 表,其中包含已应用的迁移的列表。

应用内迁移命令可能如下所示:

pool.execute(include_str!("../schema.sql"))
        .await
        .context("Failed to initialize DB")?;

作为个人建议,我使用 sqlx-cli 并使用 sqlx migrate -r add <filename> 。此命令本质上添加了一个新的迁移,但是 -r 标志允许您在出现问题时随时恢复迁移。如果在将新的迁移部署到生产环境后出现任何问题,这是一种能够恢复事物的便捷方法。

查询

默认情况下,您可以通过快速运行查询然后使用连接池执行它来使用原始 SQL 查询:

let query = sqlx::query("SELECT * FROM TABLE")
  .execute(&pool)
  .await
  .unwrap();

默认情况下,SQLx 提倡使用绑定参数,这对于防止 SQL 注入非常重要 - 您只需将它们添加到查询中即可做到这一点(在此处找到有关此内容的更多信息):

sqlx::query("INSERT INTO TABLE (foo) VALUES ($1)")
  .bind("bar".to_string())
  .execute(&pool)
  .await
  .unwrap();

现在假设您正在编写一个返回某些内容的查询。当您从该查询中获取行时,您很可能必须单独获取每个值 - 返回数量少时很好,但是当您使用 fetch_all 时,您必须制作一个迭代器从每一行获取您需要的内容。方便的是,SQLx 知道这一点,并且幸运地为我们提供了一个宏,以便我们能够从 SQL 行向量中提取结构体向量 - 您可以使用 query_as 将返回结果绑定到使用 #[derive(Sqlx::FromRow)]

你会像这样使用它:

#[derive(sqlx::FromRow)]
struct Foo {
  id: i32,
  message: String
}

async fn foo(pool: PgPool) -> Vec<Foo> {
let res = sqlx::query_as::<_, Foo>("SELECT * FROM FOO")
  .fetch_all(&pool).await.unwrap();

  Ok(res)
}

来点更复杂的东西,您还可以使用 QueryBuilder 类型来构造查询。虽然它非常适合以编程方式向查询添加动态语句,但在使用它时应该小心,因为它具有添加非绑定参数的值的方法 - 如果您不确定,您最好使用 push_bind 您使用的内容是否安全。

一个使用示例:

const BIND_LIMIT: usize = 65535;

// This would normally produce values forever!
let records = (0..).map(|i| Foo {
    id: i,
    message: format!("This is note {i}"),
});

let mut query_builder: QueryBuilder<MySql> = QueryBuilder::new(
    // Note the trailing space; most calls to `QueryBuilder` don't automatically insert
    // spaces as that might interfere with identifiers or quoted strings where exact
    // values may matter.
    "SELECT * FROM users WHERE (id, username, email, password) in"
);

// Note that `.into_iter()` wasn't needed here since `users` is already an iterator.
query_builder.push_tuples(records.take(BIND_LIMIT / 2), |mut bound, foo| {
    // If you wanted to bind these by-reference instead of by-value,
    // you'd need an iterator that yields references that live as long as `query_builder`,
    // e.g. collect it to a `Vec` first.
    bound.push_bind(foo.id)
    .push_bind(foo.username);
});

let mut query = query_builder.build();

let res = query.fetch_all(&pool).await.unwrap();

现在,如果您尝试运行它,您将能够获得 Foo 结构体!但请记住,此方法确实有其警告,如下所示:您不会从 SQLx 编译时检查宏中受益,并且如果您不小心,这种查询生成方法可能会有些不安全。然而,当您需要在 Rust 中使用 SQL 动态生成查询时,它的功能非常强大。

我们还可以使用的最后一种查询类型是标量查询,它以元组的形式返回结果。例如,如果我们在执行 SELECT * FROM TABLE 查询时不具体知道有多少字段,则可以使用 query_scalar 来简单地引用列按它们出现的顺序而不是给定的名称。请参阅下面的示例:

let query = sqlx::query_scalar("SELECT * FROM FOO LIMIT 1").fetch_one(&pool).await.unwrap();

println!("{:?}", query.0);

现在来看看 SQLx 作为一个crate的优势之一:编译时查询检查。如果您使用原始 SQL,那么有某种保证您的 SQL 有效几乎从来都不是一件坏事:除非您是数据库管理员,否则如果您正在运行带有多个联接的查询,您肯定会想要以确保它在运行之前确实有效。这里应该说,您需要安装 sqlx-cli 才能利用此功能:如果没有,您将不得不使用以前的方法。

使用 query! 宏的简单查询可能如下所示:

// note that bound parameters are added to the query macro
let query = query!("SELECT * FROM FOO WHERE ID = $1", 1).fetch_one(&pool).await.unwrap();

同样,使用我们之前创建的 Foo 结构的等效查询可用于直接绑定我们的结果以创建结构体向量:

#[derive(sqlx::FromRow)]
struct Foo {
  id: i32,
  message: String
}

let query = query_as!(Foo, "SELECT * FROM FOO").fetch_all(&pool).await.unwrap();

当您使用 query!query_as! 宏时,您需要使用 cargo sqlx prepare ,它将为您的查询生成 JSON 文件。当您编译程序时,它会在编译时自动检查:如果有任何错误,它会自动为您检查。

在使用编译时检查宏(特别是 Postgres)时,有一个特殊的问题可能会让您陷入困境:如果您使用 as _ 重命名 SQL 字段,则该类型将自动包装在 Option 中如果您没有显式地将其设置为不可为空的值。 SQLx 对此有一个解决方案,即能够使用原始字符串将值显式声明为不可空列。例如,采用以下语句:

let query = query_as!(Foo, "SELECT id, message as message from foo").fetch_all(&pool).await.unwrap();

如果我们仍然将 Message 类型设置为 String,则此查询实际上将无法编译,因为 message 现在是 Option<String> 而不是 String 类型。但是,通过将上面的查询转换为上面的原始字符串,我们可以再次强制该字段不可为空:

// note that message is now "message!"
let query = query_as!(Foo, r#"SELECT id, message as "message!" from foo"#).fetch_all(&pool).await.unwrap();

当然,类似地, query_scalar 也有一个与之关联的宏,并且可以与 query! 宏类似地使用,同时返回元组。

我们还可以做一些非常棒的事情,那就是在文件中存储 SQL 查询并运行宏来运行 SQL 文件的内容,同时仍然绑定我们的参数。见下文:

SELECT * FROM FOO WHERE id = $1;
let query = query_file!("query.sql", 1i32).fetch_one(&pool).await.unwrap();

当然,这个特定的宏还支持使用 query_file_as!query_file_scalar! 进行结构绑定和标量查询。

值得注意的是,如果您只想在编译时检查语法而不是查询宏的数据库输入和输出是否正确,则可以在宏末尾添加 unchecked 。例如: query! 将变为 query_unchecked! 。如果您实际上还没有设置数据库,或者没有方便的方法来检索数据库 URL(或者在您不想让 SQLx 直接访问数据库的其他此类情况下),这非常有用。

PostgreSQL Listen/Notify

拥有与 Postgres 一样多的功能,SQLx 支持它们是一件好事 - 虽然 SQLx 主要是编写原始 SQL,但我们没有理由必须在其中编写所有内容。 SQLx 支持通道 LISTEN ,更重要的是 pg_notify ,这是我们在记录更新时能够处理来自 Postgres 的通知的好方法。让我们看一下下面的示例,了解如何设置事件侦听器:

// set up pool beforehand

let mut listener = PgListener::connect_with(&pool).await.unwrap();
listener.listen("testNotify").await.unwrap();

// set up a loop to receive notifications
tokio::spawn(async move || {
  while let Some(notification) = listener.try_recv().await.unwrap() {
    println!("{notification:?}");
  }
});

loop {
  sqlx::query("SELECT pg_notify('testNotify', 'Hello world!')").execute(&pool).await;
}

正如您在这里所看到的,我们spawn 了一个 Tokio 任务,以便能够异步循环并接收通知,然后将其打印出来 - 同时,在主执行线程中,我们还设置了一个loop来连续发送一个查询,该查询发送“你好世界!” 传递给 PgListener 接收通道。

对于将数据库 更新流 实现为 Web 服务中的更高级的 API端点(endpoint) ,您需要使用 .into_stream() 方法,因为框架通常会接受数据流,然后将其包装在框架中' 相关类型。例如,在 Axum 中,您可以使用 axum::response::Sse 类型(请注意,这假设您已经设置了 Web 服务):

use axum::{Extension, response::{Sse, sse::Event}};
use tokio_stream::StreamExt as _ ;
use futures_util::stream::{self, Stream};
use std::convert::Infallbile;
async fn return_stream(Extension(listener): Extension<PgListener>) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
  //监听通知流
  let stream = listener.into_stream();
  //用sse 通知前端
  Sse::new(stream
    .map(|msg| {
      let msg = msg.uwnrap();

      let json = json!(msg).to_string();
     Event::default().data(json)
    }).map(Ok),
   ).keep_alive(KeepAlive::default())
}

当我们设置 Web 服务时,我们可以通过以下两种方式之一创建通知:

  • 使用SQL
  • 在特定事件上使用 pg_notify

使用 pg_notify 本身非常容易,尽管只需使用Tokio Channels而不是SQL,就可以这样做。让换一种方法,使用SQL来设置我们的channel,以便我们不必手动生成代码。

CREATE TABLE IF NOT EXISTS test_table (
  id SERIAL PRIMARY KEY,
  message TEXT NOT NULL
);

CREATE TRIGGER "testNotify"
AFTER INSERT ON test_table
FOR EACH ROW EXECUTE PROCEDURE testNotify();

CREATE OR REPLACE FUNCTION testNotify()
  RETURNS TRIGGER AS $$
DECLARE
BEGIN
  PERFORM pg_notify('testNotify', ROW_TO_JSON(NEW)::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

现在,如果我们将其添加到 SQL 迁移文件中,然后运行我们正在使用的应用程序并转到我们用于 流的端点(endpoint),现在将能够接收通知流!

将 SQLx 与 Shuttle 结合使用

Shuttle 目前通过我们的注释宏提供 SQLx 作为默认连接,让您直接从代码配置基础设施,从而节省您的时间。您需要做的就是在代码中声明宏,如下所示:

use sqlx::PgPool;

#[shuttle_runtime::main]
async fn main(
  #[shuttle_shared_db::Postgres] db: PgPool // gets declared here
) -> shuttle_axum::ShuttleAxum {
  sqlx::migrate!().run(&db).await.map_err(|e| format!("Oh no! Migrations failed :( {e}");

  ... the rest of your code
}

我们的免费数据库通过共享数据库服务器提供(用户为每个应用程序拥有单独的数据库)。不过,我们现在通过 Pro 层提供 100% 独立的 AWS RDS 数据库,您可以在此处找到更多信息,它支持 MySQL、Postgres 和 MariaDB。

尾声

感谢您阅读这篇文章!我希望您已经很好地了解了如何在 Rust 中使用 SQL,以及在使用 Rust SQL 时,SQLx 在使原始的、编译时检查的 SQL 查询的功能为您提供帮助方面提供了多少实用程序。

转载自:https://juejin.cn/post/7342432361098051634
评论
请登录