likes
comments
collection
share

时间轮在Redission中的应用和源码分析

作者站长头像
站长
· 阅读数 7

本篇文章主要从Redission中WatchDog机制来作为切入点;看下Redission是如何应用时间轮机制的,以及在Redission中时间轮机制解决了什么问题?

Redission是什么?

Redisson 是架设在 Redis基础上的一个 Java 驻内存数据网格框架,充分利用 Redis 键值数据库提供的一系列优势,基于 Java 实用工具包中常用接口,为使用者提供了 一系列具有分布式特性的常用工具类。

时间轮在Redssion中解决了什么问题?

使用过Redission的可能会知道,Redission针对于未设置过期时间的Key会采用WatchDog机制保证Key不会过期;其中锁续期就是使用时间轮的方式实现的。

时间轮是什么?

时间轮这个名次想必大家都耳熟能详了,在Kafka、Zookeeper、Netty等技术中都有使用时间轮的方式。

简单来说:时间轮是一种高效利用线程资源进行批量化调度的一种调度模型。把大批量的调度任务全部绑定到同一个调度器上,使用这一个调度器来进行所有任务的管理、触发、以及运行。

时间轮可以高效的管理延迟任务、通知任务、周期任务;后续在设计开发中如果有类似场景可以借鉴时间轮的思路,完成更高效的设计。

时间轮在Redission中的应用和源码分析

  • 从上图可以看出:时间轮就像手表的表盘一样,表针每一次移动一格,每一个小格在时间轮中称为bucket,每一个bucket内部都是一个双向链表;通过一个指针一次移动一个格子,当到达某一个bucket的时候就会遍历双向链表,从而执行双向链表中的延迟任务。

  • 结合Redission时间轮的代码:在Redission中,使用的时间轮是Netty包提供的;上图中整个表盘是一个时间轮(HashedWheelTimer),上图中的每一个bucket是一个(HashedWheelBucket),链表上的每一个任务是一个(HashedWheelTimeout);时间轮由多个时间格组成,每一个时间格代表当前时间轮的基本时间调度(tickDuration),其中时间轮在创建的时候格子数量就是固定的了。

  • 举个例子:在上图中一共有8个格子,假设每一个时间格子的调度时间为1s,那么时间轮转动一圈需要8s。每秒钟指针会沿着同一个方向旋转,每一次移动一个格子,当移动到指定的bucket的时候,开始遍历双向链表,将任务交给任务线程池来执行任务。

时间轮的原理

时间轮整体分为:时间轮的创建、添加任务、任务执行

  • 时间轮的创建
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
        /**
         * @ticksPerWheel: 创建时间轮传入的数字
         * findNextPositivePowerOfTwo=方法:通过将传入的数字转换成2^n处理
         */
        ticksPerWheel = MathUtil.findNextPositivePowerOfTwo(ticksPerWheel);

        // 根据ticksPerWheel创建时间轮数组,也就是时间轮里面有多少个格子
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i ++) {
            wheel[i] = new HashedWheelBucket();
        }
        return wheel;
}

时间轮在Redission中的应用和源码分析

如上图:如果我们创建了一个大小为8的时间轮:那么在初始化的时候就会创建一个长度为8的 HashedWheelBucket 数组;在创建时间轮的时候我们指定的 TickDuration = 100那么每一个时间调度就是100;

  • 任务的添加:
private void transferTimeoutsToBuckets() {
            // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
            // 为了避免影响任务的执行,一次最多只能循环处理10W个任务
            for (int i = 0; i < 100000; i++) {
                // 从队列中获取到需要添加到时间轮的任务信息
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    // all processed
                    break;
                }
                if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                    // Was cancelled in the meantime.
                    continue;
                }
                /**
                 * @deadline: 任务的执行时间
                 * @tickDuration: 时间轮每个格子的时间间隔,在Redission中就是100ms
                 * @calculated 根据任务的执行时间除以每个格子的时间间隔;可以算出一个值(表示需要经过多少tick,指针每移动一次,tick + 1)
                 * 时间单位为 ns
                */
                long calculated = timeout.deadline / tickDuration;
                // 通过calculated减去已经走过的tick数量除以时间轮数组大小;可以获取到时间轮需要转几圈才能够到达;remainingRounds圈数
                timeout.remainingRounds = (calculated - tick) / wheel.length;
                // 为了避免加入到已经过去的时间轮:所以这里通过取 calculated 和 tick的最大值
                final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
                // 通过与运算:算出当前timeout任务应该放在时间轮中的数组下标
                int stopIndex = (int) (ticks & mask);
                // 通过数组小标获取时间轮中对应的bucket;将这个timeout添加到相应的bucket对应的双向链表中
                HashedWheelBucket bucket = wheel[stopIndex];
                bucket.addTimeout(timeout);
            }
        }

时间轮在Redission中的应用和源码分析

  • 时间轮任务的添加:

    • 在Redission添加任务的时候:只是放在了一个延迟队列中,并没有真正的放入到时间轮的bucket中;通过Worker线程每次移动到一个bucket时,将timeout任务依据duration放在对应的时间轮bucket中。
    • 在上述的代码中可以看到:通过时间轮的duration时间,计算需要经过多少次tick才开始执行,同时计算任务需要指针沿着时间轮转多少圈才能够执行。
    • 从上图中也可以看到:每一个bucket都会对应一个双向链表,通过head和tail链接双向链表的头部和尾部
  • 任务的执行

public void expireTimeouts(long deadline) {
            // 获取双线链表的head
            HashedWheelTimeout timeout = head;
            // 沿着双向链表:处理所有的任务
            while (timeout != null) {
                HashedWheelTimeout next = timeout.next;
                /**
                 * 任务被执行的条件:
                 * 1. @remainingRounds <= 0表示任务就在本圈执行
                 * 2. @timeout.deadline < deadline: 当前任务的时间小于等于当前时间,表示任务时间已经到达
                 */
                if (timeout.remainingRounds <= 0) {
                    next = remove(timeout);
                    if (timeout.deadline <= deadline) {
                        // 具体执行任务信息
                        timeout.expire();
                    } else {
                        // The timeout was placed into a wrong slot. This should never happen.
                        throw new IllegalStateException(String.format(
                                "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                } else if (timeout.isCancelled()) {
                    next = remove(timeout);
                } else {
                    timeout.remainingRounds --;
                }
                timeout = next;
            }
        }

时间轮在Redission中的应用和源码分析

  • Worker线程按照每次的时间间隔转动后,得到该bucket里面的任务,通过head获取所有任务信息,逐一进行遍历判断,有以下两个条件:
    • 当前任务 remainingRounds = 0;表示当前任务圈次已经开始执行。
    • 当前任务已经达到了delay时间;也就是 timeout.deadline <= deadline
  • 当任务执行的时候:会将到达执行时间的任务交给一个任务线程池去处理;这么设计的目的也是为了避免Worker线程被阻塞而影响指针的移动;

源码分析

时间轮的创建

这里使用的是Netty包提供的;

  • tickDuration = 100; 表示每个时间调度,代表当前时间轮的基本时间调度为100ms,也就是说指针100ms跳动一次,每次跳动一个bucket。
  • ticksPerWheel = 1024; 表示当前时间轮一共有1024个时间轮,分配的窗格越多占用的内存就越大,同时时间轮中格子的数量必须是 2^n 个。
  • leakDetection: 是否开启内存泄漏检测。
private void initTimer() {
    int minTimeout = Math.min(config.getRetryInterval(), config.getTimeout());
    if (minTimeout % 100 != 0) {
        minTimeout = (minTimeout % 100) / 2;
    } else if (minTimeout == 100) {
        minTimeout = 50;
    } else {
        minTimeout = 100;
    }

    /**
     * @retryInterval = 1500; @timeout = 3000
     * 指定时间轮格子为1024个;tickDuration为重试间隔与
     */
    timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), minTimeout, TimeUnit.MILLISECONDS, 1024, false);

    connectionWatcher = new IdleConnectionWatcher(group, config);
}

HashedWheelTimer 构造

在创建时间轮的过程中:会将传的时间轮格子数量转换成2^n个;便于后续mask计算以及其他操作;可以看到其实时间轮就是一个数组,每一个数组会对应一个双向链表。

public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
            long maxPendingTimeouts, Executor taskExecutor) {

        checkNotNull(threadFactory, "threadFactory");
        checkNotNull(unit, "unit");
        checkPositive(tickDuration, "tickDuration");
        checkPositive(ticksPerWheel, "ticksPerWheel");
        this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");

        // 创建时间轮:实际进行了创建数组,数组大小为ticksPerWheel 1024
        wheel = createWheel(ticksPerWheel);
        /**
         * @param mask 主要用于快速取余运算:由于wheel长度一定为2^n,2^n - 1的低位一定都是1;
         * 如果想要求出一个时间应该在哪一个时间格中,通常会用 tick % wheel.length 但是性能比较低;使用 tick & mask替代
         */
        mask = wheel.length - 1;

        // 转换成ms
        long duration = unit.toNanos(tickDuration);

        // Prevent overflow.
        if (duration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format(
                    "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                    tickDuration, Long.MAX_VALUE / wheel.length));
        }

        if (duration < MILLISECOND_NANOS) {
            logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
                        tickDuration, MILLISECOND_NANOS);
            this.tickDuration = MILLISECOND_NANOS;
        } else {
            this.tickDuration = duration;
        }

        workerThread = threadFactory.newThread(worker);

        leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

        this.maxPendingTimeouts = maxPendingTimeouts;

        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
            WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
            reportTooManyInstances();
        }
    }
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
        /**
         * @ticksPerWheel: 创建时间轮传入的数字
         * findNextPositivePowerOfTwo=方法:通过将传入的数字转换成2^n处理
         */
        ticksPerWheel = MathUtil.findNextPositivePowerOfTwo(ticksPerWheel);

        // 根据ticksPerWheel创建时间轮数组,也就是时间轮里面有多少个格子
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i ++) {
            wheel[i] = new HashedWheelBucket();
        }
        return wheel;
}

添加任务到时间轮

 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        // task、unit 参数合法性校验
        checkNotNull(task, "task");
        checkNotNull(unit, "unit");
        // 当前等待的任务数
        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
        // 如果等待的任务数已经超过最大等待任务数,那么添加失败
        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
        }

        // 如果时间轮没有启动,则通过start方法进行启动
        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.

        // 计算任务的延迟时间:通过当前时间 + 延迟时间 - 时间轮启动时间
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        //在delay为正数的情况下,deadline是不可能为负数
        //如果为负数,那么说明超过了long的最大值
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        // 构造一个timeout任务:理论上timeout任务创建后会被加入到时间轮的某一个格子中,当时这里并没有真正的放在时间轮的格子中
        // 而是先加入到一个阻塞队列中,然后等待时间轮执行到下一个格子时,在从中取出最多10W个timeout任务,添加到时间轮的格子中。
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

在创建timeout任务的时候,只是将这个任务加到了一个阻塞队列中,并没有真正的放在时间轮的格子中;后续通过Worker线程执行到下一个格子的时候再加入;这里可以看到在创建timeout任务的时候用到了 start()用来启动时间轮。

下面是Start代码具体实现

public void start() {
        switch (WORKER_STATE_UPDATER.get(this)) {
            //workerState一开始的时候是0(WORKER_STATE_INIT),然后才会设置为1(WORKER_STATE_STARTED)
            case WORKER_STATE_INIT:
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    workerThread.start();
                }
                break;
            case WORKER_STATE_STARTED:
                break;
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }

        // Wait until the startTime is initialized by the worker.
        // 等待worker线程初始化时间轮的启动时间
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }

Worker任务的执行

下面是Worker线程具体执行的代码:

  • Worker在启动后会通知其他的线程可以继续执行了;
  • 通过睡眠sleep的方式获取下一个时间格的执行时间;里面主要通过循环的方式不断判断当前时间格子是否到达;如果没有到达会进行sleep
  • 在这里会将延迟队列中的任务真正的添加到时间轮中;
  • 针对到期的任务也会在这里进行处理。
public void run() {
            // 获取系统当前ns时间
            startTime = System.nanoTime();
            if (startTime == 0) {
                // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
                // 时间轮启动时间不能为0,startTime = 0表示时间轮没有初始化;这里确保时间轮初始化过 startTime != 0
                startTime = 1; 
            }

            // <span data-word-id="1060" class="abbreviate-word"><span data-word-id="1060" class="abbreviate-word">Notify</span></span> the other threads waiting for the initialization at start().
            // 在newtimeout 的时候 start会wait通知
            startTimeInitialized.countDown();

            do {
                // 通过睡眠sleep的方式获取下一个时间格的执行时间;里面主要通过循环的方式不断判断当前时间格子是否到达;如果没有到达会进行sleep
                // deadline 当前时间格时间
                final long deadline = waitForNextTick();
                if (deadline > 0) {
                    // 根据 tick & mask 获取到当前已经到达的时间格数组下标
                    int idx = (int) (tick & mask);
                    // 处理取消的timeout任务
                    processCancelledTasks();
                    // 获取到指针指向的bucket
                    HashedWheelBucket bucket =
                            wheel[idx];
                    // 这里就是进行真正的任务添加到时间轮的操作;里面最多处理10W格任务添加到时间轮的格子中
                    // 可以看到netty时间轮是在指针每指向一个bucket的时候最多将10W个任务放在时间轮中
                    transferTimeoutsToBuckets();
                    // 根据bucket,沿着双向链表遍历所有的timeout任务;执行到时间的任务
                    bucket.expireTimeouts(deadline);
                    // 指针再走一步
                    tick++;
                }
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

            // Fill the unprocessedTimeouts so we can return them from stop() method.
            for (HashedWheelBucket bucket: wheel) {
                bucket.clearTimeouts(unprocessedTimeouts);
            }
            for (;;) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (!timeout.isCancelled()) {
                    unprocessedTimeouts.add(timeout);
                }
            }
            processCancelledTasks();
        }

获取下一个时间格的时间

通过时间调度 * (tick + 1) 获取下一个bucket的时间;如果没有到达那么会通过sleep的方式进行等待。

如果下一个时间格时间已经到达:会返回当前的时间

private long waitForNextTick() {
            /**
             * @tickDuration:每一个时间格的时间
             * @tick 当前指针执行的次数,指针每移动一次tick + 1
             * @deadline: 通过tickDuration * (tick + 1) 获得下一个格子执行的时间
             */
            long deadline = tickDuration * (tick + 1);

            for (;;) {
                // 当前时间:系统时间 - 时间轮启动时间
                final long currentTime = System.nanoTime() - startTime;
                // 睡眠时间:下一个格子的时间 - 当前时间轮时间
                long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

                // 时间到达:如果 currentTime == Long.MIN_VALUE 返回 -Long.MAX_VALUE
                // 因为只有中断才会返回 Long.MIN_VALUE,所以这里进行了一些处理
                if (sleepTimeMs <= 0) {
                    if (currentTime == Long.MIN_VALUE) {
                        return -Long.MAX_VALUE;
                    } else {
                        return currentTime;
                    }
                }

                // Check if we run on windows, as if thats the case we will need
                // to round the sleepTime as workaround for a bug that only affect
                // the JVM if it runs on windows.
                //
                // See https://github.com/netty/netty/issues/356
                if (PlatformDependent.isWindows()) {
                    sleepTimeMs = sleepTimeMs / 10 * 10;
                    if (sleepTimeMs == 0) {
                        sleepTimeMs = 1;
                    }
                }

                try {
                    Thread.sleep(sleepTimeMs);
                } catch (InterruptedException ignored) {
                    if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                        return Long.MIN_VALUE;
                    }
                }
            }
        }

取消的timeout任务处理

private void processCancelledTasks() {
    for (;;) {
        HashedWheelTimeout timeout = cancelledTimeouts.poll();
        if (timeout == null) {
            // all processed
            break;
        }
        try {
            timeout.remove();
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown while process a cancellation task", t);
            }
        }
    }
}

将延迟队列中的任务添加到时间轮中

private void transferTimeoutsToBuckets() {
            // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
            // 为了避免影响任务的执行,一次最多只能循环处理10W个任务
            for (int i = 0; i < 100000; i++) {
                // 从队列中获取到需要添加到时间轮的任务信息
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    // all processed
                    break;
                }
                if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                    // Was cancelled in the meantime.
                    continue;
                }
                /**
                 * @deadline: 任务的执行时间
                 * @tickDuration: 时间轮每个格子的时间间隔,在Redission中就是100ms
                 * @calculated 根据任务的执行时间除以每个格子的时间间隔;可以算出一个值(表示需要经过多少tick,指针每移动一次,tick + 1)
                 * 时间单位为 ns
                */
                long calculated = timeout.deadline / tickDuration;
                // 通过calculated减去已经走过的tick数量除以时间轮数组大小;可以获取到时间轮需要转几圈才能够到达;remainingRounds圈数
                timeout.remainingRounds = (calculated - tick) / wheel.length;
                // 为了避免加入到已经过去的时间轮:所以这里通过取 calculated 和 tick的最大值
                final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
                // 通过与运算:算出当前timeout任务应该放在时间轮中的数组下标
                int stopIndex = (int) (ticks & mask);
                // 通过数组小标获取时间轮中对应的bucket;将这个timeout添加到相应的bucket对应的双向链表中
                HashedWheelBucket bucket = wheel[stopIndex];
                bucket.addTimeout(timeout);
            }
        }

任务的执行

任务执行这里可以看到:当某一个timeout任务delay到达的时候,会通过 taskExecutor 来执行;worker线程继续执行时间格移动操作。

public void expireTimeouts(long deadline) {
            // 获取双线链表的head
            HashedWheelTimeout timeout = head;
            // 沿着双向链表:处理所有的任务
            while (timeout != null) {
                HashedWheelTimeout next = timeout.next;
                /**
                 * 任务被执行的条件:
                 * 1. @remainingRounds <= 0表示任务就在本圈执行
                 * 2. @timeout.deadline < deadline: 当前任务的时间小于等于当前时间,表示任务时间已经到达
                 */
                if (timeout.remainingRounds <= 0) {
                    next = remove(timeout);
                    if (timeout.deadline <= deadline) {
                        // 具体执行任务信息
                        timeout.expire();
                    } else {
                        // The timeout was placed into a wrong slot. This should never happen.
                        throw new IllegalStateException(String.format(
                                "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                } else if (timeout.isCancelled()) {
                    next = remove(timeout);
                } else {
                    timeout.remainingRounds --;
                }
                timeout = next;
            }
        }

Redission时间轮任务做了什么?

Redission通过时间轮不断的进行锁续期:针对没有设置超时时间的KEY;可以一直进行锁续期;通过执行lua脚本实现。

protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
    /**
     * 通过判断:hexists key:lock_key value: thread_id用于判断锁重入
     * 重制锁过期时间
     */
    return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

总结

  • 时间轮可以高效的管理延迟任务、通知任务、周期任务;后续在设计开发中如果有类似场景可以借鉴时间轮的思路,完成更高效的设计。
  • 从时间轮的设计思路中可以学习到:时间轮的实现、Netty网络架构的实现,都是通过主线程和工作线程实现的;这样可以保证异步执行不受影响,同时可以提高任务处理效率。
  • 时间轮在设计上巧妙的时间用数组的取余的方式来实现无限循环,通过一个指针可以反复遍历每一个时间格;同时针对每一个时间格子通过双向链表存储所有的任务信息。
  • 每一次添加一个timeout任务的时候:先放到一个延迟队列中,随后worker线程移动到下一个时间格的时候再真正添加到时间轮中;这里看到了针对于线程资源的合理利用。
转载自:https://juejin.cn/post/7358518759117553727
评论
请登录