初探Spring Retry
Running with Spring Boot v2.4.5, Spring Retry v1.3.1
在与外部系统交互时,由网络抖动亦或是外部系统自身的短暂性问题触发的瞬时性故障是一个绕不过的坑,而重试可能是一个比较有效的避坑方案;但有一点需要特别注意:外部系统的接口是否满足幂等性,比如:尽管调用外部系统的下单接口超时了,但外部系统订单数据可能已经落库了,这个时候再重试一次,外部系统内的订单数据可能就重复了!
Spring Retry
为Spring应用提供了重试功能,同时支持声明式重试
(Declarative Retry)和编程式重试
(Programmatic Retry)两种风格;此外,其不仅对业务代码无侵入性,而且还支持重试配置;但Spring Retry的重试决策机制大多是基于Throwable
的,尚不支持基于返回结果来进行重试决策。
1 核心API解读
1.1 RetryContext
RetryContext
记录了已重试次数、上一次重试所触发的异常以及context.state
、context.name
、context.exhausted
、context.closed
、context.recovered
等属性。
public interface RetryContext extends AttributeAccessor {
String NAME = "context.name";
String STATE_KEY = "context.state";
String CLOSED = "context.closed";
String RECOVERED = "context.recovered";
String EXHAUSTED = "context.exhausted";
boolean isExhaustedOnly();
RetryContext getParent();
int getRetryCount();
Throwable getLastThrowable();
}
1.2 RetryOperations
RetryOperations
接口定义了四个execute()
方法,无论是声明式重试还是编程式重试,都是通过execute()来进行重试操作的!
public interface RetryOperations {
<T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback) throws E;
<T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback) throws E;
<T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RetryState retryState) throws E, ExhaustedRetryException;
<T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback, RetryState retryState) throws E;
}
1.3 Classifier
Classifier
接口只有一个classify()
方法,主要用于将给定类型对象C
分类为另一类型的对象T
。
public interface Classifier<C, T> extends Serializable {
T classify(C classifiable);
}
SubclassClassifier<C, T>
内部维护了一个ConcurrentHashMap<Class<? extends C>, T>
,用于存储待分类对象C
与分类后对象T
的映射关系。其分类逻辑如下:
- 若待分类对象C为null,则直接返回默认值;
- 从HashMap中查找映射关系,若存在映射关系,则直接将HashMap中的value值返回;
- 若不存在映射关系,则解析待分类对象C的上级继承链,然后将上级继承链中每一级父类作为key,从HashMap中查找映射关系;
- 若不存在映射关系,则将上级继承链中每一级父类所实现的接口作为key,再次从HashMap中查找映射关系;
- 最后,若存在映射关系,则将映射关系添加到HashMap中(以待分类对象C为key),然后将HashMap中的value值返回;若不存在映射关系,则将默认值作为分类结果返回。
BinaryExceptionClassifier<Throwable, Boolean>
继承自SubclassClassifier,主要用于将异常分类为两种类型:true
或false
,这也正符合其二元异常分类器的定位。BinaryExceptionClassifier不仅沿用了SubclassClassifier的分类逻辑,同时还进一步拓展:首先通过getCause()
方法一层一层地获取待分类异常的原因,然后以这些Throwable类型的原因作为key,来寻找映射关系。
1.4 RetryPolicy
RetryPolicy
接口定义了四个方法,canRetry()
方法用来判决是否可以重试;open()
方法用来分配RetryContext;registerThrowable()
方法用于在每次重试失败后注册异常。
public interface RetryPolicy extends Serializable {
boolean canRetry(RetryContext context);
RetryContext open(RetryContext parent);
void close(RetryContext context);
void registerThrowable(RetryContext context, Throwable throwable);
}
1.4.1 SimpleRetryPolicy
- 重试次数为固定值,默认最大重试次数为3;
- 持有BinaryExceptionClassifier二元异常分类器,用于异常分类,只有被分类为true的异常才有可能进行重试操作。
1.4.2 TimeoutRetryPolicy
- 重试次数为动态值;
- 如果目标方法执行时间超过超时阈值,则进行重试操作,默认超时时间1000ms。
1.4.3 ExceptionClassifierRetryPolicy
- 重试次数取决于委托的重试策略;
- 持有SubclassClassifier<Throwable, RetryPolicy>异常分类器,用于将异常分类为不同的重试策略,然后根据特定重试策略来判决是否可以重试。
1.4.4 BinaryExceptionClassifierRetryPolicy
- 重试次数为动态值;
- 持有BinaryExceptionClassifier<Throwable, Boolean>二元异常分类器,用于将异常分类为两种类型:true或false,true意味着可以进行重试,false意味着无法进行重试。
1.4.5 CompositeRetryPolicy
- 重试次数为动态值;
- 组合重试策略持有RetryPolicy[],其有两种模式:乐观模式和悲观模式;若是乐观模式,则只要一个重试策略可以重试,即认为可以重试;若是悲观模式,则只要有一个重试粗略不可以重试,即认为不可以重试。
1.4.6 CircuitBreakerRetryPolicy
- 一种有熔断器功能的重试策略,该重试策略只能应用于有状态重试场景。
1.5 BackOffPolicy
为了应对瞬时性故障,重试操作往往应该间隔一段时间后执行,BackOffPolicy
接口即是对重试间隔的抽象;backOff()
方法是BackOffPolicy接口的核心,在Spring Retry 1.1 前其底层依托于Object#wait()
方法来模拟重试间隔,在在Spring Retry 1.1 后依托于Thread#sleep()
方法来模拟重试间隔。
public interface BackOffPolicy {
BackOffContext start(RetryContext context);
void backOff(BackOffContext backOffContext) throws BackOffInterruptedException;
}
1.5.1 NoBackOffPolicy
- 无间隔重试,即立即执行下次重试。
1.5.2 FixedBackOffPolicy
- 重试间隔时间为固定值,默认值为1000ms。
1.5.3 UniformRandomBackOffPolicy
- 重试间隔时间为动态值;
- minBackOffPeriod默认值为500ms,maxBackOffPeriod默认值为1500ms;下一次重试间隔时间计算公式:
minBackOffPeriod + random.nextInt(maxBackOffPeriod - minBackOffPeriod)
1.5.4 ExponentialBackOffPolicy
- 重试间隔时间为动态指数值;
- initialInterval默认值为100ms,maxInterval默认值为30000ms,multiplier默认值为2;下一次重试间隔时间计算公式:
interval * multiplier
1.5.5 ExponentialRandomBackOffPolicy
- 重试间隔时间为动态随机指数值;
- initialInterval默认值为100ms,maxInterval默认值为30000ms,multiplier默认值为2;下一次重试间隔时间计算公式:
(interval * multiplier) * (1 + r.nextFloat() * (multiplier - 1))
1.6 RecoveryCallback
为了进一步增强业务方法的健壮性,我们可以通过实现RecoveryCallback
回调接口来封装一个兜底逻辑;这样在重试已耗尽且业务方法依然执行失败的时候,就会执行该兜底逻辑。
public interface RecoveryCallback<T> {
T recover(RetryContext context) throws Exception;
}
1.7 RetryListener
个人感觉重试监听器是一个鸡肋的功能,在实际工作中,基本用不到重试监听器,我们重点关注RetryListener
接口中open()
、close()
、onError()
这三个方法的调用时机即可。
public interface RetryListener {
// 在整个重试之前执行
<T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback);
// 在整个重试之后执行
<T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable);
// 在每次重试失败后执行
<T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable);
}
1.8 RetryState
在Spring Retry中,重试可以分为:无状态重试
(Stateless Retry)与有状态重试
(Stateful Retry)。在无状态重试中,每次进入RetryTemplate#execute()
方法内的while循环
前,一定会生成一个全新的RetryContext
实例。但在有状态重试中,第一次进入RetryTemplate#execute()
方法内的while循环
前,首先会生成一个全新的RetryContext
实例,然后根据目标方法生成一个key,最后将key与RetryContext实例组成键值对保存在RetryContextCache
中;那么在下次进入RetryTemplate#execute()
方法内的while循环
前,就不再生成RetryContext实例了,而是使用RetryContextCache中的RetryContext实例。换句话说,无状态重试中的RetryContext实例是保存在栈内,而有状态重试中的RetryContext实例是保存在堆内。
相信大家和笔者一样,在工作中多使用无状态重试,但其实有状态重试也是有用武之地的,比如:事务回滚和熔断器。
在事务回滚场景中,当目标方法(业务方法)抛出特定异常时,重试变得没有意义了,需要立即从execute()方法内的while循环语句内重新抛出该异常,从而进行事务回滚操作,重新抛出异常代码如下所示:
if (shouldRethrow(retryPolicy, context, state)) {
throw RetryTemplate.<E>wrapIfNecessary(e);
}
在熔断器场景中,CircuitBreakerRetryPolicy一般与SimpleRetryPolicy或MaxAttemptsRetryPolicy组合使用,前者委派后者来进行重试决策,但需要明确一点:目标方法每次只会在execute()方法内的while循环中执行一次(目标方法的每一次执行都是在不同的线程中完成的),之后借助于如下代码立即退出while循环:
if (state != null && context.hasAttribute(GLOBAL_STATE)) {
break;
}
RetryState
接口用于标识无状态重试与有状态重试,具体地,在调用execute()方法时,若RetryState参数为null,则意味着无状态重试,否则为有状态重试。RetryState接口定义了三个方法:
getKey()
,生成RetryContext实例的全局唯一标识;isForceRefresh()
,若值为true
,则每次都重新生成RetryContext实例,若值为false
,则从RetryContextCache中查询RetryContext实例;rollbackFor()
,标识该异常是否需要回滚。
public interface RetryState {
Object getKey();
boolean isForceRefresh();
boolean rollbackFor(Throwable exception);
}
2 声明式重试中若干注解
2.1 @Backoff
2.2 @Retryable
2.3 @Recover
@Recover
是一个标记接口,并没有定义相关属性。但需要注意:兜底方法的第一个参数是可选的,若存在,则只能是Throwable
及其子类,其余参数必须与目标方法中的参数一致,包括:类型和顺序。
2.4 @CircuitBreaker
3 快速入门
GoogleSearchService
通过委派谷歌搜索接口来搜索给定内容。GoogleSearchService
public interface GoogleSearchService {
public void search(String content) throws IOException, InterruptedException, URISyntaxException;
}
GoogleSearchServiceImpl
public class GoogleSearchServiceImpl implements GoogleSearchService {
private static final Logger LOGGER = LoggerFactory.getLogger(GoogleSearchServiceImpl.class);
@Override
public void search(String content) throws IOException, InterruptedException, URISyntaxException {
LOGGER.info(">>>>>> searching··· ");
URL url = new URL("https://www.google.com/complete/search?q=" + content);
URI uri = new URI(url.getProtocol(), url.getHost(), url.getPath(), url.getQuery(), null);
HttpRequest request = HttpRequest
.newBuilder()
.uri(uri)
.build();
HttpClient client = HttpClient
.newBuilder()
.version(HttpClient.Version.HTTP_1_1)
.connectTimeout(Duration.ofSeconds(3))
.build();
HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
}
}
需求
若search()
方法抛出HttpConnectTimeoutException异常,则重复执行search()
方法,最大重试次数为三次(包括初始执行),重试间隔为500毫秒;如果重试耗尽后,search()
方法依然抛出该异常,则打印错误日志信息。
下面分别从两个小节来展示编程式重试和声明式重试是如何实现上述需求的。
3.1 编程式重试
public class ProgrammaticSpringRetryApp {
private static final Logger LOGGER = LoggerFactory.getLogger(ProgrammaticSpringRetryApp.class);
public static void main(String[] args) throws Throwable {
GoogleSearchService googleSearchService = new GoogleSearchServiceImpl();
// 重试策略:CompositeRetryPolict,由MaxAttemptRetryPolicy和BinaryExceptionClassifierRetryPolicy组成
// 退避策略:FixedBackOffPolicy,重试间隔500毫秒
RetryTemplate retryTemplate = RetryTemplate
.builder()
.retryOn(HttpConnectTimeoutException.class)
.maxAttempts(3)
.fixedBackoff(500)
.build();
retryTemplate.execute(
new RetryCallback<Object, Throwable>() {
@Override
public Object doWithRetry(RetryContext context) throws Throwable {
googleSearchService.search("spring retry");
return null;
}
},
new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
LOGGER.error("调用谷歌搜索接口失败,执行兜底逻辑");
return null;
}
}
);
}
}
执行结果
2021:20:25.005 [main] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=0
2021:20:25.015 [main] INFO pers.duxiaotou.service.impl.GoogleSearchServiceImpl - >>>>>> searching···
2021:20:30.117 [main] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=1
2021:20:30.117 [main] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=1
2021:20:30.118 [main] INFO pers.duxiaotou.service.impl.GoogleSearchServiceImpl - >>>>>> searching···
2021:20:33.641 [main] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=2
2021:20:33.641 [main] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=2
2021:20:33.641 [main] INFO pers.duxiaotou.service.impl.GoogleSearchServiceImpl - >>>>>> searching···
2021:20:36.655 [main] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=3
2021:20:36.655 [main] DEBUG org.springframework.retry.support.RetryTemplate - Retry failed last attempt: count=3
2021:20:36.656 [main] ERROR pers.duxiaotou.ProgrammaticSpringRetryApp - 调用谷歌搜索接口失败,执行兜底逻辑
3.2 声明式重试
@Service
@Slf4j
public class GoogleSearchServiceImpl implements GoogleSearchService {
private static final Logger LOGGER = LoggerFactory.getLogger(GoogleSearchServiceImpl.class);
@Retryable(
maxAttempts = 3,
backoff = @Backoff(value = 500),
include = {HttpConnectTimeoutException.class},
stateful = false)
@Override
public void search(String content) throws IOException, InterruptedException, URISyntaxException {
LOGGER.info(">>>>>> searching··· ");
URL url = new URL("https://www.google.com/complete/search?q=" + content);
URI uri = new URI(url.getProtocol(), url.getHost(), url.getPath(), url.getQuery(), null);
HttpRequest request = HttpRequest
.newBuilder()
.uri(uri)
.build();
HttpClient client = HttpClient
.newBuilder()
.version(HttpClient.Version.HTTP_1_1)
.connectTimeout(Duration.ofSeconds(3))
.build();
HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
}
@Recover
public void recover(Throwable throwable, String content) {
LOGGER.error("调用谷歌搜索接口失败,执行兜底逻辑");
}
}
@SpringBootApplication
@EnableRetry
public class DuXiaoTouSpringAopApp {
public static void main(String[] args) throws Throwable {
ConfigurableApplicationContext applicationContext = SpringApplication.run(DuXiaoTouSpringAopApp.class, args);
GoogleSearchService googleSearchService = applicationContext.getBean(GoogleSearchService.class);
googleSearchService.search("spring retry");
}
}
执行结果
2021-06-14 21:39:01.321 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate : Retry: count=0
2021-06-14 21:39:01.321 INFO 21696 --- [restartedMain] p.d.s.impl.GoogleSearchServiceImpl : >>>>>> searching···
2021-06-14 21:39:04.907 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate : Checking for rethrow: count=1
2021-06-14 21:39:04.907 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate : Retry: count=1
2021-06-14 21:39:04.908 INFO 21696 --- [restartedMain] p.d.s.impl.GoogleSearchServiceImpl : >>>>>> searching···
2021-06-14 21:39:08.436 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate : Checking for rethrow: count=2
2021-06-14 21:39:08.436 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate : Retry: count=2
2021-06-14 21:39:08.437 INFO 21696 --- [restartedMain] p.d.s.impl.GoogleSearchServiceImpl : >>>>>> searching···
2021-06-14 21:39:11.449 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate : Checking for rethrow: count=3
2021-06-14 21:39:11.449 DEBUG 21696 --- [restartedMain] o.s.retry.support.RetryTemplate : Retry failed last attempt: count=3
2021-06-14 21:39:11.450 ERROR 21696 --- [restartedMain] p.d.s.impl.GoogleSearchServiceImpl : 调用谷歌搜索接口失败,执行兜底逻辑
4 @Retryable实现原理剖析
将@Retryable
注解添加到目标对象GoogleSearchServiceImpl中search()方法后,当search()方法抛出HttpConnectTimeoutException异常时,Spring Retry会自动为调用方重复执行search()方法。那Spring Retry究竟是如何为调用方提供自动重试能力的呢?众所周知,获取重试能力的关键在于@EnableRetry
注解,该注解可以开启Spring Retry开关。那@EnableRetry注解可能就是解开谜团的突破口了。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@EnableAspectJAutoProxy(proxyTargetClass = false)
@Import(RetryConfiguration.class)
@Documented
public @interface EnableRetry {
boolean proxyTargetClass() default false;
}
在@Retryable源码中,我们发现了@EnableAspectJAutoProxy
注解的身影,这说明Spring Retry是基于Spring AOP为目标对象生成代理对象从而拓展出重试能力的!
关于Spring AOP的基础知识请参考《Spring AOP,从入门到进阶》。BeanPostProcessor
接口是Spring中常用的IoC容器拓展点;有了BeanPostProcessor,任何人都可以在Bean初始化前后对其进行个性化改造,甚至使用代理对象将Bean替换;在Spring AOP中,扮演BeanPostProcessor角色的是AbstractAutoProxyCreator
抽象类,其主要用于创建代理对象。
public abstract class AbstractAutoProxyCreator implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
if (bean != null) {
return wrapIfNecessary(bean, beanName);
}
return bean;
}
protected Object wrapIfNecessary(Object bean, String beanName) {
Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
if (specificInterceptors != DO_NOT_PROXY) {
Object proxy = createProxy(
bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
return proxy;
}
return bean;
}
}
从上述AbstractAutoProxyCreator源码中,我们发现:只有当某一Bean获取到PointcutAdvisor
时(specificInterceptors数组非空),才会为该Bean生成代理对象。显然,Spring Retry肯定定义了一个PointcutAdvisor,要不然谁会闲的无聊帮你定义哦!
RetryConfiguration
就是Spring Retry自己定义的PointcutAdvisor,它主要负责构建Advice
和Pointcut
,RetryConfiguration源码如下:
@Component
public class RetryConfiguration extends AbstractPointcutAdvisor implements IntroductionAdvisor, BeanFactoryAware, InitializingBean {
private Advice advice;
private Pointcut pointcut;
@Override
public void afterPropertiesSet() throws Exception {
Set<Class<? extends Annotation>> retryableAnnotationTypes = new LinkedHashSet<Class<? extends Annotation>>(1);
retryableAnnotationTypes.add(Retryable.class);
this.pointcut = buildPointcut(retryableAnnotationTypes);
this.advice = buildAdvice();
}
protected Advice buildAdvice() {
return new AnnotationAwareRetryOperationsInterceptor();
}
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> retryAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> retryAnnotationType : retryAnnotationTypes) {
Pointcut filter = new RetryConfiguration.AnnotationClassOrMethodPointcut(retryAnnotationType);
if (result == null) {
result = new ComposablePointcut(filter);
}
else {
result.union(filter);
}
}
return result;
}
}
至此,我们搞明白了一点,即只要目标对象含有@Retryable注解,那么就一定可以获取到RetryConfiguration这一PointcutAdvisor,继而为目标对象生成代理对象!
接下来,我们需要搞清楚RetryConfiguration
在构建Advice时所使用的AnnotationAwareRetryOperationsInterceptor
有何意义?AnnotationAwareRetryOperationsInterceptor继承自MethodInterceptor
,无疑invoke()
是核心逻辑。
public class AnnotationAwareRetryOperationsInterceptor implements MethodInterceptor {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
// invocation.getThis()获取到的为目标对象,如:GoogleSearchServiceImpl
// invocation.getMethod()获取到的为目标方法,如:search()
// 若是无状态重试,则委派RetryOperationsInterceptor进行处理
// 若是有状态重试,则委派StatefulRetryOperationsInterceptor进行处理
MethodInterceptor delegate = getDelegate(invocation.getThis(), invocation.getMethod());
if (delegate != null) {
return delegate.invoke(invocation);
}
else {
return invocation.proceed();
}
}
/**
* 根据目标对象和目标方法获取对应的重试拦截器
*/
private MethodInterceptor getDelegate(Object target, Method method) {
MethodInterceptor interceptor = NULL_INTERCEPTOR;
Retryable retryable = findAnnotationOnTarget(target, method, Retryable.class);
if (retryable != null) {
if (StringUtils.hasText(retryable.interceptor())) {
interceptor = this.beanFactory.getBean(retryable.interceptor(), MethodInterceptor.class);
}
else if (retryable.stateful()) {
interceptor = getStatefulInterceptor(target, method, retryable);
}
else {
interceptor = getStatelessInterceptor(target, method, retryable);
}
}
return interceptor;
}
/**
* 无状态重试拦截器:RetryOperationsInterceptor
*/
private MethodInterceptor getStatelessInterceptor(Object target, Method method, Retryable retryable) {
RetryTemplate template = createTemplate(retryable.listeners());
template.setRetryPolicy(getRetryPolicy(retryable));
template.setBackOffPolicy(getBackoffPolicy(retryable.backoff()));
return RetryInterceptorBuilder.stateless().retryOperations(template).label(retryable.label())
.recoverer(getRecoverer(target, method)).build();
}
/**
* 有状态重试拦截器:StatefulRetryOperationsInterceptor
*/
private MethodInterceptor getStatefulInterceptor(Object target, Method method, Retryable retryable) {
RetryTemplate template = createTemplate(retryable.listeners());
template.setRetryContextCache(this.retryContextCache);
CircuitBreaker circuit = findAnnotationOnTarget(target, method, CircuitBreaker.class);
if (circuit != null) {
RetryPolicy policy = getRetryPolicy(circuit);
CircuitBreakerRetryPolicy breaker = new CircuitBreakerRetryPolicy(policy);
breaker.setOpenTimeout(getOpenTimeout(circuit));
breaker.setResetTimeout(getResetTimeout(circuit));
template.setRetryPolicy(breaker);
template.setBackOffPolicy(new NoBackOffPolicy());
String label = circuit.label();
if (!StringUtils.hasText(label)) {
label = method.toGenericString();
}
return RetryInterceptorBuilder.circuitBreaker().keyGenerator(new FixedKeyGenerator("circuit"))
.retryOperations(template).recoverer(getRecoverer(target, method)).label(label).build();
}
RetryPolicy policy = getRetryPolicy(retryable);
template.setRetryPolicy(policy);
template.setBackOffPolicy(getBackoffPolicy(retryable.backoff()));
String label = retryable.label();
return RetryInterceptorBuilder.stateful().keyGenerator(this.methodArgumentsKeyGenerator)
.newMethodArgumentsIdentifier(this.newMethodArgumentsIdentifier).retryOperations(template).label(label)
.recoverer(getRecoverer(target, method)).build();
}
}
AnnotationAwareRetryOperationsInterceptor的invoke()主要逻辑有:
- 首先,根据目标对象和目标方法获取Retryable注解接口;
- 然后,优先根据Retryable注解中interceptor属性获取MethodInterceptor重试拦截器;若无重试拦截器,则进一步根据stateful属性获取RetryOperationsInterceptor或StatefulRetryOperationsInterceptor;
- 最后,AnnotationAwareRetryOperationsInterceptor委派RetryOperationsInterceptor或StatefulRetryOperationsInterceptor进行处理。
RetryOperationsInterceptor
和StatefulRetryOperationsInterceptor
同样继承自MethodInterceptor接口,继续跟进invoke()方法。RetryOperationsInterceptor
public class RetryOperationsInterceptor implements MethodInterceptor {
public Object invoke(final MethodInvocation invocation) throws Throwable {
String name;
if (StringUtils.hasText(label)) {
name = label;
} else {
name = invocation.getMethod().toGenericString();
}
final String label = name;
// 封装RetryCallback回调实例
// RetryCallback中doWithRetry()方法内最终将执行到目标对象的目标方法
RetryCallback<Object, Throwable> retryCallback = new MethodInvocationRetryCallback<Object, Throwable>(
invocation, label) {
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute(RetryContext.NAME, label);
return ((ProxyMethodInvocation) invocation).invocableClone().proceed();
}
};
if (recoverer != null) {
RetryOperationsInterceptor.ItemRecovererCallback recoveryCallback = new RetryOperationsInterceptor.ItemRecovererCallback(invocation.getArguments(), recoverer);
return this.retryOperations.execute(retryCallback, recoveryCallback);
}
return this.retryOperations.execute(retryCallback);
}
}
StatefulRetryOperationsInterceptor
public class StatefulRetryOperationsInterceptor implements MethodInterceptor {
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
Object[] args = invocation.getArguments();
Object defaultKey = Arrays.asList(args);
if (args.length == 1) {
defaultKey = args[0];
}
Object key = createKey(invocation, defaultKey);
RetryState retryState = new DefaultRetryState(key,
this.newMethodArgumentsIdentifier != null && this.newMethodArgumentsIdentifier.isNew(args),
this.rollbackClassifier);
RetryCallback<Object, Throwable> retryCallback = new StatefulMethodInvocationRetryCallback<Object, Throwable>(
invocation, label) {
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute(RetryContext.NAME, label);
return this.invocation.proceed();
}
};
if (recoverer != null) {
ItemRecovererCallback recoveryCallback = new ItemRecovererCallback(args, this.recoverer, retryState);
return this.retryOperations.execute(retryCallback, recoveryCallback, retryState);
}
return this.retryOperations.execute(retryCallback, null, retryState);
}
}
从上面RetryOperationsInterceptor和StatefulRetryOperationsInterceptor的源码来看,二者invoke()实现逻辑基本一致,最终都是手动调用RetryTemplate#execute()
方法。
最后,RetryTemplate中的若干execute()重载方法的核心逻辑均落在doExecute()
方法,解读如下:
public class RetryTemplate implements RetryOperations {
private static final String GLOBAL_STATE = "state.global";
protected <T, E extends Throwable> T doExecute(
RetryCallback<T, E> retryCallback,
RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {
// 重试策略
RetryPolicy retryPolicy = this.retryPolicy;
// 退避策略
BackOffPolicy backOffPolicy = this.backOffPolicy;
// 对于无状态重试,直接重新生成RetryContext实例
// 对于有状态重试,则从RetryContextCache获取已有RetryContext实例
RetryContext context = open(retryPolicy, state);
// 上一次执行目标方法时抛出异常
Throwable lastException = null;
// 重试是否已耗尽
boolean exhausted = false;
try {
// 在执行目标方法前,执行重试监听器的open()方法
boolean running = doOpenInterceptors(retryCallback, context);
if (!running) {
throw new TerminatedRetryException("Retry terminated abnormally by interceptor before first attempt");
}
// 生成BackOffContext实例,其内维护的退避策略相关信息,比如重试间隔值
BackOffContext backOffContext = null;
Object resource = context.getAttribute("backOffContext");
if (resource instanceof BackOffContext) {
backOffContext = (BackOffContext) resource;
}
if (backOffContext == null) {
backOffContext = backOffPolicy.start(context);
if (backOffContext != null) {
context.setAttribute("backOffContext", backOffContext);
}
}
// while循环
// 委托具体RetryPolicy来进行重试判决
// context.isExhaustedOnly()一般总是false
while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
lastException = null;
// 执行目标方法,即业务方法,同步回调
return retryCallback.doWithRetry(context);
}
catch (Throwable e) {
// 更新lastException
lastException = e;
// 注册异常
// 首先,委托RetryPolicy的registerThrowable()方法注册异常,如:记录重试次数
// 然后,若是有状态重试,则注册当前RetryContext,即将其添加到RetryContextCache
registerThrowable(retryPolicy, state, context, e);
// 执行重试监听器的OnError()方法
doOnErrorInterceptors(retryCallback, context, e);
// 当前执行目标方法失败,会执行退避策略,但前提是可以继续重试
if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
// 执行退避策略,主要依赖Thread.sleep()方法来模拟重试间隔
backOffPolicy.backOff(backOffContext);
}
// 是否应该立即重新抛出异常
// 对于无状态异常,是不会重新抛出异常的
// 对于有状态异常,则委托RetryState进行判断是否需要重新抛出异常:
// state != null && state.rollbackFor(context.getLastThrowable())
if (shouldRethrow(retryPolicy, context, state)) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount());
}
// 抛出异常
throw RetryTemplate.<E>wrapIfNecessary(e);
}
}
// 能执行到这里,说明目标方法执行一定是执行失败了的,但并不需要重新抛出异常,即shouldRethrow()为false,
// shouldRethrow()为false,有两种情况:无状态重试和有状态重试;
// 这部分代码主要用于退出while循环,而当前只有在熔断器重试策略中,RetryContext中才会有GLOBAL_STATE哦,
// 这也说明在熔断器模式下,调用方每次调用目标方法,最终只会执行一次,会在此退出while循环啊。
if (state != null && context.hasAttribute(GLOBAL_STATE)) {
break;
}
}
// 执行至此,说明重试已耗尽了
exhausted = true;
// 重试耗尽,若有兜底方法,则执行兜底逻辑;否则抛出异常
return handleRetryExhausted(recoveryCallback, context, state);
}
catch (Throwable e) {
throw RetryTemplate.<E>wrapIfNecessary(e);
}
finally {
close(retryPolicy, context, state, lastException == null || exhausted);
// 执行重试监听器的close()方法
doCloseInterceptors(retryCallback, context, lastException);
}
}
}
Spring Retry声明式重试执行流程
5 参考文档
- https://github.com/spring-projects/spring-retry
- https://developer.aliyun.com/article/92899