likes
comments
collection
share

基于Redission高级应用22-基于RAtomicLong原理及封装的工具类

作者站长头像
站长
· 阅读数 17

写作原由:

最近工作中,因为项目出现生成唯一code时候出现了并发,其实现原理以当前时间戳作为code并且以及追加后三位使用随机数进行补充,该项目正常使用几年都没有问题,最近还是生成了相同的Code.原由为相同时间下发两个相同单据,且随机数也一样,故此针对进行优化,使用RAtomicLong进行获取后三位随机保证不会出现相同值,其大致实现如下:

sb.append(System.currentTimeMillis());
String key = "cc_Code_autoIncrement";
RAtomicLong atomicLong=client.getAtomicLong(key);
long num = atomicLong.incrementAndGet()%100;
sb.append(String.format("%03d", num));

故此针对RAtomicLong 进一步了解和研究封装

引言:

RAtomicLong是 Redission 提供的一个分布式原子长整型类,它模仿了 Javajava.util.concurrent.atomic.AtomicLong类的行为,但是专为分布式环境设计。RAtomicLong` 使用 Redis 作为后端来保证操作的原子性和一致性。

实现原理

RAtomicLong 的实现依赖于 Redis 提供的原子操作,如 INCR, DECR, GETSET 等命令,以及 Lua 脚本来执行复合操作。以下是一些基本操作的实现方式:

  • Increment (INCR): RAtomicLong 使用 Redis 的 INCR 命令来原子性地增加存储在特定键下的值。
  • Decrement (DECR): 类似地,它使用 DECR 命令来原子性地减少值。
  • Get and Set (GETSET): 获取当前值并设置新值的操作可以通过 GETSET 命令实现,它原子性地替换值并返回旧值。
  • Compare and Set: 对于比较并设置的操作,Redission 可能使用 Lua 脚本来确保比较和设置的原子性。Lua 脚本在 Redis 中作为一个事务执行,因此可以保证操作过程中不会被其他命令中断。

通过使用这些命令,RAtomicLong 可以确保即使在多个客户端同时进行操作时,每个操作也都是原子性的,从而保证了分布式环境下的数据一致性。

优点

  1. 分布式原子性: RAtomicLong 提供跨多个节点的原子操作,对于构建分布式系统中的计数器和序列生成器非常有用。
  2. 高可用性和持久性: 由于基于 Redis,RAtomicLong 可以利用 Redis 的持久化和复制特性,确保数据的高可用性和持久性。
  3. 简单易用: RAtomicLong 提供了与 AtomicLong 类似的 API,使得开发者可以轻松迁移现有代码到分布式环境。
  4. 性能: Redis 的性能非常高,所以 RAtomicLong 在大多数情况下能提供很好的性能。

缺点

  1. 网络延迟: 由于操作依赖于网络通信,RAtomicLong 的性能受限于网络延迟,特别是在高频率更新的场景下,性能可能受到影响。
  2. Redis 单点故障: 尽管 Redis 可以配置为高可用模式,但如果没有正确配置,Redis 服务器可能成为单点故障。
  3. 资源消耗: 频繁地使用 RAtomicLong 可能会导致 Redis 服务器上的资源消耗增加,尤其是在高负载的系统中。
  4. 成本: 如果使用云服务提供的 Redis 实例,频繁的读写操作可能会增加成本。

总的来说,RAtomicLong 是一个在分布式环境中提供原子长整型操作的有力工具,但是开发者需要考虑其在特定应用场景下的性能和成本。

工具类:

import org.redisson.api.RAtomicLong;
import org.redisson.api.RedissonClient;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongUnaryOperator;
/**
 * @Author derek_smart
 * @Date 2024/6/18 10:41
 * @Description RAtomicLong 工具类
 */
public class AdvancedAtomicLong {

    private final RAtomicLong atomicLong;

    public AdvancedAtomicLong(RedissonClient redisson, String name) {
        this.atomicLong = redisson.getAtomicLong(name);
    }

    // 原子性地增加值,并在操作后执行回调
    public long incrementAndGet(LongUnaryOperator afterIncrement) {
        long result = atomicLong.incrementAndGet();
        return afterIncrement.applyAsLong(result);
    }

    // 异步版本的 incrementAndGet
    public CompletableFuture<Long> incrementAndGetAsync() {
        return atomicLong.incrementAndGetAsync().toCompletableFuture();
    }

    // 原子性地减少值,并在操作后执行回调
    public long decrementAndGet(LongUnaryOperator afterDecrement) {
        long result = atomicLong.decrementAndGet();
        return afterDecrement.applyAsLong(result);
    }

    // 获取当前值
    public long get() {
        return atomicLong.get();
    }

    // 设置当前值
    public void set(long newValue) {
        atomicLong.set(newValue);
    }

    // 如果当前值等于预期值,则以原子方式将该值设置为给定的更新值
    public boolean compareAndSet(long expect, long update) {
        return atomicLong.compareAndSet(expect, update);
    }

    // 获取并增加,带过期时间
    public long getAndAdd(long delta, long ttl, TimeUnit timeUnit) {
        long result = atomicLong.getAndAdd(delta);
        atomicLong.expire(ttl, timeUnit);
        return result;
    }

    // 分布式序列生成器,每次调用返回唯一序列号
    public long nextSequence() {
        return atomicLong.incrementAndGet();
    }

    // 设置值并指定过期时间
    public void setAndExpire(long newValue, long ttl, TimeUnit timeUnit) {
        atomicLong.set(newValue);
        atomicLong.expire(ttl, timeUnit);
    }

    // 带有过期时间的原子性增加
    public long incrementAndGetWithTTL(long ttl, TimeUnit timeUnit) {
        long result = atomicLong.incrementAndGet();
        atomicLong.expire(ttl, timeUnit);
        return result;
    }

    // 带有过期时间的原子性减少
    public long decrementAndGetWithTTL(long ttl, TimeUnit timeUnit) {
        long result = atomicLong.decrementAndGet();
        atomicLong.expire(ttl, timeUnit);
        return result;
    }

    // 删除这个原子长整型
    public void delete() {
        atomicLong.delete();
    }
}

基于Redission高级应用22-基于RAtomicLong原理及封装的工具类

AdvancedAtomicLong 是一个封装了 Redisson 的 RAtomicLong 的高级工具类,它提供了一系列的原子操作方法,并且增加了一些额外的功能,使得它在分布式环境中更加灵活和强大。

  • 构造方法:允许通过 Redisson 客户端和一个名称来创建 RAtomicLong 实例。
  • 增加与回调:提供 incrementAndGet 方法,该方法在原子性地增加值之后,执行一个回调函数。
  • 异步增加:提供 incrementAndGetAsync 方法,它返回一个 CompletableFuture,用于异步操作。
  • 减少与回调:提供 decrementAndGet 方法,该方法在原子性地减少值之后,执行一个回调函数。
  • 获取当前值:提供 get 方法来获取当前的值。
  • 设置当前值:提供 set 方法来设置一个新的值。
  • 比较并设置:提供 compareAndSet 方法,它在值符合预期时,原子性地更新值。
  • 获取并增加,带过期时间:提供 getAndAdd 方法,该方法在增加值后设置一个过期时间。
  • 分布式序列生成器:提供 nextSequence 方法,用于生成唯一的序列号。
  • 设置值并指定过期时间:提供 setAndExpire 方法,用于设置值并指定过期时间。
  • 带有过期时间的原子性增减incrementAndGetWithTTLdecrementAndGetWithTTL 方法不仅原子性地增减值,还在操作后设置过期时间。
  • 删除原子长整型:提供 delete 方法来删除当前的 RAtomicLong 实例。

时序图:

ClientAdvancedAtomicLongRAtomicLongRedissonClientRedisnew AdvancedAtomicLong(redisson, "name")getAtomicLong("name")Create RAtomicLong instanceEstablish connectionincrementAndGet(afterIncrement)incrementAndGet()INCR commandReturn incremented valueReturn incremented valueafterIncrement.applyAsLong(result)getAndAdd(delta, ttl, timeUnit)getAndAdd(delta)INCRBY commandReturn new valueReturn new valueexpire(ttl, timeUnit)EXPIRE commanddelete()delete()DEL commandClientAdvancedAtomicLongRAtomicLongRedissonClientRedis

测试类:

import org.redisson.Redisson;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RedissonClient;
/**
 * @Author derek_smart
 * @Date 2024/6/18 10:41
 * @Description RAtomicLong 测试循环类
 */
public class CyclicCounter {

    private static final long MAX_SEQUENCE = 999L; // 最大序列值
    private final RAtomicLong atomicLong;

    public CyclicCounter(RedissonClient redisson, String name) {
        this.atomicLong = redisson.getAtomicLong(name);
    }

    // 获取下一个循环数
    public long next() {
        while (true) {
            long current = atomicLong.get();
            long next = (current + 1) % (MAX_SEQUENCE + 1);
            if (atomicLong.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    // 重置计数器到0
    public void reset() {
        atomicLong.set(0);
    }
}
/**
 * @Author derek_smart
 * @Date 2024/6/18 10:41
 * @Description RAtomicLong 测试
 */
// 使用场景和示例代码
class CyclicCounterExample {

    public static void main(String[] args) {
        // 假设你已经有了RedissonClient实例
        RedissonClient redisson = Redisson.create(); // 使用正确的配置创建实例
        CyclicCounter cyclicCounter = new CyclicCounter(redisson, "unique_counter_name");

        // 场景1: 生成循环的票据号
        long ticketNumber = cyclicCounter.next();
        System.out.println("Ticket Number: " + ticketNumber);

        // 场景2: 轮询资源
        // 假设有一个资源数组,我们要循环地分配这些资源
        String[] resources = {"Resource1", "Resource2", "Resource3"};
        long resourceIndex = cyclicCounter.next() % resources.length;
        System.out.println("Allocated " + resources[(int) resourceIndex]);

        // 场景3: 循环任务分配
        // 假设有一个任务列表,我们要循环地分配这些任务给工作线程
        Runnable[] tasks = {() -> System.out.println("Task1"),
                () -> System.out.println("Task2"),
                () -> System.out.println("Task3")};
        long taskIndex = cyclicCounter.next() % tasks.length;
        tasks[(int) taskIndex].run();

        // 注意: 请在应用程序关闭时关闭 Redisson 客户端
        redisson.shutdown();
    }
}

总结:

AdvancedAtomicLong 类通过封装 RAtomicLong,不仅提供了基本的原子长整型操作,还增加了回调机制、异步操作支持、过期时间设置等高级功能。这使得它特别适合在需要高度一致性和可靠性的分布式系统中使用,例如在实现分布式计数器、ID 生成器或其他需要原子性操作的场景。此外,类中提供的方法使得操作后的额外处理(如设置过期时间)变得简单,从而增强了原子操作的实用性和灵活性。

转载自:https://juejin.cn/post/7382051110688243738
评论
请登录