如何在Java中使用 MongoDB 中使用事务的一致性问题
最新的 MongoDB 版本 4.2 引入了多文档事务。这是大多数 NoSQL 数据库所缺少的一项关键功能(也是 SQL DB 所吹嘘的)。
事务可以由一个或多个操作组成,充当原子操作。如果所有子操作都成功,则该事务被视为完成。否则,它将失败。
这称为原子性。这是一个重要的概念,理解它可以在并发读取/写入数据时保持数据的一致性。
文章范围和目标
本文的目的是向您展示一个现实生活中的例子,其中没有事务的情况下会发生数据不一致。然后我们将使用 MongoDB 事务在 Java 中构建一个解决方案来防止它们。
通过这样做,你将学会:
- 避免可能导致数据不一致的竞争条件
- 使用 Mongo 内置的可重试写入功能构建更具弹性的应用程序
另外,我添加了一个包装函数,static <R> R withTransaction(final Function<ClientSession, R> executeFn);
您可以使用它来提高代码的可读性。
示例:如何处理针对同一银行账户的并发交易
假设你和你的配偶共用一个银行账户。你们俩同时去 ATM 机取款。
t1 -> You: Press check balance. ATM shows 100 dollars
t2 -> Spouse: Press check balance. ATM shows 100 dollars
t3 -> You & Spouse: withdraw 10 dollars
t4 -> Bank: initializes P1 and P2 to handle your and your spouse's requests.
t5 -> P1 and P2 checked the balance and saw 100 dollars
t6 -> P1 and P2 subtracted 10 dollars from the balance
t7 -> P1 updated the DB with the new balance of 90
t8 -> P2 updated the DB with the new balance of 90
t1 - t8 是事件的时间线。P1 和 P2 是处理银行 ATM 机请求的进程。
在上面的例子中,操作并非按顺序发生。银行的进程 P2 没有等待 P1 完成其任务。如果银行在读取最新余额之前等待 P1 完成读取余额、计算新余额并将更新后的余额写回数据库,它就不会损失 10 美元。
解决这个问题的方法是事务。你可以把它们想象成有点类似于Java 中的锁、信号量和 Synchronized 块。在 Java 中,它保证只有锁的持有者才能执行被锁保护的代码。
如何设置辅助函数
现在让我们开始编码部分。我假设您已经设置了 MongoClient。您将需要Java Mongo Driver 3.8 或更高版本。
final static MongoClient client; // assumed you initialized this somewhere
public static ClientSession getNewClientSession() {
return client.startSession();
}
public static TransactionOptions getTransactionOptions() {
return TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.LOCAL)
.writeConcern(WriteConcern.MAJORITY)
.build();
}
下面的示例需要一些常规功能
getNewClientSession
仅返回事务的会话。ClientSession
是特定事务的标识符。这是您传递到所有后续 Mongo 操作的重要数据,以便它可以隔离操作。
getTransactionOptions
为事务提供选项。ReadPreference.primary()
在我们读取数据时为我们提供集群上最新的信息。WriteConcern.MAJORITY
导致数据库在成功写入大多数服务器后确认提交。
我们不应该到处创建客户端会话和事务选项,而应该在单一方法上执行此操作并仅将需要原子性的函数传递给它。
static <R> R withTransaction(final Function<ClientSession, R> executeFn) {
final ClientSession clientSession = getNewClientSession();
TransactionOptions txnOptions = this.getTransactionOptions();
TransactionBody<R> txnBody = new TransactionBody<R>() {
public R execute() {
return executeFn.apply(clientSession);
}
};
try {
return clientSession.withTransaction(txnBody, txnOptions);
} catch (RuntimeException e) {
e.printStackTrace();
} finally {
clientSession.close();
}
return null;
}
在事务中执行函数的通用函数。
上述函数在传入的函数(参数)内运行操作,executeFn
作为原子操作或事务。让我们使用事务来实现我们的提款功能。
请注意,我返回的是null
。您可以抛出一个新的异常,让调用者知道交易失败。为了便于说明,返回 null 表示交易失败。
Java 中的银行帐户示例
public class Account {
@BsonId
ObjectId _id;
int balance;
... getters and setters
}
public class AccountService {
public Collection<Account> getAccounts() {
return dbClient.getCollection('account', Account.class);
}
private Account currentBalance(ClientSession session, Bson accountId) {
return getAccounts().findOne(session, Filters.eq('_id', accountId)).first();
}
private int currentBalance(ClientSession session, Bson accountId) {
Account account = getAccounts().findOne(session, Filters.eq('_id', accountId)).first();
return account.balance;
}
private int updateBalance(ClientSession session, Bson accountId, int newBalance) {
Account account = getAccounts().updateOne(session, Filters.eq('_id', accountId), Updates.set('balance', newBalance)).first();
return account.balance;
}
public Account drawCash(ClientSession session, Bson accountId, int amount){
int currentBalance = this.currentBalance(accountId);
int newBalance = currentBalance - amount;
return updateBalance(session, accountId, amount);
}
}
注意:为了简单起见,不检查诸如检查余额是否大于提款金额等边缘情况。
在上面的代码片段中,Account
类是用户帐户的普通 Java 类模型。AccountService
是帐户集合的数据库访问器。该drawCach
方法完成第一个示例中描述的单个进程(P1 或 P2)执行的一组操作,以将钱款分配给您或您的配偶。
现在我们使用此withTransaction
函数来调用drawCache
:
... Some REST API
AccountService accountService = ...; // Dependency injected
@Path('/account/withdraw') // Endpoint to withdraw money
withdrawMoney() {
ObjectId accountId = ...// some method to get current users account ID
Account account = withTransaction(new Function<ClientSession, Account>() {
@Override
public Workflow apply(ClientSession clientSession) {
// Everything inside this block run with in the same transaction as long as you pass the argument clientSession to mongo
accountService.drawCash(clientSession, accountId, 10);
}
});
if(Objects.isNull(account)){
return "Failed to withdraw money";
}
return "New account balance is " + account.balance;
}
现在,如果您同时调用此端点两次,一个用户将看到最终余额为 90,而第二个用户将看到 80。
您可能已经猜到第二个用户的交易应该失败了。是的,确实失败了。但是 MongoDB 有一个内置的重试机制,它会自动重试我们的第二个操作并成功。
真实用例示例
我们在PS2PDF.com 在线视频转换器上使用事务来防止一个线程覆盖另一个线程更新的进程状态。
例如,对于每个视频转换过程,我们在 DB 上创建一个名为 Job 的文档。它有一个状态字段,可以采用诸如STARTED
、IN_PROGRESS
和等值COMPLETED
。
一旦线程将 DB 上的 Job.status 更新为COMPLETED
,我们不希望任何慢速线程将该消息恢复为IN_PROGRESS
。一旦作业完成,就无法更改。
我们使用上面提到的withTransaction
方法来保证没有操作会覆盖COMPLETE
状态。
结论
我希望您现在可以使用事务来避免应用程序出现竞争条件。此外,使用内置的retryWrite
和retryRead
来提高容错能力。
我应该指出,MongoDB 事务相当新,并且有文章指出在特殊情况下会发生一些不一致。但您不太可能遇到这些问题。
转载自:https://juejin.cn/post/7394789388143017993