Redis 缓存
缓存基础和主动更新
基础模板
一般基础使用都是这个模板
@Autowiredprivate RedisTemplate<String, Object> redisTemplate;
public Object queryWithRedisCache(String key) { // 获取key,一般都会加个前缀来注明作缓存用 String key = "cache:" + key; // 查询Redis中是否存在该key对应的缓存 Object cache = redisTemplate.opsForValue().get(key); if (cache != null) { // 如果存在缓存,直接返回缓存 return cache; } // 如果不存在缓存,则查询数据库,并将结果缓存到Redis中 Object result = queryFromDB(); if (result == null) { // 如果查询结果为空,则直接返回null return null; } redisTemplate.opsForValue().set(key, result); // 设置缓存过期时间为60秒,毕竟Redis空间有限很宝贵 redisTemplate.expire(key, 60, TimeUnit.MINUTES); return result;}
主动更新策略
然而基础模板效率来说其实是不高的,因为在于它的逐出策略,是到了时间才将其删除,那假设一种情况,每一个缓存进来之后,都没再被用过了,都是等到expire时间到了才消失,这样的话这个缓存设置就没意义了。
解决方案就是用主动更新策略,主动地去删除无用缓存,同时为了保证缓存一致性和缓存与数据库的资源一致,我们先操作数据库,然后再删除缓存。
-
对于低一致性要求确实可以用超时自动删除,但是对于高一致性要求,我们还是需要主动更新。
-
对于读操作还是不变的,先读缓存,读到就返回,没有的话再读数据库,然后更新缓存。
-
对于写操作,先更新数据库,然后删除缓存。
@Autowiredprivate RedisTemplate<String, Object> redisTemplate;
public Object updateWithRedisCache(String key, Object value) { updateToDB(key, value); // 删除缓存 String key = "cache:" + key; redisTemplate.delete(key); return value;}
缓存进阶和实际问题
常见的几个问题
-
缓存穿透问题:缓存和数据库都没有,每次查询都要去数据库,这样会导致数据库压力过大,造成系统崩溃。
- 在第一次到缓存未命中到达数据库后,发现符合缓存穿透条件,设置一个空值缓存(有过期时间),当查询不到数据时,直接返回空值。
- 布隆过滤器: 这样的话结构本来是发出查询->redis缓存->数据库,要在查询和缓存间加一层BloomFilter。如果检测到查询的key不存在于数据库中,则直接报错;如果在数据库中,则更布隆过滤器。
-
缓存雪崩问题:缓存服务器宕机或同一时段大量缓存失效,导致大量请求直接落到数据库,数据库压力过大,造成系统崩溃。
- 随机设置缓存过期时间,避免缓存雪崩。
- Redis集群: 避免缓存雪崩,可以将缓存分布到多个Redis节点上,避免单点故障。
- 降级限流: 直接限制对服务器的查询请求,返回错误,不对数据库产生进一步的压力。
- 添加多级缓存
-
缓存击穿问题:缓存击穿是指对于某个被高并发且缓存构建业务比较复杂的key,缓存中没有,但是数据库中有,每次查询都要去数据库,造成数据库压力过大,造成系统崩溃。
- 互斥锁: 对于查询某个key的请求,当缓存没有命中时,加互斥锁,查询数据库返回请求,并在缓存中设置这个key,再释放锁,避免其他线程在查询重建时期的多次访问。
- 逻辑过期: 对于那些已经判断为热点高并发的资源,直接把它定死在redis当中,保证热点资源每时每刻都在缓存中,虽然可能会有旧的没更新的。对于过期时间,直接以值的形式存到redis的value一栏当中,这样即使到了过期时间它也不会被redis删除。如果过期了,则开另外一个线程去查询数据库,更新缓存。
Redis如何上锁
我们可以使用Redis的setnx命令来实现分布式锁,setnx命令的作用是设置一个key,当key不存在时,才会设置成功,如果key已经存在,则不设置成功。
(为什么我们需要用Redis实现分布式锁?因为Java当中的比如synchronized
的锁,在分布式环境下是不安全的,因为它只能锁住一个线程,而分布式环境下,线程是分布在不同的机器上的,只有使用Redis的锁来统一进行控制,才能保证线程间的互斥。)
这样可以发现,setnx
的性质很好刚好就契合了锁的功能。因为如果key不存在,则说明没有人持有锁,可以加锁,如果key已经存在,说明有人持有锁,再怎么申请都不能加锁。
但是还是要注意,如果一个持有锁的程序崩溃了,锁就会一直存在,造成死锁。所以我们还需要类似RAII思想,设置一个过期时间,避免锁一直存在。
public boolean lock(String key) { String lockKey = "lock:" + key; String requestId = UUID.randomUUID().toString(); // 尝试加锁,成功返回true,失败返回false boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, 30, TimeUnit.SECONDS); if (result) { // 加锁成功,设置过期时间 redisTemplate.expire(lockKey, 30, TimeUnit.SECONDS); return true; } else { // 加锁失败,说明有人持有锁 return false; }}
public void unlock(String key) { String lockKey = "lock:" + key; String requestId = redisTemplate.opsForValue().get(lockKey); if (requestId!= null && requestId.equals(UUID.randomUUID().toString())) { // 只有持有锁的线程才可以释放锁 redisTemplate.delete(lockKey); }}
逻辑过期
// 先封装data对象,将原数据和逻辑过期时间合并成一个对象@Data@AllArgsConstructorpublic class CacheData(Object data, LocalDateTime expireTime) { Object rawData; LocalDateTime expireTime;}
private static final ExecutorService executor = Executors.newFixedThreadPool(10);
public Object queryWithRedisCache(String key) { // 获取key,一般都会加个前缀来注明作缓存用 String key = "cache:" + key; // 查询Redis中是否存在该key对应的缓存 CacheData cacheData = redisTemplate.opsForValue().get(key); if (cacheData == null) { // 如果不存在缓存,直接返回空 return null; } // 如果缓存中的逻辑时间过期,直接返回旧的这个过期的缓存 Object data = cacheData.getData(); LocalDateTime expireTime = cacheData.getExpireTime(); if (expireTime.isAfter(LocalDateTime.now())) { // 缓存没有过期,直接返回这个可用的缓存 return data; } // 如果缓存过期,则查询数据库,并将结果缓存到Redis中 // 涉及到对数据库的操作,需要加锁 Boolean lock = requireLock(key); if (!lock) { // 申请失败,说明已经有线程在修改缓存,我们直接返回旧的缓存 return data; } // 申请锁成功,我们另开一个新线程去查询数据库,并更新缓存,本线程直接返回旧的缓存 executor.execute(() -> { try { Object result = queryFromDB(); if (result == null) { // 如果查询结果为空,则直接返回null return null; } // 缓存新数据 CacheData newCacheData = new CacheData(result, LocalDateTime.now().plusSeconds(60)); redisTemplate.opsForValue().set(key, newCacheData); } catch (Exception e) { // 出现异常,说明查询数据库失败,我们直接返回旧的缓存 return data; } finally { // 不管有没有遇到问题,都要释放锁 releaseLock(key); } }); // 因为我们在新线程进行了更新,所以本线程仍旧返回旧的缓存 return data;}
高阶改进
上面的实现看起来还是非常朴素的,还有非常多的优化空间
封装整个Redis的工具类
这里就是把上面的代码都复制粘贴到一个独立的类当中,但是注意泛型的使用,要保证泛用性。
使用官方锁
我们自己写的锁,肯定不如它官方锁来的安全,并发度高,支持并发。
比如说,可以使用Redisson来实现分布式锁,它可以自动续期,避免死锁,还可以实现公平锁,避免锁的饥饿。
// Redisson 客户端的配置 Config config = new Config(); config.useSingleServer().setAddress("redis://your-redis-host:6379"); RedissonClient redisson = Redisson.create(config);
// requireLock 方法 public boolean requireLock(String key) { RLock lock = redisson.getLock(key); // 超时时间设置为 50 毫秒 return lock.tryLock(50, TimeUnit.MILLISECONDS); }
// releaseLock 方法 public void releaseLock(String key) { RLock lock = redisson.getLock(key); lock.unlock(); }
使用Redis集群
Redis集群可以提高Redis的读写性能,避免单点故障。
RedisClusterClient redisClusterClient = RedisClusterClient.create("redis://localhost:7000,redis://localhost:7001,redis://localhost:7002");// 连接集群,获取RedisAdvancedClusterCommands对象,用来操作集群RedisAdvancedClusterCommands<String, String> commands = redisClusterClient.getAdvancedClusterCommands();
使用Lua脚本
为什么要使用 Lua 脚本,而不是直接封装 Java 方法,这里我详细为你解释:
- Lua 脚本在 Redis 中的优势:
- 原子性: Lua 脚本在 Redis 中执行是原子性的,保证脚本中的所有操作要么全部执行成功,要么全部执行失败,避免了因并发问题导致数据不一致的情况。在秒杀场景中,原子性保证了库存更新和订单创建的同步,防止超卖问题。
- 效率: Lua 脚本在 Redis 内执行,直接利用 Redis 的内存数据结构进行操作,效率非常高,而 Java 方法则需要进行网络通信,效率相对较低。对于高并发秒杀场景,效率的提升非常关键。
- 安全性: Lua 脚本只能在 Redis 中执行,无法直接访问系统资源,安全性更高,避免了恶意代码执行的风险。
- 为什么不用直接封装 Java 方法?
- 线程安全性: Java 方法在多线程环境中执行需要考虑线程安全问题,需要额外编写代码来保证数据一致性,相对复杂。
- 网络通信: Java 方法需要通过网络通信与 Redis 交互,增加网络延迟,效率降低。
- 数据一致性: 如果 Java 方法执行期间出现异常,可能会导致数据不一致,例如库存更新成功,但订单创建失败。
所以综上所述,lua脚本其实是不能不学的,就算是实际上机开发也是会用到的。