使用@Transactional配置redis事务的坑你遇到了吗
现象
先来看一段代码,在事务方法中使用stringRedisTemplate查询数据。先抛开在业务代码中使用keys不谈,并且redis中存在数据,你觉得这里keys方法能够正常返回吗?
stringRedisTemplate.keys可能查不出数据, 但stringRedisTemplate.keys查不出数据不太可能。
@Transactional
public void doSomething(String pattern) {
doDB();
String<String> keys = stringRedisTemplate.keys(pattern);
doOtherThing(keys);
}
如果认为这边的查询能够正常返回,那么恭喜你,错了!
这边我拉下了个前提,那就是redisTemplate需要开启事务。即 redisTemplate.setEnableTransactionSupport(true)
排查 & 验证
先来看下redis事务机制的介绍:
- redis的事务保证事务中的多条命令是被“揉合”成一个命令执行的,即在redis同一个事务中的命令是一个连续的序列。
- 在调用exec后,事务中的命令才被真正执行。也就是说在调用exec命令之前,命令的输入不会有任何结果。
啊哈,看到这里我大概已经有了一些头绪,难道是springboot事务导致redisTemplate的命令并没有真正执行,只有在被@Transactional注解注释的方法结束后,事务才回去提交,才执行redis的exec命令。
为了进一步验证,亮出我的demo
@RestController
@RequestMapping("/redis")
public class RedisController {
private StringRedisTemplate redisTemplate;
public RedisController(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
// redis开启支持事务
this.redisTemplate.setEnableTransactionSupport(true);
}
@PostMapping("/init")
public String init() {
for (int i = 0; i < 10000; i++) {
redisTemplate.opsForValue().set("TEST:" + i, "VALUE");
}
return "ok";
}
@GetMapping("/get")
@Transactional
public String get(@RequestParam String key) {
return redisTemplate.opsForValue().get(key);
}
}
结果如同之前的猜想,返回了空字符串。
再验
到这里问题应该已经排查完了,在redis事务中我们不应该使用查询语句,但有时候可能只是因为多看了一眼,一切便大不相同。正当我兴致勃勃觉得这个问题已经解决了的时候,我去翻了下redis的官网
纳尼,他居然说如果是读命令的话会新建一个连接,那应该会正常返回才对!这和我们刚才观测的完全相反。
源码解读
那么我问你,那么我问你,那么我问你,那能怎么办?从源码开始看。
redisTemplate执行命令时候,会判断当前方法是否执行在@Transactional注解下以及enableTransactionSupport是否为true,来获取当前的连接
public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,
boolean transactionSupport) {
Assert.notNull(factory, "No RedisConnectionFactory specified");
// 事务连接持有者,ThreadLocal
RedisConnectionHolder conHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);
if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
conHolder.requested();
if (!conHolder.hasConnection()) {
log.debug("Fetching resumed Redis Connection from RedisConnectionFactory");
conHolder.setConnection(fetchConnection(factory));
}
return conHolder.getRequiredConnection();
}
// Else we either got no holder or an empty thread-bound holder here.
if (!allowCreate) {
throw new IllegalArgumentException("No connection found and allowCreate = false");
}
log.debug("Fetching Redis Connection from RedisConnectionFactory");
RedisConnection connection = fetchConnection(factory);
//核心判断 是否绑定事务
boolean bindSynchronization = TransactionSynchronizationManager.isActualTransactionActive() && transactionSupport;
if (bind || bindSynchronization) {
if (bindSynchronization && isActualNonReadonlyTransactionActive()) {
// 进行事务代理
connection = createConnectionSplittingProxy(connection, factory);
}
try {
// Use same RedisConnection for further Redis actions within the transaction.
// Thread-bound object will get removed by synchronization at transaction completion.
RedisConnectionHolder holderToUse = conHolder;
if (holderToUse == null) {
holderToUse = new RedisConnectionHolder(connection);
} else {
holderToUse.setConnection(connection);
}
holderToUse.requested();
// Consider callback-scope connection binding vs. transaction scope binding
if (bindSynchronization) {
potentiallyRegisterTransactionSynchronisation(holderToUse, factory);
}
if (holderToUse != conHolder) {
TransactionSynchronizationManager.bindResource(factory, holderToUse);
}
} catch (RuntimeException ex) {
// Unexpected exception from external delegation call -> close Connection and rethrow.
releaseConnection(connection, factory);
throw ex;
}
return connection;
}
return connection;
}
事务代理采用的jdk代理增强,核心方法是
public Object intercept(Object obj, Method method, Object[] args) throws Throwable {
if (method.getName().equals("getTargetConnection")) {
// Handle getTargetConnection method: return underlying RedisConnection.
return obj;
}
RedisCommand commandToExecute = RedisCommand.failsafeCommandLookup(method.getName());
// 当前连接需要在事务连接中执行,写操作
if (isPotentiallyThreadBoundCommand(commandToExecute)) {
if (log.isDebugEnabled()) {
log.debug(String.format("Invoke '%s' on bound connection", method.getName()));
}
return invoke(method, obj, args);
}
if (log.isDebugEnabled()) {
log.debug(String.format("Invoke '%s' on unbound connection", method.getName()));
}
// 不需要事务连接的在一个新的连接中执行,只读操作
RedisConnection connection = factory.getConnection();
try {
return invoke(method, connection, args);
} finally {
// properly close the unbound connection after executing command
if (!connection.isClosed()) {
doCloseConnection(connection);
}
}
}
什么样的命令可以在新的连接中执行呢?
private boolean isPotentiallyThreadBoundCommand(RedisCommand command) {
// 未知命令,或者非只读命令
return RedisCommand.UNKNOWN.equals(command) || !command.isReadonly();
}
也就是说,当在事务中执行redis命令时,只读的命令将被路由到合适的connection中,非只读的命令才会在处理事务的连接中执行。
那么操蛋的事情来了,他妈的我们的读命令为啥会是空的呢?
Debug
首先在connection的新建连接的代理处打一个断点
org.springframework.data.redis.core.RedisConnectionUtils.ConnectionSplittingInterceptor#intercept:502
一步步debug进去,可以看到redis connection还是拿到值了的,为啥没有返回呢?
继续往下跟,来到了
org.springframework.data.redis.connection.DefaultStringRedisConnection#convertAndReturn
可以看到该isFutureConversion()返回为true,直接无视了value返回null。
再来看下判断条件的方法,该方法用于判断当前连接是否是在执行pipeline命令或者multi的命令。
以isQueueing方法为例,返回的是当前被代理的连接是否在执行事务。
但是奇怪的一点是,在最开始的代理中我们明显是新建了一个连接,这个连接不可能处于事务当中。
最终将目光移回RedisTemplate#execute方法
在如下方法的211行处,获取到了当前的连接,在216行处对连接进行了额外的处理
由于我们使用的是StringRedisTemplate,在经过216后,我们的连接被DefualtStringRedisConnection所包装
刚才看的convertAndReturn方法恰是DefualtStringRedisConnection的。
看到这里大部分应该已经明白了
首先,redisTemplate在调用方法时,会根据当前是否处于事务获取连接。当处于事务时会复用同一个连接,这是针对写命令的,此外还会使用jdk对Connection的方法进行增强。当为读命令时,会新建一个连接进行调用,所以理论上读命令应该是能够拿到数据的。
然后,在使用StringRedisTemplate时,redis在原始的LetucceConnection(事务connection)上进行包装,封装了一层DefaultStringRedisConnection用于反序列化。
最后,DefaultStringRedisConnection在接受到数据时,会判断当前的连接是不是事务连接,如果是则直接返回null,而DefaultStringRedisConnection在接受到数据时持有的是一个事务的Connection,这就导致了我们的读操作永远返回null。ps:并且主动调用exec也拿不到数据, 相信看过上面的代码,你应该能够理解。
结论
就目前而言,@Transactional注解和redis事务配合的不是很好,尤其别在事务中执行读操作,原因就是本文出现的现象。在使用redisTemplate时候应该将enableTransactionSupport设置为false,如果非要事务,可以使用官方推荐的callback的形式或者lua脚本。
//execute a transaction
List<Object> txResults = redisOperations.execute(new SessionCallback<List<Object>>() {
public List<Object> execute(RedisOperations operations) throws DataAccessException {
operations.multi();
operations.opsForSet().add("key", "value1");
// This will contain the results of all operations in the transaction
return operations.exec();
}
});
System.out.println("Number of items added to set: " + txResults.get(0));
参考
转载自:https://juejin.cn/post/7398045655794712585