likes
comments
collection
share

Redis只用来做缓存?来认识一下它其他强大的能力吧。

作者站长头像
站长
· 阅读数 63

当今互联网应用中,随着业务的发展,数据量越来越大,查询效率越来越高,对于时序数据的存储、查询和分析需求也越来越强烈,这时候 Redis 就成为了首选的方案之一。

Redis 提供了多种数据结构,如字符串、哈希表、列表、集合、有序集合等,每种数据结构都具备不同的特性,可以满足不同的业务需求。其中,有序集合的 score 可以存储时间戳,非常适合用于存储时序数据,例如监控指标、日志、统计数据、报表等。下面举几个时序数据场景例子:

  1. 监控指标:

假设我们有一个服务,名为 my_service,需要监控它的请求响应时间。我们可以使用 Redis 有序集合来存储数据,每个请求的响应时间作为 value,请求的时间戳作为 score。示例如下:

> ZADD requests:my_service 1613115560 350
(integer) 1
> ZADD requests:my_service 1613115570 450
(integer) 1
> ZADD requests:my_service 1613115580 550
(integer) 1

这些命令向名为 requests:my_service 的有序集合中添加了 3 条数据,分别是 2021 年 2 月 12 日 10:19:20 的请求响应时间为 350ms,10:19:30 的请求响应时间为 450ms,10:19:40 的请求响应时间为 550ms。

接下来,我们来看一下如何使用 Redis 命令查询这些监控指标的数据。下面的命令会返回 requests:my_service 有序集合内所有数据:

> ZRANGE requests:my_service 0 -1 WITHSCORES
1) "350"
2) "1613115560"
3) "450"
4) "1613115570"
5) "550"
6) "1613115580"

命令执行结果表示,数据按照 score 排序,其中 score 是时间戳(单位为秒),value 是请求响应时间(单位为毫秒)。同时,使用 ZRANGEBYSCORE 命令可以获取一段时间范围内的监控数据,例如:

> ZRANGEBYSCORE requests:my_service 1613115570 1613115580 WITHSCORES
1) "450"
2) "1613115570"
3) "550"
4) "1613115580"

这条命令返回了 requests:my_service 有序集合中在时间戳 1613115570 到 1613115580 之间的所有数据。

  1. 日志:

假设我们要存储的日志是一条指定格式的字符串,包含时间戳和日志内容。使用 Redis 列表存储日志数据,每次写入新日志时可以使用 Redis 列表的 rpush 命令将数据写入列表的尾部。示例如下:

> RPUSH logs:my_logs 2021-02-12 10:30:00 INFO message 1
(integer) 1
> RPUSH logs:my_logs 2021-02-12 10:30:01 ERROR message 2
(integer) 2
> RPUSH logs:my_logs 2021-02-12 10:30:02 WARN message 3
(integer) 3

这些命令向名为 logs:my_logs 的列表尾部添加 3 条数据,分别是 2021 年 2 月 12 日 10:30:00 的 INFO 级别消息,10:30:01 的 ERROR 级别消息和 10:30:02 的 WARN 级别消息。

接下来,我们来看一下如何使用 Redis 命令查询这些日志数据。下面的命令会返回 logs:my_logs 列表内所有数据:

> LRANGE logs:my_logs 0 -1
1) "2021-02-12 10:30:00 INFO message 1"
2) "2021-02-12 10:30:01 ERROR message 2"
3) "2021-02-12 10:30:02 WARN message 3"

命令执行结果表示,数据按照插入顺序排序,从列表头部开始遍历。使用 ZRANGEBYSCORE 命令可以获取一段时间范围内的日志数据,例如:

> ZRANGEBYSCORE logs:my_logs 1613115570 1613115580
1) "2021-02-12 10:30:01 ERROR message 2"

这条命令返回了 logs:my_logs 列表中在时间戳 1613115570 到 1613115580 之间的日志数据,但因为日志数据并没有具体的 time stamp 做 score,所以这个例子只是演示这个命令的用法,实际上应该使用有序集合去查询时间区间内的日志数据。

  1. 统计数据:

假设我们要存储的统计数据是一些具体业务相关的计数器,例如每分钟用户访问量。我们可以使用 Redis 有序集合来存储统计数据,key 是计数器名称,score 是时间戳,value 是具体的计数值(例如访问次数)。示例如下:

> ZADD visits 1613167800 100
(integer) 1
> ZADD visits 1613167860 120
(integer) 1
> ZADD visits 1613167920 150
(integer) 1

这些命令向名为 visits 的有序集合中添加了 3 条数据,分别是 2021 年 2 月 12 日 23:30:00 的访问次数为 100,23:31:00 的访问次数为 120,23:32:00 的访问次数为 150。

接下来,我们来看一下如何使用 Redis 命令查询这些统计数据。下面的命令会返回 visits 有序集合内所有数据:

> ZRANGE visits 0 -1 WITHSCORES
1) "100"
2) "1613167800"
3) "120"
4) "1613167860"
5) "150"
6) "1613167920"

命令执行结果表示,数据按照 score 排序,其中 score 是时间戳(单位为秒),value 是访问次数。使用 ZRANGEBYSCORE 命令可以获取一段时间范围内的统计数据,例如:

> ZRANGEBYSCORE visits 1613167860 1613167920 WITHSCORES
1) "120"
2) "1613167860"
3) "150"
4) "1613167920"

这条命令返回了 visits 有序集合中在时间戳 1613167860 到 1613167920 之间的所有数据。

使用 Redis 有序集合中的另一个常见场景是计算 TopN,例如找出访问次数最多的前 10 个计数器,可以使用命令 ZREVRANGE visits 0 9 WITHSCORES,它返回 visits 有序集合中前 10 个元素,按照 value 从大到小排列,并且返回每个元素的 score。

需求实践:

这是一个实时监控系统,主要用于记录和统计服务发生的错误情况,以便在错误数量超过预设阈值时发出警告信息。

系统每秒钟生成随机错误数据,并将它们存储到 Redis 数据库中。每隔 10 秒钟,系统会从 Redis 数据库中聚合最近一分钟内的错误数据,并按照服务名和错误类型进行统计计算。如果某个服务的错误数量超过预设阈值,系统会输出一条警告信息提示用户。

整个系统的目标是帮助用户及时了解每个服务的错误情况,以便及时采取相应的措施,保障服务的稳定性和可靠性。

代码示例:

模拟接口服务异常数据

package com.example.demo.redis;

import redis.clients.jedis.Jedis;
import java.util.*;

public class DataGenerator {
    // 定义服务列表
    private static final List<String> SERVICES = Arrays.asList("service1", "service2", "service3");
    // 定义错误列表
    public static final List<String> ERRORS = Arrays.asList("invalid_param", "timeout", "unknown_error");

    /**
     * 生成数据
     *
     * @param total 数据总数
     * @param jedis Redis 客户端连接
     */
    public static void generateData(int total, Jedis jedis) {
        Random rand = new Random(); // 初始化随机数生成器
        long currentTimestamp = System.currentTimeMillis() / 1000; // 获取当前时间戳,精确到秒
        long startTimestamp = currentTimestamp - 60; // 计算起始时间戳,为当前时间戳减去 60 秒

        for (int i = 0; i < total; i++) { // 循环 total 次,生成 total 条数据
            String service = SERVICES.get(rand.nextInt(SERVICES.size())); // 随机选择一个服务
            String error = ERRORS.get(rand.nextInt(ERRORS.size())); // 随机选择一个错误
            long timestamp = startTimestamp + rand.nextInt(60); // 生成一个随机时间戳,精确到秒,范围为起始时间戳到当前时间戳
            int count = 1;
            String item = String.format("%s:%s:%d:%d", service, error, timestamp, count);
            jedis.zadd("error_data", timestamp, item); // 将错误数据存储到 Redis 数据库中
        }
    }
}

聚合异常数据,达到阈值告警

package com.example.demo.redis;

import redis.clients.jedis.Jedis;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DataAggregator {
    private static final String REDIS_HOST = "localhost"; // Redis 主机名
    private static final int REDIS_PORT = 6379; // Redis 端口号
    private static final int THRESHOLD = 100; // 预设阈值,当错误数量超过该阈值时触发警告

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); // 创建一个只有一个线程的定时任务执行程序
        Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT); // 创建 Redis 客户端连接

        scheduler.scheduleAtFixedRate(() -> {
            // 并发情况下,线程会阻塞
            synchronized (jedis) {
                DataGenerator.generateData(20, jedis); // 生成随机错误数据,并将其存储到 Redis 数据库中
            }
        }, 0, 1, TimeUnit.SECONDS); // 定时任务间隔为 1 秒钟

        scheduler.scheduleAtFixedRate(() -> { // 定时任务逻辑
            synchronized (jedis) {
                long currentTimestamp = System.currentTimeMillis() / 1000; // 获取当前时间戳,精确到秒
                long startTimestamp = currentTimestamp - 60; // 计算起始时间戳,为当前时间戳减去 60 秒
                Set<String> data = jedis.zrangeByScore("error_data", startTimestamp, currentTimestamp); // 使用 zrange 命令获取指定时间范围内的数据

                Map<String, Map<String, Integer>> countMap = new HashMap<>(); // 用于记录聚合后的服务和错误数量信息
                for (String item : data) { // 遍历所有错误数据
                    String[] parts = item.split(":"); // 以冒号为分隔符,将错误数据分割为部分
                    String service = parts[0]; // 获取服务名
                    String error = parts[1]; // 获取错误类型
                    long timestamp = Long.parseLong(parts[2]); // 获取时间戳
                    int count = Integer.parseInt(parts[3]); // 获取错误数量

                    if (timestamp < startTimestamp) { // 如果时间戳早于起始时间戳,则跳过该数据
                        continue;
                    }

                    Map<String, Integer> serviceCountMap = countMap.computeIfAbsent(service, k -> new HashMap<>()); // 获取指定服务的错误数量信息
                    serviceCountMap.put(error, serviceCountMap.getOrDefault(error, 0) + count); // 更新指定服务和错误类型的错误数量信息
                }

                List<String> alerts = new ArrayList<>(); // 用于存储警告信息
                for (String service : countMap.keySet()) { // 遍历服务名列表
                    Map<String, Integer> serviceCountMap = countMap.get(service); // 获取服务和错误数量信息
                    int totalErrors = 0;
                    for (String error : serviceCountMap.keySet()) { // 遍历错误列表
                        int count = serviceCountMap.get(error); // 获取错误数量
                        totalErrors += count;
                    }
                    if (totalErrors > THRESHOLD) { // 如果错误数量超过预设阈值
                        alerts.add(service + " has too many errors: " + serviceCountMap.keySet() + ", count: " + totalErrors); // 将该服务名添加到警告信息列表中
                    }
                }
                if (!alerts.isEmpty()) { // 如果警告信息列表不为空
                    System.out.println(String.join("\n", alerts)); // 打印警告信息
                }
            }
        }, 0, 10, TimeUnit.SECONDS); // 定时任务间隔为 10 秒

        // 关闭 Redis 连接
        jedis.close();
    }
}

以上代码可正常运行,有疑问可以交流~~

欢迎关注公众号:程序员的思考与落地

公众号提供大量实践案例,Java入门者不容错过哦,可以交流!!

转载自:https://juejin.cn/post/7219669309537484837
评论
请登录