likes
comments
collection
share

使用@Transactional配置redis事务的坑你遇到了吗

作者站长头像
站长
· 阅读数 19

现象

先来看一段代码,在事务方法中使用stringRedisTemplate查询数据。先抛开在业务代码中使用keys不谈,并且redis中存在数据,你觉得这里keys方法能够正常返回吗?

stringRedisTemplate.keys可能查不出数据, 但stringRedisTemplate.keys查不出数据不太可能。


@Transactional
public void doSomething(String pattern) {
    doDB();
    String<String> keys = stringRedisTemplate.keys(pattern);
    doOtherThing(keys);
}

如果认为这边的查询能够正常返回,那么恭喜你,错了!

这边我拉下了个前提,那就是redisTemplate需要开启事务。即 redisTemplate.setEnableTransactionSupport(true)

排查 & 验证

先来看下redis事务机制的介绍:

  1. redis的事务保证事务中的多条命令是被“揉合”成一个命令执行的,即在redis同一个事务中的命令是一个连续的序列。
  2. 在调用exec后,事务中的命令才被真正执行。也就是说在调用exec命令之前,命令的输入不会有任何结果。

啊哈,看到这里我大概已经有了一些头绪,难道是springboot事务导致redisTemplate的命令并没有真正执行,只有在被@Transactional注解注释的方法结束后,事务才回去提交,才执行redis的exec命令。

使用@Transactional配置redis事务的坑你遇到了吗

为了进一步验证,亮出我的demo

@RestController
@RequestMapping("/redis")
public class RedisController {

    private StringRedisTemplate redisTemplate;

    public RedisController(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
        // redis开启支持事务
        this.redisTemplate.setEnableTransactionSupport(true);
    }
    
    @PostMapping("/init")
    public String init() {
        for (int i = 0; i < 10000; i++) {
            redisTemplate.opsForValue().set("TEST:" + i, "VALUE");
        }
        return "ok";
    }

    @GetMapping("/get")
    @Transactional
    public String get(@RequestParam String key) {
        return redisTemplate.opsForValue().get(key);
    }
}

结果如同之前的猜想,返回了空字符串。 使用@Transactional配置redis事务的坑你遇到了吗

再验

到这里问题应该已经排查完了,在redis事务中我们不应该使用查询语句,但有时候可能只是因为多看了一眼,一切便大不相同。正当我兴致勃勃觉得这个问题已经解决了的时候,我去翻了下redis的官网

使用@Transactional配置redis事务的坑你遇到了吗

纳尼,他居然说如果是读命令的话会新建一个连接,那应该会正常返回才对!这和我们刚才观测的完全相反。

使用@Transactional配置redis事务的坑你遇到了吗

源码解读

那么我问你,那么我问你,那么我问你,那能怎么办?从源码开始看。

redisTemplate执行命令时候,会判断当前方法是否执行在@Transactional注解下以及enableTransactionSupport是否为true,来获取当前的连接


	public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,
			boolean transactionSupport) {

		Assert.notNull(factory, "No RedisConnectionFactory specified");

        // 事务连接持有者,ThreadLocal
		RedisConnectionHolder conHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);

		if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
			conHolder.requested();
			if (!conHolder.hasConnection()) {
				log.debug("Fetching resumed Redis Connection from RedisConnectionFactory");
				conHolder.setConnection(fetchConnection(factory));
			}
			return conHolder.getRequiredConnection();
		}

		// Else we either got no holder or an empty thread-bound holder here.

		if (!allowCreate) {
			throw new IllegalArgumentException("No connection found and allowCreate = false");
		}

		log.debug("Fetching Redis Connection from RedisConnectionFactory");
		RedisConnection connection = fetchConnection(factory);

        //核心判断 是否绑定事务
		boolean bindSynchronization = TransactionSynchronizationManager.isActualTransactionActive() && transactionSupport;

		if (bind || bindSynchronization) {

			if (bindSynchronization && isActualNonReadonlyTransactionActive()) {
				// 进行事务代理
                connection = createConnectionSplittingProxy(connection, factory);
			}

			try {
				// Use same RedisConnection for further Redis actions within the transaction.
				// Thread-bound object will get removed by synchronization at transaction completion.
				RedisConnectionHolder holderToUse = conHolder;
				if (holderToUse == null) {
					holderToUse = new RedisConnectionHolder(connection);
				} else {
					holderToUse.setConnection(connection);
				}
				holderToUse.requested();

				// Consider callback-scope connection binding vs. transaction scope binding
				if (bindSynchronization) {
					potentiallyRegisterTransactionSynchronisation(holderToUse, factory);
				}

				if (holderToUse != conHolder) {
					TransactionSynchronizationManager.bindResource(factory, holderToUse);
				}
			} catch (RuntimeException ex) {
				// Unexpected exception from external delegation call -> close Connection and rethrow.
				releaseConnection(connection, factory);
				throw ex;
			}

			return connection;
		}

		return connection;
    }

事务代理采用的jdk代理增强,核心方法是

public Object intercept(Object obj, Method method, Object[] args) throws Throwable {

    if (method.getName().equals("getTargetConnection")) {
        // Handle getTargetConnection method: return underlying RedisConnection.
        return obj;
    }

    RedisCommand commandToExecute = RedisCommand.failsafeCommandLookup(method.getName());

    // 当前连接需要在事务连接中执行,写操作
    if (isPotentiallyThreadBoundCommand(commandToExecute)) {

        if (log.isDebugEnabled()) {
            log.debug(String.format("Invoke '%s' on bound connection", method.getName()));
        }

        return invoke(method, obj, args);
    }

    if (log.isDebugEnabled()) {
        log.debug(String.format("Invoke '%s' on unbound connection", method.getName()));
    }

    // 不需要事务连接的在一个新的连接中执行,只读操作
    RedisConnection connection = factory.getConnection();

    try {
        return invoke(method, connection, args);
    } finally {
        // properly close the unbound connection after executing command
        if (!connection.isClosed()) {
            doCloseConnection(connection);
        }
    }
}

什么样的命令可以在新的连接中执行呢?

private boolean isPotentiallyThreadBoundCommand(RedisCommand command) {
    // 未知命令,或者非只读命令
    return RedisCommand.UNKNOWN.equals(command) || !command.isReadonly();
}

也就是说,当在事务中执行redis命令时,只读的命令将被路由到合适的connection中,非只读的命令才会在处理事务的连接中执行。

那么操蛋的事情来了,他妈的我们的读命令为啥会是空的呢?

Debug

首先在connection的新建连接的代理处打一个断点

org.springframework.data.redis.core.RedisConnectionUtils.ConnectionSplittingInterceptor#intercept:502

使用@Transactional配置redis事务的坑你遇到了吗

一步步debug进去,可以看到redis connection还是拿到值了的,为啥没有返回呢?

使用@Transactional配置redis事务的坑你遇到了吗

继续往下跟,来到了

org.springframework.data.redis.connection.DefaultStringRedisConnection#convertAndReturn

可以看到该isFutureConversion()返回为true,直接无视了value返回null。

使用@Transactional配置redis事务的坑你遇到了吗

再来看下判断条件的方法,该方法用于判断当前连接是否是在执行pipeline命令或者multi的命令。使用@Transactional配置redis事务的坑你遇到了吗

以isQueueing方法为例,返回的是当前被代理的连接是否在执行事务。使用@Transactional配置redis事务的坑你遇到了吗

但是奇怪的一点是,在最开始的代理中我们明显是新建了一个连接,这个连接不可能处于事务当中。

最终将目光移回RedisTemplate#execute方法

在如下方法的211行处,获取到了当前的连接,在216行处对连接进行了额外的处理

使用@Transactional配置redis事务的坑你遇到了吗

由于我们使用的是StringRedisTemplate,在经过216后,我们的连接被DefualtStringRedisConnection所包装

使用@Transactional配置redis事务的坑你遇到了吗

刚才看的convertAndReturn方法恰是DefualtStringRedisConnection的。使用@Transactional配置redis事务的坑你遇到了吗

看到这里大部分应该已经明白了

首先,redisTemplate在调用方法时,会根据当前是否处于事务获取连接。当处于事务时会复用同一个连接,这是针对写命令的,此外还会使用jdk对Connection的方法进行增强。当为读命令时,会新建一个连接进行调用,所以理论上读命令应该是能够拿到数据的。

然后,在使用StringRedisTemplate时,redis在原始的LetucceConnection(事务connection)上进行包装,封装了一层DefaultStringRedisConnection用于反序列化。

最后,DefaultStringRedisConnection在接受到数据时,会判断当前的连接是不是事务连接,如果是则直接返回null,而DefaultStringRedisConnection在接受到数据时持有的是一个事务的Connection,这就导致了我们的读操作永远返回null。ps:并且主动调用exec也拿不到数据, 相信看过上面的代码,你应该能够理解。

结论

就目前而言,@Transactional注解和redis事务配合的不是很好,尤其别在事务中执行读操作,原因就是本文出现的现象。在使用redisTemplate时候应该将enableTransactionSupport设置为false,如果非要事务,可以使用官方推荐的callback的形式或者lua脚本。

//execute a transaction
List<Object> txResults = redisOperations.execute(new SessionCallback<List<Object>>() {
  public List<Object> execute(RedisOperations operations) throws DataAccessException {
    operations.multi();
    operations.opsForSet().add("key", "value1");

    // This will contain the results of all operations in the transaction
    return operations.exec();
  }
});
System.out.println("Number of items added to set: " + txResults.get(0));

参考

docs.spring.io/spring-data…

blog.csdn.net/weixin_4473…

github.com/spring-proj…

转载自:https://juejin.cn/post/7398045655794712585
评论
请登录