SpringBoot整合Redis总结
SpringBoot整合Redis
Redis数据类型
数据类型 | 最大存储数据量 |
---|---|
key | 512M |
string | 512M |
hash | 2^32-1 |
list | 2^32-1 |
set | 2^32-1 |
sorted set | |
bitmap | 512M |
hyperloglog | 12k |
Redis基本语法
存储字符串string
字符串类型是Redis中最为基础的数据存储类型,它在Redis中是二进制安全的,这便意味着该类型可以接受任何格式的数据,如JPEG图像数据或Json对象描述信息等。在Redis中字符串类型的Value最多可以容纳的数据长度是512M。
set key value:设定key持有指定的字符串value,如果该key存在则进行覆盖操作。总是返回”OK”
get key:获取key的value。如果与该key关联的value不是String类型,redis将返回错误信息,因为get命令只能用于获取String value;如果该key不存在,返回null。
getset key value:先获取(输出)该key的值,然后在设置该key的值。
del key : 删除key的值
incr key:将指定的key的value原子性的递增1。如果该key不存在,其初始值为0,在incr之后其值为1。如果value的值不能转成整型,如”hello”,该操作将执行失败并返回相应的错误信息。
decr key:将指定的key的value原子性的递减1。如果该key不存在,其初始值为0,在incr之后其值为-1。如果value的值不能转成整型,如”hello”,该操作将执行失败并返回相应的错误信息。
incrby key increment:将指定的key的value原子性增加increment。如果该key不存在,初始值为0,在incrby之后,该值为increment。如果该值不能转成整型,如hello则失败并返回错误信息。
decrby key decrement:将指定的key的value原子性减少decrement。如果该key不存在,初始值为0,在decrby之后,该值为decrement。如果该值不能转成整型,如hello则失败并返回错误信息。
append key value:如果该key存在,则在原有的value后追加该值(即拼接子字符串)。如果该 key不存在,则重新创建一个key/value
存储hash
Redis中的Hashes类型可以看成具有String Key和String Value的map容器。所以该类型非常适合于存储值对象的信息。如Username、Password和Age等。如果 Hash中包含很少的字段,那么该类型的数据也将仅占用很少的磁盘空间。每一个Hash可以存储4294967295个键值对。
hset key field value:为指定的key设定field/value对(键值对)。
hgetall key:获取key中的所有filed-vaule。
hget key field:返回指定的key中的field的值。
hmset key fields:设置key中的多个filed/value。
hmget key fileds:获取key中的多个filed的值。
hdel key filed:删除key中的指定filed
del key:删除key的hash值
hincrby key field increment:设置key中filed的值增加increment
hexists key field:判断指定的key中的filed是否存在
hlen key:获取key所包含的field的数量
hkeys key:获取key中所有fieldname值
hvals key:获取key中所有fieldvalue值
存储list
在Redis中,List类型是按照插入顺序排序的字符串链表。和数据结构中的普通链表一样,我们可以在其头部(left)和尾部(right)添加新的元素。在插入时,如果该键并不存在,Redis将为该键创建一个新的链表。与此相反,如果链表中所有的元素均被移除,那么该键也将会被从数据库中删除。List中可以包含的最大元素数量是 4294967295。从元素插入和删除的效率视角来看,如果我们是在链表的两头插入或删除元素,这将会是非常高效的操作,即使链表中已经存储了百万条记录,该操作也可以在常量时间内完成。然而需要说明的是,如果元素插入或删除操作是作用于链表中间,那将会是非常低效的。相信对于有良好数据结构基础的开发者而言,这一点并不难理解。
lpush key value1 value2…:在指定的key所关联的list的头部插入所有的values,如果该key不存在,该命令在插入的之前创建一个与该key关联的空链表,之后再向该链表的头部插入数据。插入成功,返回元素的个数。[注意:一个一个都从左边插入,如ipush key value1 value2,则插入结果是value2 value1]
rpush key value1 value2…:在该list的尾部添加元素
lrange key start end:获取链表中从start到end的元素的值(从0开始),start、end可为负数,若为-1则表示链表尾部的元素,-2则表示倒数第二个,依次类推…
lpushx key value:仅当参数中指定的key存在时(如果与key管理的list中没有值时,则该key不会创建)在指定的key所关联的list的头部插入value
rpushx key value:在该list的尾部添加元素
lpop key:返回并弹出指定的key关联的链表中的第一个元素,即头部元素
rpop key:从尾部弹出元素
rpoplpush resource destination:将resource集合中的尾部元素弹出并添加到destination集合头部
llen key:返回指定的key关联的链表中的元素的数量
lrem key count value:删除count个值为value的元素,如果count大于0,从头向尾遍历并删除count个值为value的元素;如果count小于0,则从尾向头遍历并删除。如果count等于0,则删除链表中所有等于value的元素。
lset key index value:设置链表中的index的脚标的元素值(相当于替换),0代表链表的头元素,-1代表链表的尾元素。
linsert key before|after pivot value:在pivot元素前或者后插入value这个元素。
存储set
在Redis中,我们可以将Set类型看作为没有排序的字符集合,和List类型一样,我们也可以在该类型的数据值上执行添加、删除或判断某一元素是否存在等操作。需要说明的是,这些操作的时间是常量时间。Set可包含的最大元素数是4294967295。和List类型不同的是,Set集合中不允许出现重复的元素。和List类型相比,Set类型在功能上还存在着一个非常重要的特性,即在服务器端完成多个Sets之间的聚合计算操作,如unions、intersections和differences。由于这些操作均在服务端完成,因此效率极高,而且也节省了大量的网络IO开销。
sadd key value1 value2…:向set中添加数据,如果该key的值已有则不会重复添加
smembers key:获取set中所有的成员
srem key member1 member2…:删除set中指定的成员
sismember key member:判断参数中指定的成员是否在该set中,1表示存在,0表示不存在或者该key本身就不存在
scard key:获取set中成员的数量
sdiff key1 key2:返回key1与key2中相差的成员(key1-key2),而且与key的顺序有关。即返回差集
sdiffstore destination key1 key2:将key1、key2相差的成员存储在 destination上
sinter key1 key2:返回key1和key2的交集
sinterstore destination key1 key2:将返回的交集存储在destination上
sunion key1 key2:返回并集
sunionstore destination key1 key2:将返回的并集存储在destination上
srandmember key:随机返回set中的一个成员
存储sortedset
Sorted-Sets和Sets类型极为相似,它们都是字符串的集合,都不允许重复的成员出现在一个Set中。它们之间的主要差别是Sorted-Sets中的每一个成员都会有一个分数(score)与之关联,Redis正是通过分数来为集合中的成员进行从小到大的排序。然而需要额外指出的是,尽管Sorted-Sets中的成员必须是唯一的,但是分数(score) 却是可以重复的。
在Sorted-Set中添加、删除或更新一个成员都是非常快速的操作,其时间复杂度为集合中成员数量的对数。由于Sorted-Sets中的成员在集合中的位置是有序的,因此,即便是访问位于集合中部的成员也仍然是非常高效的。事实上,Redis所具有的这一特征在很多其它类型的数据库中是很难实现的,换句话说,在该点上要想达到和Redis同样的高效,在其它数据库中进行建模是非常困难的。
zadd key score member score2 member2 …:将所有成员以及该成员的权重分数存放到sorted-set中
zrange key start end [withscores]:获取集合中脚标为start-end的成员,[withscores]参数表明返回的成员包含其分数。
zrevrange key start end [withscores]:倒序获取集合中脚标为start-end的成员,[withscores]参数表明返回的成员包含其分数。
zrangebyscore key min max [withscores][limit offset count]:返回分数在[min,max]的成员并按照分数从低到高排序。[withscores]:显示分数;[limit offset count]:offset,表明从脚标为offset的元素开始并返回count个成员
zcount key min max:获取分数在[min,max]之间的成员数量
zcard key:获取集合中的成员数量
zscore key member:返回指定成员的分数
zrem key member[member…]:移除集合中指定的成员,可以指定多个成员
zremrangebyrank key start end:按照排名范围(从小到大排名)删除元素(start和end为排名位置)
zremrangebyscore key min max:按照分数(从小到大排名)删除元素(min和max为分数)
zincrby key increment member:设置指定成员的增加的分数,返回值是更改后的分数
zrank key member:返回成员在集合中的位置
zrevrank key member:逆序返回成员在集合中的位置
keys的通用操作
keys pattern:获取所有与pattern匹配的key,但会所有与该key匹配的keys,*表示任意一个或多个字符,?表示任意一个字符
del key1 key2…:删除指定的key
exists key:判断key是否存在,1代表存在,0代表不存在
rename key newkey:为当前的key重命名
expire key time:为key设置过期时间,单位秒
ttl key:获取该key所剩的超时时间,如果没有设置超时返回-1,如果返回-2表示超时了(不存在),如果没超时还存在则返回剩余时间
type key:获取指定key的类型,该命令将以字符串的格式返回。返回的字符串为string、list、set、hash和zset。如果key不存在返回none
maven坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
配置文件参数
#8号库 一个16个【0 - 15】
spring.redis.database=8
spring.redis.host=localhost
spring.redis.port=6379
配置类的编写
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
// 设置key的序列化方式
template.setKeySerializer(RedisSerializer.string());
// 设置value的序列化方式
template.setValueSerializer(RedisSerializer.json());
// 设置hash的key的序列化方式
template.setHashKeySerializer(RedisSerializer.string());
// 设置hash的value的序列化方式
template.setHashValueSerializer(RedisSerializer.json());
template.afterPropertiesSet();
return template;
}
}
点赞功能示例
其中的RedisKeyUtil为生成redisKey的工具类
@Service
public class LikeService {
@Autowired
private RedisTemplate redisTemplate;
// 点赞
public void like(int userId, int entityType, int entityId, int entityUserId) {
redisTemplate.execute(new SessionCallback() {
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
String entityLikeKey = RedisKeyUtil.getEntityLikeKey(entityType, entityId);
String userLikeKey = RedisKeyUtil.getUserLikeKey(entityUserId);
boolean isMember = operations.opsForSet().isMember(entityLikeKey, userId);
operations.multi();
if (isMember) {
operations.opsForSet().remove(entityLikeKey, userId);
operations.opsForValue().decrement(userLikeKey);
} else {
operations.opsForSet().add(entityLikeKey, userId);
operations.opsForValue().increment(userLikeKey);
}
return operations.exec();
}
});
}
// 查询某实体点赞的数量
public long findEntityLikeCount(int entityType, int entityId) {
String entityLikeKey = RedisKeyUtil.getEntityLikeKey(entityType, entityId);
return redisTemplate.opsForSet().size(entityLikeKey);
}
// 查询某人对某实体的点赞状态
public int findEntityLikeStatus(int userId, int entityType, int entityId) {
String entityLikeKey = RedisKeyUtil.getEntityLikeKey(entityType, entityId);
return redisTemplate.opsForSet().isMember(entityLikeKey, userId) ? 1 : 0;
}
// 查询某个用户获得的赞
public int findUserLikeCount(int userId) {
String userLikeKey = RedisKeyUtil.getUserLikeKey(userId);
Integer count = (Integer) redisTemplate.opsForValue().get(userLikeKey);
return count == null ? 0 : count.intValue();
}
}
关注功能示例
其中的CommunityConstant接口定义了项目所需的常用常量。
@Service
public class FollowService implements CommunityConstant {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private UserService userService;
public void follow(int userId, int entityType, int entityId) {
redisTemplate.execute(new SessionCallback() {
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
String followeeKey = RedisKeyUtil.getFolloweeKey(userId, entityType);
String followerKey = RedisKeyUtil.getFollowerKey(entityType, entityId);
operations.multi();
operations.opsForZSet().add(followeeKey, entityId, System.currentTimeMillis());
operations.opsForZSet().add(followerKey, userId, System.currentTimeMillis());
return operations.exec();
}
});
}
public void unfollow(int userId, int entityType, int entityId) {
redisTemplate.execute(new SessionCallback() {
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
String followeeKey = RedisKeyUtil.getFolloweeKey(userId, entityType);
String followerKey = RedisKeyUtil.getFollowerKey(entityType, entityId);
operations.multi();
operations.opsForZSet().remove(followeeKey, entityId);
operations.opsForZSet().remove(followerKey, userId);
return operations.exec();
}
});
}
// 查询关注的实体的数量
public long findFolloweeCount(int userId, int entityType) {
String followeeKey = RedisKeyUtil.getFolloweeKey(userId, entityType);
return redisTemplate.opsForZSet().zCard(followeeKey);
}
// 查询实体的粉丝的数量
public long findFollowerCount(int entityType, int entityId) {
String followerKey = RedisKeyUtil.getFollowerKey(entityType, entityId);
return redisTemplate.opsForZSet().zCard(followerKey);
}
// 查询当前用户是否已关注该实体
public boolean hasFollowed(int userId, int entityType, int entityId) {
String followeeKey = RedisKeyUtil.getFolloweeKey(userId, entityType);
return redisTemplate.opsForZSet().score(followeeKey, entityId) != null;
}
// 查询某用户关注的人
public List<Map<String, Object>> findFollowees(int userId, int offset, int limit) {
String followeeKey = RedisKeyUtil.getFolloweeKey(userId, ENTITY_TYPE_USER);
Set<Integer> targetIds = redisTemplate.opsForZSet().reverseRange(followeeKey, offset, offset + limit - 1);
if (targetIds == null) {
return null;
}
List<Map<String, Object>> list = new ArrayList<>();
for (Integer targetId : targetIds) {
Map<String, Object> map = new HashMap<>();
User user = userService.findUserById(targetId);
map.put("user", user);
Double score = redisTemplate.opsForZSet().score(followeeKey, targetId);
map.put("followTime", new Date(score.longValue()));
list.add(map);
}
return list;
}
// 查询某用户的粉丝
public List<Map<String, Object>> findFollowers(int userId, int offset, int limit) {
String followerKey = RedisKeyUtil.getFollowerKey(ENTITY_TYPE_USER, userId);
Set<Integer> targetIds = redisTemplate.opsForZSet().reverseRange(followerKey, offset, offset + limit - 1);
if (targetIds == null) {
return null;
}
List<Map<String, Object>> list = new ArrayList<>();
for (Integer targetId : targetIds) {
Map<String, Object> map = new HashMap<>();
User user = userService.findUserById(targetId);
map.put("user", user);
Double score = redisTemplate.opsForZSet().score(followerKey, targetId);
map.put("followTime", new Date(score.longValue()));
list.add(map);
}
return list;
}
}
高级数据类型-HyperLogLog
// 统计20万个重复数据的独立总数.
@Test
public void testHyperLogLog() {
String redisKey = "test:hll:01";
for (int i = 1; i <= 100000; i++) {
redisTemplate.opsForHyperLogLog().add(redisKey, i);
}
for (int i = 1; i <= 100000; i++) {
int r = (int) (Math.random() * 100000 + 1);
redisTemplate.opsForHyperLogLog().add(redisKey, r);
}
long size = redisTemplate.opsForHyperLogLog().size(redisKey);
System.out.println(size);//结果: 99553
}
// 将3组数据合并, 再统计合并后的重复数据的独立总数.
@Test
public void testHyperLogLogUnion() {
String redisKey2 = "test:hll:02";
for (int i = 1; i <= 10000; i++) {
redisTemplate.opsForHyperLogLog().add(redisKey2, i);
}
String redisKey3 = "test:hll:03";
for (int i = 5001; i <= 15000; i++) {
redisTemplate.opsForHyperLogLog().add(redisKey3, i);
}
String redisKey4 = "test:hll:04";
for (int i = 10001; i <= 20000; i++) {
redisTemplate.opsForHyperLogLog().add(redisKey4, i);
}
String unionKey = "test:hll:union";
redisTemplate.opsForHyperLogLog().union(unionKey, redisKey2, redisKey3, redisKey4);
long size = redisTemplate.opsForHyperLogLog().size(unionKey);
System.out.println(size); // 结果:19833
}
高级数据类型-bitmap
// 统计一组数据的布尔值
@Test
public void testBitMap() {
String redisKey = "test:bm:01";
// 记录
redisTemplate.opsForValue().setBit(redisKey, 1, true);
redisTemplate.opsForValue().setBit(redisKey, 4, true);
redisTemplate.opsForValue().setBit(redisKey, 7, true);
// 查询
System.out.println(redisTemplate.opsForValue().getBit(redisKey, 0)); //false
System.out.println(redisTemplate.opsForValue().getBit(redisKey, 1)); //true
System.out.println(redisTemplate.opsForValue().getBit(redisKey, 2)); //false
// 统计
Object obj = redisTemplate.execute(new RedisCallback() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
return connection.bitCount(redisKey.getBytes());
}
});
System.out.println(obj); // 3
}
// 统计3组数据的布尔值, 并对这3组数据做OR运算.
@Test
public void testBitMapOperation() {
String redisKey2 = "test:bm:02";
redisTemplate.opsForValue().setBit(redisKey2, 0, true);
redisTemplate.opsForValue().setBit(redisKey2, 1, true);
redisTemplate.opsForValue().setBit(redisKey2, 2, true);
String redisKey3 = "test:bm:03";
redisTemplate.opsForValue().setBit(redisKey3, 2, true);
redisTemplate.opsForValue().setBit(redisKey3, 3, true);
redisTemplate.opsForValue().setBit(redisKey3, 4, true);
String redisKey4 = "test:bm:04";
redisTemplate.opsForValue().setBit(redisKey4, 4, true);
redisTemplate.opsForValue().setBit(redisKey4, 5, true);
redisTemplate.opsForValue().setBit(redisKey4, 6, true);
String redisKey = "test:bm:or";
Object obj = redisTemplate.execute(new RedisCallback() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
connection.bitOp(RedisStringCommands.BitOperation.OR,
redisKey.getBytes(), redisKey2.getBytes(), redisKey3.getBytes(), redisKey4.getBytes());
return connection.bitCount(redisKey.getBytes());
}
});
System.out.println(obj); // 7
System.out.println(redisTemplate.opsForValue().getBit(redisKey, 0)); // 全是 true
System.out.println(redisTemplate.opsForValue().getBit(redisKey, 1));
System.out.println(redisTemplate.opsForValue().getBit(redisKey, 2));
System.out.println(redisTemplate.opsForValue().getBit(redisKey, 3));
System.out.println(redisTemplate.opsForValue().getBit(redisKey, 4));
System.out.println(redisTemplate.opsForValue().getBit(redisKey, 5));
System.out.println(redisTemplate.opsForValue().getBit(redisKey, 6));
}
统计UV(独立访客) DAU(日活跃用户数量)
service
@Service
public class DataService {
@Autowired
private RedisTemplate redisTemplate;
private SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
// 将指定的IP计入UV
public void recordUV(String ip) {
String redisKey = RedisKeyUtil.getUVKey(df.format(new Date()));
redisTemplate.opsForHyperLogLog().add(redisKey, ip);
}
// 统计指定日期范围内的UV
public long calculateUV(Date start, Date end) {
if (start == null || end == null) {
throw new IllegalArgumentException("参数不能为空!");
}
// 整理该日期范围内的key
List<String> keyList = new ArrayList<>();
Calendar calendar = Calendar.getInstance();
calendar.setTime(start);
while (!calendar.getTime().after(end)) {
String key = RedisKeyUtil.getUVKey(df.format(calendar.getTime()));
keyList.add(key);
calendar.add(Calendar.DATE, 1);
}
// 合并这些数据
String redisKey = RedisKeyUtil.getUVKey(df.format(start), df.format(end));
redisTemplate.opsForHyperLogLog().union(redisKey, keyList.toArray());
// 返回统计的结果
return redisTemplate.opsForHyperLogLog().size(redisKey);
}
// 将指定用户计入DAU
public void recordDAU(int userId) {
String redisKey = RedisKeyUtil.getDAUKey(df.format(new Date()));
redisTemplate.opsForValue().setBit(redisKey, userId, true);
}
// 统计指定日期范围内的DAU
public long calculateDAU(Date start, Date end) {
if (start == null || end == null) {
throw new IllegalArgumentException("参数不能为空!");
}
// 整理该日期范围内的key
List<byte[]> keyList = new ArrayList<>();
Calendar calendar = Calendar.getInstance();
calendar.setTime(start);
while (!calendar.getTime().after(end)) {
String key = RedisKeyUtil.getDAUKey(df.format(calendar.getTime()));
keyList.add(key.getBytes());
calendar.add(Calendar.DATE, 1);
}
// 进行OR运算
return (long) redisTemplate.execute(new RedisCallback() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
String redisKey = RedisKeyUtil.getDAUKey(df.format(start), df.format(end));
connection.bitOp(RedisStringCommands.BitOperation.OR,
redisKey.getBytes(), keyList.toArray(new byte[0][0]));
return connection.bitCount(redisKey.getBytes());
}
});
}
}
拦截器
@Component
public class DataInterceptor implements HandlerInterceptor {
@Autowired
private DataService dataService;
@Autowired
private HostHolder hostHolder;
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 统计UV
String ip = request.getRemoteHost();
dataService.recordUV(ip);
// 统计DAU
User user = hostHolder.getUser();
if (user != null) {
dataService.recordDAU(user.getId());
}
return true;
}
}
controller
@Controller
public class DataController {
@Autowired
private DataService dataService;
// 统计页面
@RequestMapping(path = "/data", method = {RequestMethod.GET, RequestMethod.POST})
public String getDataPage() {
return "/site/admin/data";
}
// 统计网站UV
@RequestMapping(path = "/data/uv", method = RequestMethod.POST)
public String getUV(@DateTimeFormat(pattern = "yyyy-MM-dd") Date start,
@DateTimeFormat(pattern = "yyyy-MM-dd") Date end, Model model) {
long uv = dataService.calculateUV(start, end);
model.addAttribute("uvResult", uv);
model.addAttribute("uvStartDate", start);
model.addAttribute("uvEndDate", end);
return "forward:/data";
}
// 统计活跃用户
@RequestMapping(path = "/data/dau", method = RequestMethod.POST)
public String getDAU(@DateTimeFormat(pattern = "yyyy-MM-dd") Date start,
@DateTimeFormat(pattern = "yyyy-MM-dd") Date end, Model model) {
long dau = dataService.calculateDAU(start, end);
model.addAttribute("dauResult", dau);
model.addAttribute("dauStartDate", start);
model.addAttribute("dauEndDate", end);
return "forward:/data";
}
}
Redis-过期策略
Redis会把设置了过期时间的key放入一个独立的字典里,在key过期时并不会立刻删除它。Redis会通过如下两种策略,来删除过期的key:
-
惰性删除
客户端访问某个key时, Redis会检查该key是否过期,若过期则删除。
-
定期扫描 Redis默认每秒执行10次过期扫描(配置hz选项),扫描策略如下:
1.从过期字典中随机选择20个key;
2.删除这20个key中已过期的key;
3.如果过期的key的比例超过25%,则重复步骤1;
Redis-淘汰策略
当Red占用内存超出最大限制(maxmemory)时,可采用如下策略(maxmemory-policy),让Redis淘汰一些数据,以腾出空间继续提供读写服务:
noeviction: 对可能导致增大内存的命令返回错误(大多数写命令,DEL除外)
volatile-ttl: 在设置了过期时间的key中,选择剩余寿命(TTL)最短的key,将其淘汰
volatile-lru: 在设置了过期时间的key中,选择最少使用的key (LRU),将其淘汰
volatile-random: 在设置了过期时间的key中,随机选择一些key,将其淘汰
allkeys-lru: 在所有的key中,选择最少使用的key (LRU),将其淘汰
allkeys-random: 在所有的key中,随机选择一些key,将其淘汰
LRU算法
维护一个链表,用于顺序存储被访问过的key。在访问数据时,最新访问过的key将被移动到表头,即最近访问的key在表头,最少访问的key在表尾。
近似LRU算法(Redis)
给每个key维护一个时间戳,淘汰时随机采样5个key,从中淘汰掉最旧的key。如果还是超出内存限制,则继续随机采样淘汰。
优点: 比LRu算法节约内存,却可以取得非常近似的效果。
Redis-缓存穿透
场景
查询根本不存在的数据,使得请求直达存储层,导致其负载过大,甚至宕机。
解决方案
- 1、缓存空对象 存储层未命中后,仍然将空值存入缓存层。再次访问该数据时,缓存层会直接返回空值。
- 2、布隆过滤器 将所有存在的key提前存入布隆过滤器,在访问缓存层之前,先通过过滤器拦截,若请求的是不存在的key,则直接返回空值。
Redis-缓存击穿
场景
一份热点数据,它的访问量非常大。在其缓存失效瞬间,大量请求直达存储层,导致服务崩溃。
解决方案
- 1、加互斥锁 对数据的访问加互斥锁,当一个线程访问该数据时,其他线程只能等待。 这个线程访问过后,缓存中的数据将被重建,届时其他线程就可以直接从缓存取值。
- 2、永不过期 不设置过期时间,所以不会出现上述问题,这是“物理”上的不过期。 为每个value设置逻辑过期时间,当发现该值逻辑过期时,使用单独的线程重建缓存。
Redis-缓存雪崩
场景
由于某些原因,缓存层不能提供服务,导致所有的请求直达存储层,造成存储层宕机。
解决方案
- 1、避免同时过期 设置过期时间时,附加一个随机数,避免大量的key同时过期。
- 2、构建高可用的Redis缓存 部署多个Redis实例,个别节点宕机,依然可以保持服务的整体可用。
- 3、构建多级缓存 增加本地缓存,在存储层前面多加一级屏障,降低请求直达存储层的几率。
- 4、启用限流和降级措施 对存储层增加限流措施,当请求超出限制时,对其提供降级服务。
Redis-分布式锁
场景
修改时,经常需要先将数据读取到内存,在内存中修改后再存回去。在分布式应用中,可能多个进程同时执行上述操作,而读取和修改非原子操作,所以会产生冲突。增加分布式锁,可以解决此类问题。
基本原理
同步锁:在多个线程都能访问到的地方,做一个标记,标识该数据的访问权限。分布式锁:在多个进程都能访问到的地方,做一个标记,标识该数据的访问权限。
实现方式
1.基于数据库实现分布式锁;
2.基于Redis实现分布式锁;
3.基于zookeeper实现分布式锁;
Redis实现分布式锁的原则
1.安全属性:独享。在任一时刻,只有一个客户端持有锁。
2.活性A:无死锁。即便持有锁的客户端崩溃或者网络被分裂,锁仍然可以被获取。
3.活性B:容错。只要大部分Redis节点都活着,客户端就可以获取和释放锁。
单Redis实例实现分布式锁
1.获取锁使用命令:
SET resource_name my_random_value NX PX 3000oNX:仅在key不存在时才执行成功。Px:设置锁的自动过期时间。
2.通过Lua脚本释放锁:
if redis.call ( "get",KEYS[1])== ARGV [1] then
return redis.call ( "del",KEYS [1])
else return o end
可以避免删除别的客户端获取成功的锁: A加锁->A阻塞->因超时释放锁->B加锁->A恢复->释放锁
多Redis实例实现分布式锁
Redlock算法,该算法有现成的实现,其Java版本的库为Redisson。
1.获取当前unix时间,以毫秒为单位。
2.依次尝试从n个实例,使用相同的key和随机值获取锁,并设置响应超时时间。如果服务器没有在 规定时间内响应,客户端应该尽快尝试另外一个Redis实例。
3.客户端使用当前时间减去开始获取锁的时间,得到获取锁使用的时间。当且仅当大多数的Redis节 点都取到锁,并且使用的时间小于锁失效时间时,锁才算取得成功。
4.如果取到了锁,key的真正有效时间等于有效时间减去获取锁使用的时间。
5.如果获取锁失败,客户端应该在所有的Redis实例上进行解锁。
转载自:https://juejin.cn/post/7075328504870469669