目录
- 一、简介
- 二、maven依赖
- 三、编码实现
- 3.1、application.properties
- 3.2、Redis配置类
- 3.3、监听器
- 3.4、服务类
- 3.5、工具类
- 四、测试
- 4.1、测试类
- 4.2、单实例
- 4.3、多实例
- 结语
一、简介
本文今天主要是讲Redis中对过期key的监听,可能很多小伙伴不会,或者使用会出现一些不可思议的问题,比如在系统中设置了一个缓存,希望在缓存失效后去做什么操作,但是实际中可能又出现了操作重复的问题。所以今天来讨论下怎么正确使用。我们来个最简单的集群架构,如下图:
我们上面图中看到是服务A和服务B就是同一个服务的不同实例。
二、maven依赖
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> | |
<modelVersion>4.0.0</modelVersion> | |
<parent> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-parent</artifactId> | |
<version>2.6.0</version> | |
<relativePath/> <!-- lookup parent from repository --> | |
</parent> | |
<groupId>com.alian</groupId> | |
<artifactId>expiration</artifactId> | |
<version>0.0.1-SNAPSHOT</version> | |
<name>expiration</name> | |
<description>redis-key-expiration-listener</description> | |
<properties> | |
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> | |
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> | |
<project.package.directory>target</project.package.directory> | |
<java.version>1.8</java.version> | |
<!--com.fasterxml.jackson 版本--> | |
<jackson.version>2.9.10</jackson.version> | |
<!--阿里巴巴fastjson 版本--> | |
<fastjson.version>1.2.68</fastjson.version> | |
</properties> | |
<dependencies> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-web</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-test</artifactId> | |
</dependency> | |
<!--redis依赖--> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-data-redis</artifactId> | |
<version>${parent.version}</version> | |
</dependency> | |
<!--用于序列化--> | |
<dependency> | |
<groupId>com.fasterxml.jackson.core</groupId> | |
<artifactId>jackson-databind</artifactId> | |
<version>${jackson.version}</version> | |
</dependency> | |
<!--java 8时间序列化--> | |
<dependency> | |
<groupId>com.fasterxml.jackson.datatype</groupId> | |
<artifactId>jackson-datatype-jsr310</artifactId> | |
<version>${jackson.version}</version> | |
</dependency> | |
<dependency> | |
<groupId>com.alibaba</groupId> | |
<artifactId>fastjson</artifactId> | |
<version>1.2.68</version> | |
</dependency> | |
<dependency> | |
<groupId>org.projectlombok</groupId> | |
<artifactId>lombok</artifactId> | |
<version>1.16.14</version> | |
</dependency> | |
<dependency> | |
<groupId>junit</groupId> | |
<artifactId>junit</artifactId> | |
<version>4.13.2</version> | |
<scope>test</scope> | |
</dependency> | |
</dependencies> | |
<build> | |
<plugins> | |
<plugin> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-maven-plugin</artifactId> | |
</plugin> | |
</plugins> | |
</build> | |
</project> |
三、编码实现
3.1、application.properties
# 端口
server.port=8090
# 上下文路径
server.servlet.context-path=/expiration
# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis服务器地址
spring.redis.host=192.168.0.193
#spring.redis.host=127.0.0.1
# Redis服务器连接端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=20
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=10
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=10
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.jedis.pool.max-wait=20000
# 读时间(毫秒)
spring.redis.timeout=10000
# 连接超时时间(毫秒)
spring.redis.connect-timeout=10000
3.2、Redis配置类
RedisConfig
package com.alian.expiration.config; | |
import com.fasterxml.jackson.annotation.JsonAutoDetect; | |
import com.fasterxml.jackson.annotation.PropertyAccessor; | |
import com.fasterxml.jackson.databind.ObjectMapper; | |
import com.fasterxml.jackson.databind.SerializationFeature; | |
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; | |
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer; | |
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; | |
import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer; | |
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer; | |
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; | |
import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.data.redis.connection.RedisConnectionFactory; | |
import org.springframework.data.redis.core.RedisTemplate; | |
import org.springframework.data.redis.listener.RedisMessageListenerContainer; | |
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; | |
import org.springframework.data.redis.serializer.RedisSerializer; | |
import org.springframework.data.redis.serializer.StringRedisSerializer; | |
import java.time.LocalDate; | |
import java.time.LocalDateTime; | |
import java.time.LocalTime; | |
import java.time.format.DateTimeFormatter; | |
public class RedisConfig { | |
/** | |
* redis配置 | |
* | |
* @param redisConnectionFactory | |
* @return | |
*/ | |
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { | |
// 实例化redisTemplate | |
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); | |
//设置连接工厂 | |
redisTemplate.setConnectionFactory(redisConnectionFactory); | |
// key采用String的序列化 | |
redisTemplate.setKeySerializer(keySerializer()); | |
// value采用jackson序列化 | |
redisTemplate.setValueSerializer(valueSerializer()); | |
// Hash key采用String的序列化 | |
redisTemplate.setHashKeySerializer(keySerializer()); | |
// Hash value采用jackson序列化 | |
redisTemplate.setHashValueSerializer(valueSerializer()); | |
// 支持事务 | |
// redisTemplate.setEnableTransactionSupport(true); | |
//执行函数,初始化RedisTemplate | |
redisTemplate.afterPropertiesSet(); | |
return redisTemplate; | |
} | |
/** | |
* key类型采用String序列化 | |
* | |
* @return | |
*/ | |
private RedisSerializer<String> keySerializer() { | |
return new StringRedisSerializer(); | |
} | |
/** | |
* value采用JSON序列化 | |
* | |
* @return | |
*/ | |
private RedisSerializer<Object> valueSerializer() { | |
//设置jackson序列化 | |
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); | |
//设置序列化对象 | |
jackson2JsonRedisSerializer.setObjectMapper(getMapper()); | |
return jackson2JsonRedisSerializer; | |
} | |
/** | |
* 使用com.fasterxml.jackson.databind.ObjectMapper | |
* 对数据进行处理包括java8里的时间 | |
* | |
* @return | |
*/ | |
private ObjectMapper getMapper() { | |
ObjectMapper mapper = new ObjectMapper(); | |
//设置可见性 | |
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); | |
//默认键入对象 | |
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); | |
//设置Java 8 时间序列化 | |
JavaTimeModule timeModule = new JavaTimeModule(); | |
timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); | |
timeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); | |
timeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss"))); | |
timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); | |
timeModule.addDeserializer(LocalDate.class, new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); | |
timeModule.addDeserializer(LocalTime.class, new LocalTimeDeserializer(DateTimeFormatter.ofPattern("HH:mm:ss"))); | |
//禁用把时间转为时间戳 | |
mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); | |
mapper.registerModule(timeModule); | |
return mapper; | |
} | |
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { | |
RedisMessageListenerContainer container = new RedisMessageListenerContainer(); | |
container.setConnectionFactory(connectionFactory); | |
return container; | |
} | |
} |
和我们之前整合redis差不多,只不过在最后增加了一个redis消息监听监听容器RedisMessageListenerContainer
3.3、监听器
RedisKeyExpirationListener
package com.alian.expiration.listener; | |
import com.alian.expiration.service.RedisExpirationService; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.data.redis.connection.Message; | |
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; | |
import org.springframework.data.redis.listener.RedisMessageListenerContainer; | |
import org.springframework.stereotype.Component; | |
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { | |
private RedisExpirationService redisExpirationService; | |
// 把我们上面一步配置的bean注入进去 | |
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { | |
super(listenerContainer); | |
} | |
/** | |
* 针对redis数据失效事件,进行数据处理 | |
* | |
* @param message | |
* @param pattern | |
*/ | |
public void onMessage(Message message, byte[] pattern) { | |
// 用户做自己的业务处理即可,注意message.toString()可以获取失效的key | |
String expiredKey = message.toString(); | |
log.info("onMessage --> redis 过期的key是:{}", expiredKey); | |
try { | |
// 对过期key进行处理 | |
redisExpirationService.processingExpiredKey(expiredKey); | |
log.info("过期key处理完成:{}", expiredKey); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
log.error("处理redis 过期的key异常:{}", expiredKey, e); | |
} | |
} | |
} |
实现的步骤如下:
- 继承KeyExpirationEventMessageListener
- 把redis消息监听监听容器RedisMessageListenerContainer 注入到密钥空间事件消息侦 听器中
- 重写onMessage方法
- 通过Message 的 toString() 方法就可以获取到过期的key
- 对key中关键信息进行业务处理,比如 id
3.4、服务类
RedisExpirationService
package com.alian.expiration.service; | |
import com.alian.expiration.util.SignUtils; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.data.redis.core.RedisTemplate; | |
import org.springframework.stereotype.Service; | |
import java.util.concurrent.TimeUnit; | |
public class RedisExpirationService { | |
private RedisTemplate<String, Object> redisTemplate; | |
public void processingExpiredKey(String expiredKey) { | |
// 如果是优惠券的key(一定要规范命名) | |
if (expiredKey.startsWith("com.mall.coupon.id")) { | |
// 临时key,此key可以在业务处理完,然后延迟一定时间删除,或者不处理 | |
String tempKey = SignUtils.md5(expiredKey, "UTF-8"); | |
// 临时key不存在才设置值,key超时时间为10秒(此处相当于分布式锁的应用) | |
Boolean exist = redisTemplate.opsForValue().setIfAbsent(tempKey, "1", 10, TimeUnit.SECONDS); | |
if (Boolean.TRUE.equals(exist)) { | |
log.info("Business Handing..."); | |
// 比如截取里面的id,然后关联数据库进行处理 | |
} else { | |
log.info("Other service is handing..."); | |
} | |
} else { | |
log.info("Expired keys without processing"); | |
} | |
} | |
} |
基本流程如下:
- 判断是否是需要处理的key,一般这种key通过命名规范加以处理
- 以当前key生成一个新的key作为分布式key
- 如果redis中不存在这个新的key,则为新的key设置一个值,达到分布式服务处理(核心)
- 设置成功的,进行业务处理;设置失败了,说明其他服务正在处理这个key
- 根据 key 的关键信息(比如截取id),进行业务处理
3.5、工具类
SignUtils
package com.alian.expiration.util; | |
import java.security.MessageDigest; | |
public class SignUtils { | |
public static final String md5(String s, String charset) { | |
char[] hexDigits = new char[]{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'}; | |
try { | |
byte[] btInput = s.getBytes(charset); | |
MessageDigest mdInst = MessageDigest.getInstance("MD5"); | |
mdInst.update(btInput); | |
byte[] md = mdInst.digest(); | |
int j = md.length; | |
char[] str = new char[j * 2]; | |
int k = 0; | |
for (byte byte0 : md) { | |
str[k++] = hexDigits[byte0 >>> 4 & 15]; | |
str[k++] = hexDigits[byte0 & 15]; | |
} | |
return new String(str); | |
} catch (Exception var11) { | |
return ""; | |
} | |
} | |
} |
四、测试
4.1、测试类
简单模拟下发送一个优惠券数据到redis,然后设置超时时间
package com.alian.expiration; | |
import lombok.extern.slf4j.Slf4j; | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.boot.test.context.SpringBootTest; | |
import org.springframework.data.redis.core.RedisTemplate; | |
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.concurrent.TimeUnit; | |
public class RedisKeyExpirationTest { | |
private RedisTemplate<String, Object> redisTemplate; | |
public void keyExpiration() { | |
// 优惠券信息 | |
String id = "2023021685264735"; | |
Map<String, String> map = new HashMap<>(); | |
map.put("id", id); | |
map.put("amount", "1000"); | |
map.put("type", "1001"); | |
map.put("describe", "满减红包"); | |
// 缓存到redis | |
redisTemplate.opsForHash().putAll("com.mall.coupon.id." + id, map); | |
// 设置过期时间 | |
redisTemplate.expire("com.mall.coupon.id." + id, 10, TimeUnit.SECONDS); | |
} | |
} |
4.2、单实例
单实例就是服务只部署了一份,我们启动一份,端口是8090,然后通过上面的测试类,发送一个消息,结果如下:
10:23:39 701 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
10:23:39 988 INFO [container-2]:Business Handing...
10:23:39 989 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
10:23:50 005 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
10:23:50 005 INFO [container-3]:Expired keys without processing
10:23:50 005 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12
4.3、多实例
多实例就是服务部署了多份,比如我们启动两份,端口分别为8090和8091,然后通过上面的测试类,发送一个消息,8090端口的服务结果如下(Business Handing…):
11:39:06 691 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
11:39:06 707 INFO [container-2]:Business Handing...
11:39:06 707 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
11:39:16 796 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
11:39:16 796 INFO [container-3]:Expired keys without processing
11:39:16 796 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12
8091端口的服务结果如下(Other service is handing…):
11:39:06 691 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
11:39:06 707 INFO [container-2]:Other service is handing...
11:39:06 707 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
11:39:16 796 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
11:39:16 796 INFO [container-3]:Expired keys without processing
11:39:16 796 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12
结果分析:
- 多实例的情况下,每个实例都会收到过期key通知
- 通过redis分布式锁,实现只有一个实例会进行业务处理,防止重复
- 使用分布式锁会有一个新的key过期,并且收到该key的通知,你可以业务执行完延迟一定时间(避免重复执行),再删除,也可以不处理(因为本就不是要处理业务的key)
结语
多实例的情况下,每个实例都会收到过期key通知,可以通过分布式锁的方式去处理业务,避免业务重复执行