🏆 一文 掌握 Redis的缓存双写一致性
一、什么是Redis的缓存双写一致性
-
Redis中有数据,则必须保证和数据库中的数据是相同的
-
Redis中没有数据,则从数据库的查询最新的数据回写到Redis中
二、Redsi的缓存操作分类
2.1 只读缓存
Redis的只读缓存模式是一种缓存策略,其中数据首先从后端数据库加载到Redis缓存中。当进行读操作时,如果请求的数据在Redis缓存中命中(即存在),则直接返回缓存中的数据,不访问后端数据库。
如果Redis缓存不命中(即数据不存在于缓存中),则直接从后端数据库中读取数据,没有回写机制
2.2 读写缓存
Redis同时接受读和写请求。在读写缓存模式下,当业务应用进行数据访问时,它会首先查询Redis缓存中是否保存了相应的数据。
如果Redis缓存命中(即数据存在于缓存中),则对于读请求,它会直接从缓存中返回数据;对于写请求,它会更新缓存中的数据,并可能根据配置策略将更新后的数据异步写回后端数据库。这种策略可以减少对后端数据库的访问压力,提高系统的性能和响应速度。
如果Redis缓存不命中(即数据不存在于缓存中),则对于读请求,它会从后端数据库中读取数据,并将其加载到缓存中,然后再返回给请求方;对于写请求,它会将新数据写入后端数据库,并在缓存中创建或更新相应的数据项。
2.2.1 同步直写策略
- 将数据写入数据库的同时也要更新Redis,保持Redis与数据库的内容是同步一致的;
2.2.2 异步缓写策略
-
在系统运行期间,数据库的数据有所更新,但是允许Redis更新时间存在一定是误差
-
如果更新失败,我们可以借助消息中间件来进行修补,重新写入数据到缓存中
2.2.3 代码实现
① 小厂,并发量很低 <=> qps很小
public Bottom findBottomById(Integer id) {
final String CACHE_KEY_Bottom = "bottom:";
Bottom bottom = null;
String key = CACHE_KEY_Bottom + id;
//1. 先查Redis,查不到再去查Mysql
bottom = (Bottom) redisTemplate.opsForValue().get(key);
if (bottom == null) {
//2. 查询Mysql
bottom = bottomMapper.selectByKey(id);
if (bottom == null) {
return bottom;
} else {
//3. mysql有则回写到Redis
redisTemplate.opsForValue().set(key, bottom);
}
}
return bottom;
}
② 大厂 ,并发量很高 <=> qps很高
public Bottom findBottomById(Integer id) {
final String CACHE_KEY_Bottom = "bottom:";
Bottom bottom = null;
String key = CACHE_KEY_Bottom + id;
// 1. 先查Redis
bottom = (Bottom) redisTemplate.opsForValue().get(key);
if (bottom == null) {
// 2. Redis为空,加锁再去查
synchronized (UserService.class) {
//3. 第2次查询redis
bottom = (Bottom) redisTemplate.opsForValue().get(key);
//4 . 第二次查询Redis为空,查询Mysql
if (bottom == null) {
//5. 查询Mysql
bottom = bottomMapper.selectByKey(id);
if (bottom == null) {
return null;
} else {
//6. 查到数据,回写到Redis
redisTemplate.opsForValue().setIfAbsent(key, bottom, 7L, TimeUnit.DAYS);
}
}
}
}
return bottom;
}
三、缓存的更新策略
3.1 先更新数据库,再更新缓存
- 回写Redis的时候可能出现失败,则出现Redis与Mysql数据不一致
- 高并发情况下,线程的快慢不一样,导致Redis的回写时间不同造成Redis与Mysql数据不一致
3.2 先更新缓存,再更新数据库
- 高并发情况下,线程的快慢不一样,导致Redis要写入Mysql不同步,也会导致数据不一致
3.3 先删除缓存,再更新数据库
- 高并发的情况下,如果数据库更新失败或超时或返回不及时,就会导致后续的请求线程访问缓存时没有数据缓存缺失,没有命中马上去读my那么从数据库当中读了旧数据,导致数据不一致
解决思路:延迟双删
延迟双删的基本步骤如下:
- 删除缓存:当数据在数据库中更新后,首先立即删除缓存中的数据。
- 更新数据库:然后更新数据库中的数据。
- 设置睡眠时间:等待一段时间后(例如,几百毫秒),再次删除缓存中的数据。
睡眠时间的设定为:线程A的睡眠时间 大于线程B读取数据再写入缓存的时间
代码实现:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public Bottom findBottomById(Integer id) {
final String CACHE_KEY_Bottom = "bottom:";
Bottom bottom = null;
String key = CACHE_KEY_Bottom + id;
// 1. 从Redis中获取数据
bottom = (Bottom) redisTemplate.opsForValue().get(key);
if (bottom == null) {
// 2. 如果Redis中没有数据,则加锁查询数据库
synchronized (UserService.class) {
// 再次从Redis中获取数据,防止在加锁期间其他线程已经更新了Redis
bottom = (Bottom) redisTemplate.opsForValue().get(key);
if (bottom == null) {
// 3. 从数据库中查询数据
bottom = bottomMapper.selectByKey(id);
if (bottom != null) {
// 4. 将数据写入Redis
redisTemplate.opsForValue().set(key, bottom, 7L, TimeUnit.DAYS);
// 5. 使用延迟队列或者定时任务来实现延迟删除
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.schedule(() -> {
// 在延迟一段时间后再次删除缓存
redisTemplate.delete(key);
}, 500, TimeUnit.MILLISECONDS); // 延迟500毫秒后删除
}
}
}
}
return bottom;
}
3.4 先更新数据库,再删除缓存
- 假如缓存删除失败或者来不及,导致请求再次访问redis时缓存命中,读取到的是缓存旧值。
解决思路:使用消息中间件
- 更新数据库数据
- 数据库会将操作信息写入binlog日志当中
- 订阅程序提取出所需要的数据以及key
- 另起一段非业务代码,获得该信息
- 尝试删除缓存操作,发现删除失败
- 将这些信息发送至消息队列
- 重新从消息队列中获得该数据,重试操作。
四、canal的实战方案
4.1 何为Canal
Canal
是一个用于 MySQL 数据库变更数据捕获(Change Data Capture,简称 CDC)的开源工具,由阿里巴巴开发并开源。
其主要功能是监听 MySQL 的 binlog(二进制日志),解析 binlog 中的数据变更事件(如 INSERT、UPDATE、DELETE),并将这些事件发布到消息队列(如 Kafka)中,以便其他系统或应用能够实时消费这些变更数据。
4.2 Canal的工作原理
Canal的工作原理主要涉及以下几个步骤:
-
连接MySQL Master: Canal模拟作为MySQL的Slave,连接到MySQL Master上,并建立与Master的二进制日志(binlog)的订阅。
-
解析binlog: 当MySQL Master上的数据发生变化(INSERT、UPDATE、DELETE等)时,这些变更会写入到binlog中。Canal作为Slave连接到Master后,会读取Master上的binlog,并通过解析binlog来获取数据变更事件。
-
数据变更事件发布: Canal将解析得到的数据变更事件转换成标准化的格式,然后发布到消息队列(如Kafka)中。这样,其他系统或应用就可以订阅这些事件,并实时消费处理。
-
消费数据变更事件: 在你的业务场景中,可以有一个或多个消费者(Consumer)订阅Canal发布到Kafka的数据变更事件。当事件到达时,消费者会读取事件内容,并根据业务逻辑进行相应的处理。例如,在缓存更新场景中,消费者可以读取变更事件,并更新Redis中的缓存数据。
-
保持数据一致性: 通过实时监听和处理MySQL的数据变更事件,Canal能够帮助保持其他系统(如Redis)与MySQL的数据一致性。当MySQL中的数据发生变化时,通过Canal和消费者的协作,可以实时更新其他系统中的数据,确保数据的一致性。
4.3 Canal的代码实现
4.3.1 配置Mysql
①查看mysql版本
select VERSION()
②当前的主机二进制日志
show master status
③查看show variables like 'log_bin';
show variables like 'log_bin'
④开启 MySQL的binlog写入功能:修改mysql的配置文件mysql.cnf
log-bin=mysql-bin #开启 binlog
binlog-format=ROW #选择 ROW 模式
server_id=1 #配置MySQL replaction需要定义,不要和canal的 slaveId重复
⑤重启mysql
docker restart 14a
⑥再次查看show variables like 'log_bin';
⑦授权canal连接MySQL账号
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'cancal';
FLUSH PRIVILEGES;
select host,user,plugin from mysql.user ;
4.3.2 配置Canal
1. 下载
2.解压
创建目录
mkdir /root/mycanal
将压缩包放入并解压
tar -zxvf
3.配置
修改/mycanal/conf/example路径下instance.properties文件
4.启动
centos需要先安装jdk1.8 并配置环境变量
cd /root/mycanal/bin/
./startup.sh
5.查看
①查看 server 日志
cd /root/mycanal/logs/canal/
cat canal.log
②查看 样例example 的日志
cd /root/mycanal/logs/example/
cat example.log
4.3.3 编写Java代码
① 修改pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu.canal</groupId>
<artifactId>canal_demo02</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.14</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
<log4j.version>1.2.17</log4j.version>
<lombok.version>1.16.18</lombok.version>
<mysql.version>5.1.47</mysql.version>
<druid.version>1.1.16</druid.version>
<mapper.version>4.1.5</mapper.version>
<mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version>
</properties>
<dependencies>
<!--canal-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
<!--SpringBoot通用依赖模块-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--swagger2-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--SpringBoot与Redis整合依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!--SpringBoot与AOP-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
<!--Mysql数据库驱动-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!--SpringBoot集成druid连接池-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<!--mybatis和springboot整合-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis.spring.boot.version}</version>
</dependency>
<!--通用基础配置junit/devtools/test/log4j/lombok/hutool-->
<!--hutool-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.2.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<optional>true</optional>
</dependency>
<!--persistence-->
<dependency>
<groupId>javax.persistence</groupId>
<artifactId>persistence-api</artifactId>
<version>1.0.2</version>
</dependency>
<!--通用Mapper-->
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper</artifactId>
<version>${mapper.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.8.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
②.修改yaml配置
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/bottom?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.druid.test-while-idle=false
③ 注释掉主启动类的run
@SpringBootApplication
public class CanalTestApp
{
public static void main(String[] args)
{
//SpringApplication.run(CanalTestApp.class,args);
}
}
③ 编写一个RedisUtils
public class RedisUtils
{
public static final String REDIS_IP_ADDR = "192.168.118.130";
public static final String REDIS_pwd = "123456";
public static JedisPool jedisPool;
static {
JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(20);
jedisPoolConfig.setMaxIdle(10);
jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000,REDIS_pwd);
}
public static Jedis getJedis() throws Exception {
if(null!=jedisPool){
return jedisPool.getResource();
}
throw new Exception("Jedispool is not ok");
}
}
④ 编写CanalUntils工具类
public class CanalUntils
{
public static final Integer _60SECONDS = 60;
public static final String REDIS_IP_ADDR = "192.168.111.185";
private static void redisInsert(List<Column> columns)
{
JSONObject jsonObject = new JSONObject();
for (Column column : columns)
{
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
jsonObject.put(column.getName(),column.getValue());
}
if(columns.size() > 0)
{
try(Jedis jedis = RedisUtils.getJedis())
{
jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
}catch (Exception e){
e.printStackTrace();
}
}
}
private static void redisDelete(List<Column> columns)
{
JSONObject jsonObject = new JSONObject();
for (Column column : columns)
{
jsonObject.put(column.getName(),column.getValue());
}
if(columns.size() > 0)
{
try(Jedis jedis = RedisUtils.getJedis())
{
jedis.del(columns.get(0).getValue());
}catch (Exception e){
e.printStackTrace();
}
}
}
private static void redisUpdate(List<Column> columns)
{
JSONObject jsonObject = new JSONObject();
for (Column column : columns)
{
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
jsonObject.put(column.getName(),column.getValue());
}
if(columns.size() > 0)
{
try(Jedis jedis = RedisUtils.getJedis())
{
jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
}catch (Exception e){
e.printStackTrace();
}
}
}
public static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
//获取变更的row数据
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
}
//获取变动类型
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.INSERT) {
redisInsert(rowData.getAfterColumnsList());
} else if (eventType == EventType.DELETE) {
redisDelete(rowData.getBeforeColumnsList());
} else {//EventType.UPDATE
redisUpdate(rowData.getAfterColumnsList());
}
}
}
}
public static void main(String[] args)
{
System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");
//=================================
// 创建链接canal服务端
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR, 11111),
"example",
"",
"");
int batchSize = 1000;
//空闲空转计数器
int emptyCount = 0;
System.out.println("---------------------canal init OK,开始监听mysql变化------");
try {
connector.connect();
//connector.subscribe(".*\\..*");
connector.subscribe("bigdata.t_user");
connector.rollback();
int totalEmptyCount = 10 * _60SECONDS;
while (emptyCount < totalEmptyCount) {
System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
} else {
//计数器重新置零
emptyCount = 0;
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
} finally {
connector.disconnect();
}
}
}
五、总结
本文主要介绍了Redis的缓存相关问题,什么是Redis的读写缓存,读写缓存会产生什么问题,以及如何解决这些问题
转载自:https://juejin.cn/post/7361433958574571558