Java 技术栈中间件优雅停机方案设计与实现全景图——下

Java
263
0
0
2023-06-12

前言

紧接上文,那么接下来笔者就从一些知名框架源码实现角度,为大家详细阐述一下优雅关闭是如何实现的?

4. Spring 的优雅关闭机制

前面两个小节中我们从支持优雅关闭最底层的内核信号机制开始聊起然后到 JVM 进程实现优雅关闭的 ShutdwonHook 原理,经过这一系列的介绍,我们现在对优雅关闭在内核层和 JVM 层的相关机制原理有了一定的了解。

那么在真实 Java 应用中,我们到底该如何基于上述机制实现一套优雅关闭方案呢?本小节我们来从 Spring 源码中获取下答案!!

在介绍 Spring 优雅关闭机制源码实现之前,笔者先来带大家回顾下,在 Spring 的应用上下文关闭的时候,Spring 究竟给我们提供了哪些关闭时的 回调 机制,从而可以让我们在这些回调中编写 Java 应用的优雅关闭逻辑。

4.1 发布 ContextClosedEvent 事件

在 Spring 上下文开始关闭的时候,首先会发布 ContextClosedEvent 事件,注意此时 Spring 容器的 Bean 还没有开始销毁,所以我们可以在该事件回调中执行优雅关闭的操作。

 @Component
public class ShutdownListener implements ApplicationListener<ContextClosedEvent> {
       @Override
       public  void  onApplicationEvent(ContextClosedEvent event) {
                  ........优雅关闭逻辑.....
       }
} 

4.2 Spring 容器中的 Bean 销毁前回调

当 Spring 开始销毁容器中管理的 Bean 之前,会回调所有实现 DestructionAwareBeanPostProcessor 接口的 Bean 中的 postProcessBeforeDestruction 方法。

 @Component
public class DestroyBeanPostProcessor implements DestructionAwareBeanPostProcessor {

    @Override
    public void postProcessBeforeDestruction(Object bean,  String  beanName) throws Beans Exception  {

             ........Spring容器中的Bean开始销毁前回调.......
    }
} 

4.3 回调标注 @PreDestroy 注解的方法

 @Component
public class Shutdown {
    @PreDestroy
    public void preDestroy() {
        ......释放资源.......
    }
} 

4.4 回调 DisposableBean 接口中的 destroy 方法

 @Component
public class Shutdown implements DisposableBean{

    @Override
    public void destroy() throws Exception {
         ......释放资源......
    }

} 

4.5 回调自定义的销毁方法

 <bean id="Shutdown" class="com.test.netty.Shutdown"  destroy-method="doDestroy"/> 
 public class Shutdown {

    public void doDestroy() {
        .....自定义销毁方法....
    }
} 

4.6 Spring 优雅关闭机制的实现

Spring 相关应用程序本质上也是一个 JVM 进程,所以 Spring 框架想要实现优雅关闭机制也必须依托于我们在本文第三小节中介绍的 JVM 的 ShutdownHook 机制。

在 Spring 启动的时候,需要向 JVM 注册 ShutdownHook ,当我们执行 kill – 15 pid 命令时,随后 Spring 会在 ShutdownHook 中触发上述介绍的五种回调。

下面我们来看一下 Spring 中 ShutdownHook 的注册逻辑:

4.6.1 Spring 中 ShutdownHook 的注册

 public abstract class AbstractApplicationContext  extends  DefaultResourceLoader
  implements ConfigurableApplicationContext, DisposableBean {

 @Override
 public void registerShutdownHook() {
  if (this.shutdownHook == null) {
   // No shutdown hook registered yet.
   this.shutdownHook = new Thread() {
    @Override
    public void run() {
      synchronized  (startupShutdownMonitor) {
      doClose();
     }
    }
   };
   Runtime.getRuntime().addShutdownHook(this.shutdownHook);
  }
 }
} 

在 Spring 启动的时候,我们需要调用 AbstractApplicationContext#registerShutdownHook 方法向 JVM 注册 Spring 的 ShutdownHook ,从这段源码中我们看出,Spring 将 doClose() 方法封装在 ShutdownHook 线程中,而 doClose() 方法里边就是 Spring 优雅关闭的逻辑。

这里需要强调的是,当我们在一个纯 Spring 环境下,Spring 框架是不会为我们主动调用 registerShutdownHook 方法去向 JVM 注册 ShutdownHook 的,我们需要手动调用 registerShutdownHook 方法去注册。

 public class SpringShutdownHook {

    public  static  void main(String[] args) throws IOException {
        GenericApplicationContext context = new GenericApplicationContext();
                      ........
        // 注册 Shutdown Hook
        context.registerShutdownHook();
                      ........
    }
} 

而在 SpringBoot 环境下,SpringBoot 在启动的时候会为我们调用这个方法去主动注册 ShutdownHook 。我们不需要手动注册。

 public class SpringApplication {

 public ConfigurableApplicationContext run(String... args) {

                  ...............省略.................

                  ConfigurableApplicationContext context = null;
                  context = createApplicationContext();
                  refreshContext(context);

                  ...............省略.................
 }

  private  void refreshContext(ConfigurableApplicationContext context) {
  refresh(context);
  if (this.registerShutdownHook) {
   try {
    context.registerShutdownHook();
   }
   catch (AccessControlException ex) {
    // Not allowed in some environments.
   }
  }
 }

} 

4.6.2 Spring 中的优雅关闭逻辑

  protected void doClose() {
  // 更新上下文状态
  if (this.active.get() && this.closed.compareAndSet(false, true)) {
   if (logger.isInfoEnabled()) {
    logger.info("Closing " + this);
   }
            // 取消  JMX  托管
   LiveBeansView.unregisterApplicationContext(this);

   try {
    // 发布 ContextClosedEvent 事件
    publishEvent(new ContextClosedEvent(this));
   }
   catch (Throwable ex) {
    logger.warn("Exception thrown from ApplicationListener handling ContextClosedEvent", ex);
   }

   // 回调 Lifecycle beans,相关 stop 方法
   if (this.lifecycleProcessor != null) {
    try {
     this.lifecycleProcessor.onClose();
    }
    catch (Throwable ex) {
     logger.warn("Exception thrown from LifecycleProcessor on context close", ex);
    }
   }

   // 销毁 bean,触发前面介绍的几种回调
   destroyBeans();

   // Close the  state  of this context itself.
   closeBeanFactory();

   // Let subclasses do some final clean-up if they wish...
   onClose();

   // Switch to inactive.
   this.active.set(false);
  }
 } 

在这里我们可以看出最终是在 AbstractApplicationContext#doClose 方法中触发本小节开始介绍的五种回调:

  1. 发布 ContextClosedEvent 事件。 注意这里是一个同步事件 ,也就是说 Spring 的 ShutdownHook 线程在这里发布完事件之后会继续同步执行事件的处理,等到事件处理完毕后,才会去执行后面的 destroyBeans() 方法对 IOC 容器中的 Bean 进行销毁。

所以在 ContextClosedEvent 事件监听类中,可以放心地去做优雅关闭相关的操作,因为此时 Spring 容器中的 Bean 还没有被销毁。

  1. destroyBeans() 方法中依次触发剩下的四种回调。

最后结合前边小节中介绍的内容,总结 Spring 的整个优雅关闭流程如下图所示:

Spring优雅关闭机制.png

5. Dubbo 的优雅关闭

本小节优雅关闭部分源码基于 apache dubbo 2.7.7 版本,该版本中的优雅关闭是有 Bug 的,下面让我们一起来 Shooting Bug !

在前边几个小节的内容中,我们从内核提供的底层技术支持开始聊到了 JVM 的 ShutdonwHook ,然后又从 JVM 聊到了 Spring 框架的优雅关闭机制。

在了解了这些内容之后,本小节我们就来看下 dubbo 中的优雅关闭实现,由于现在几乎所有 Java 应用都会采用 Spring 作为开发框架,所以 dubbo 一般是集成在 Spring 框架中供我们使用的,它的优雅关闭和 Spring 有着紧密的联系。

5.1 Dubbo 在 Spring 环境下的优雅关闭

在本文第四小节《4. Spring的优雅关闭机制》的介绍中,我们知道在 Spring 的优雅关闭流程中,Spring 的 ShutdownHook 线程会首先发布 ContextClosedEvent 事件, 该事件是一个同步事件 ,ShutdownHook 线程发布完该事件紧接着就会同步执行该事件的监听器,当在事件监听器中处理完 ContextClosedEvent 事件之后,在回过头来执行 destroyBeans() 方法并依次触发剩下的四种回调来销毁 IOC 容器中的 Bean 。

Spring优雅关闭流程.png

由于在处理 ContextClosedEvent 事件的时候,Dubbo 所依赖的一些关键 bean 这时还没有被销毁,所以 dubbo 定义了一个 DubboBootstrapApplicationListener 用来监听 ContextClosedEvent 事件,并在 onContextClosedEvent 事件处理方法中调用 dubboBootstrap.stop() 方法开启 dubbo 的优雅关闭流程。

 public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListener
        implements Ordered {

    @Override
    public void onApplicationContextEvent(ApplicationContextEvent event) {
        // 这里是 Spring 的同步事件,publishEvent 和处理 Event 是在同一个线程中
        if (event  instanceof  ContextRefreshedEvent) {
            onContextRefreshedEvent((ContextRefreshedEvent) event);
        } else if (event instanceof ContextClosedEvent) {
            onContextClosedEvent((ContextClosedEvent) event);
        }
    }

    private void onContextClosedEvent(ContextClosedEvent event) {
        // spring 在 shutdownhook 中会先触发 ContextClosedEvent ,然后在销毁 spring beans
        // 所以这里 dubbo 开始优雅关闭时,依赖的 spring beans 并未销毁
        dubboBootstrap.stop();
    }

} 

当服务提供者 ServiceBean 和服务消费者 ReferenceBean 被初始化时,会将 DubboBootstrapApplicationListener 注册到 Spring 容器中。并开始监听 ContextClosedEvent 事件和 ContextRefreshedEvent 事件。

 public class ServiceClassPostProcessor implements BeanDefinition Registry PostProcessor, EnvironmentAware,
        ResourceLoaderAware, BeanClassLoaderAware {

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {

        // @since.7.5 注册spring启动 关闭事件的listener
        //在事件回调中中调用启动类 DubboBootStrap的start  stop来启动 关闭dubbo应用
        registerBeans(registry, DubboBootstrapApplicationListener.class);
      
                  ........省略.......
    }
} 

5.2 Dubbo 优雅关闭流程简介

由于本文的主题是介绍优雅关闭的一整条流程主线,所以这里笔者只是简要介绍 Dubbo 优雅关闭的主流程,相关细节部分笔者会在后续的 dubbo 源码解析系列里为大家详细介绍 Dubbo 优雅关闭的细节。为了避免本文发散太多,我们这里还是聚焦于流程主线。

 public class DubboBootstrap extends GenericEventListener {

    public DubboBootstrap stop() throws IllegalStateException {
        destroy();
        return this;
    }

} 

这里的核心逻辑其实就是我们在《1.2 优雅关闭》小节中介绍的两大优雅关闭主题:

  • 从当前正在关闭的应用实例上切走现有生产流量。
  • 保证业务无损。

这里大家只需要了解 Dubbo 优雅关闭的主流程即可,相关细节笔者后续会有一篇专门的文章详细为大家介绍。

 public void destroy() {
    if (destroyLock.tryLock()) {
        try {
            DubboShutdownHook.destroyAll();
             if (started.compareAndSet(true, false)
                    && destroyed.compareAndSet(false, true)) {
                 //取消注册
                unregisterServiceInstance();
                //取消元数据服务
                unexportMetadataService();
                //停止暴露服务
                unexportServices();
                //取消订阅服务
                unreferServices();
                //注销注册中心
                destroyRegistries();
                //关闭服务
                DubboShutdownHook.destroyProtocols();
                //销毁注册中心客户端实例
                destroyServiceDiscoveries();
                //清除应用配置类以及相关应用模型
                clear();
                //关闭 线程池 
                shutdown();
                //释放资源
                release();
            }
        } finally {
            destroyLock.unlock();
        }
    }
} 

从以上内容可以看出,Dubbo 的优雅关闭依托于 Spring ContextClosedEvent 事件的发布,而 ContextClosedEvent 事件的发布又依托于 Spring ShutdownHook 的注册。

dubbo spring环境优雅关闭.png

从《4.6.1 Spring 中 ShutdownHook 的注册》小节的介绍中我们知道,在 SpringBoot 环境下,SpringBoot 在启动的时候会为我们调用 ApplicationContext#registerShutdownHook 方法去主动注册 ShutdownHook 。我们不需要手动注册。

而在一个纯 Spring 环境下,Spring 框架并不会为我们主动调用 registerShutdownHook 方法去向 JVM 注册 ShutdownHook 的,我们需要手动调用 registerShutdownHook 方法去注册。

所以 Dubbo 这里为了兼容 SpringBoot 环境和纯 Spring 环境下的优雅关闭,引入了 SpringExtensionFactory类 ,只要在 Spring 环境下都会调用 registerShutdownHook 去向 JVM 注册 Spring 的 ShutdownHook 。

 public class SpringExtensionFactory implements ExtensionFactory { private static final Logger logger = LoggerFactory.getLogger(SpringExtensionFactory.class);
 private static final Set<ApplicationContext> CONTEXTS = new ConcurrentHashSet<ApplicationContext>();
 public static void addApplicationContext(ApplicationContext context) {
    CONTEXTS.add(context);
    if (context instanceof ConfigurableApplicationContext) {
        //在spring启动成功之后设置shutdownHook(兼容非SpringBoot环境)
        ((ConfigurableApplicationContext) context).registerShutdownHook();
    }
}

} 

当服务提供者 ServiceBean 和服务消费者 ReferenceBean 在初始化完成之后,会回调 SpringExtensionFactory#addApplicationContext 方法注册 ShutdownHook 。

 public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,     ApplicationContextAware, BeanNameAware, ApplicationEventPublisherAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
    this.applicationContext = applicationContext;
    SpringExtensionFactory.addApplicationContext(applicationContext);
}

} 
 public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean,     ApplicationContextAware, InitializingBean, DisposableBean {
 @Override
public void setApplicationContext(ApplicationContext applicationContext) {
    this.applicationContext = applicationContext;
    SpringExtensionFactory.addApplicationContext(applicationContext);
}

} 

以上就是 Dubbo 在 Spring 集成环境下的优雅关闭全流程,下面我们来看下 Dubbo 在非 Spring 环境下的优雅关闭流程。

5.3 Dubbo 在非 Spring 环境下的优雅关闭

在上小节的介绍中我们知道 Dubbo 在 Spring 环境下依托 Spring 的 ShutdownHook ,通过监听 ContextClosedEvent 事件,从而触发 Dubbo 的优雅关闭流程。

而到了非 Spring 环境下,Dubbo 就需要定义自己的 ShutdownHook ,从而引入了 DubboShutdownHook ,直接将优雅关闭流程封装在自己的 ShutdownHook 中执行。

 public class DubboBootstrap extends GenericEventListener {  private DubboBootstrap() {
    configManager = ApplicationModel.getConfigManager();
    environment = ApplicationModel.getEnvironment();
     DubboShutdownHook.getDubboShutdownHook().register();
    ShutdownHookCallbacks.INSTANCE.addCallback(new ShutdownHookCallback() {
        @Override
        public void callback() throws Throwable {
            DubboBootstrap.this.destroy();
        }
    });
}

} 
 public class DubboShutdownHook extends Thread { public void register() {
    if (registered.compareAndSet(false, true)) {
        DubboShutdownHook dubboShutdownHook = getDubboShutdownHook();
        Runtime.getRuntime().addShutdownHook(dubboShutdownHook);
        dispatch(new DubboShutdownHookRegisteredEvent(dubboShutdownHook));
    }
}
 @Override
public void run() {
    if (logger.isInfoEnabled()) {
        logger.info("Run shutdown hook now.");
    }
     callback();
    doDestroy();
}
private void callback() {
    callbacks.callback();
}

} 

从源码中我们看到,当我们的 Dubbo 应用程序接收到 kill -15 pid 信号时,JVM 捕获到 SIGTERM(15) 信号之后,就会触发 DubboShutdownHook 线程运行,从而通过 callback() 又回调了上小节中介绍的 DubboBootstrap#destroy 方法(dubbo 的整个优雅关闭逻辑全部封装在这里)。

dubbo 非Spring环境下优雅关闭流程.png

 public class DubboBootstrap extends GenericEventListener {  public void destroy() {
    if (destroyLock.tryLock()) {
        try {
            DubboShutdownHook.destroyAll();
             if (started.compareAndSet(true, false)
                    && destroyed.compareAndSet(false, true)) {
                 ........取消注册......
              
                ........取消元数据服务........
              
                ........停止暴露服务........
             
                ........取消订阅服务........
             
                ........注销注册中心........
             
                ........关闭服务........
              
                ........销毁注册中心客户端实例........
             
                ........清除应用配置类以及相关应用模型........
            
                ........关闭线程池........
             
                ........释放资源........
             
            }
        } finally {
            destroyLock.unlock();
        }
    }
}

} 

5.4 啊哈!Bug!

前边我们在《5.1 Dubbo在Spring环境下的优雅关闭》小节和《5.3 Dubbo在非Spring环境下的优雅关闭》小节中介绍的这两个环境的下的优雅关闭方案,当它们在各自的场景下运行的时候是没有任何问题的。

但是当这两种方案结合在一起运行,就出大问题了~~~

还记得笔者在《3.2 使用 ShutdownHook 的注意事项》小节中特别强调的一点:

  • ShutdownHook 其实本质上是一个已经被初始化但是未启动的 Thread ,这些通过 Runtime.getRuntime().addShutdownHook 方法注册的 ShutdownHooks ,在 JVM 进程关闭的时候会被启动 并发执行,但是并不会保证执行顺序

所以在编写 ShutdownHook 中的逻辑时,我们应该确保程序的线程安全性,并尽可能避免死锁。最好是一个 JVM 进程只注册一个 ShutdownHook 。

Dubbo在Spring环境下的优雅关闭Bug.png

那么现在 JVM 中我们注册了两个 ShutdownHook 线程,一个 Spring 的 ShutdownHook ,另一个是 Dubbo 的 ShutdonwHook 。那么这会引出什么问题呢?

经过前边的内容介绍我们知道,无论是在 Spring 的 ShutdownHook 中触发的 ContextClosedEvent 事件还是在 Dubbo 的 ShutdownHook 中执行的 CallBack 。最终都会调用到 DubboBootstrap#destroy 方法执行真正的优雅关闭逻辑。

 public class DubboBootstrap extends GenericEventListener {  private final Lock destroyLock = new ReentrantLock();
 public void destroy() {
    if (destroyLock.tryLock()) {
        try {
            DubboShutdownHook.destroyAll();
             if (started.compareAndSet(true, false)
                    && destroyed.compareAndSet(false, true)) {
                
                    .......dubbo应用的优雅关闭.......
             
            }
        } finally {
            destroyLock.unlock();
        }
    }
}

} 

让我们来设想一个这种的场景:当 Spring 的 ShutdownHook 线程和 Dubbo 的 ShutdownHook 线程同时执行并且在同一个时间点来到 DubboBootstrap#destroy 方法中争夺 destroyLock 。

  • Dubbo 的 ShutdownHook 线程获得 destroyLock 进入 destroy() 方法体开始执行优雅关闭逻辑。
  • Spring 的 ShutdownHook 线程没有获得 destroyLock,退出 destroy() 方法。

Dubbo优雅关闭Bug.png

在 Spring 的 ShutdownHook 线程退出 destroy() 方法之后紧接着就会执行 destroyBeans() 方法销毁 IOC 容器中的 Bean ,这里边肯定涉及到一些关键业务 Bean 的销毁,比如:数据库连接池,以及 Dubbo 相关的核心 Bean。

于此同时 Dubbo 的 ShutdownHook 线程开始执行优雅关闭逻辑,《1.2 优雅关闭》小节中我们提到,优雅关闭要保证业务无损。所以需要将剩下正在进行中的业务流程继续处理完毕并将业务处理结果响应给客户端。但是这时依赖的一些业务关键 Bean 已经被销毁,比如数据库连接池,这时执行数据库操作就会抛出 CannotGetJdbcConnectionException 。导致优雅关闭失败,对业务造成了影响。

5.5 Bug 的修复

该 Bug 最终在 apache dubbo 2.7.15 版本中被修复

详情可查看Issue:

经过上小节的分析,我们知道既然这个 Bug 产生的原因是由于 Spring 的 ShutdownHook 线程和 Dubbo 的 ShutdownHook 线程并发执行所导致的。

那么当我们处于 Spring 环境下的时候,就将 Dubbo 的 ShutdownHook 注销掉即可。

 public class SpringExtensionFactory implements ExtensionFactory { private static final Logger logger = LoggerFactory.getLogger(SpringExtensionFactory.class);
 private static final Set<ApplicationContext> CONTEXTS = new ConcurrentHashSet<ApplicationContext>();
 public static void addApplicationContext(ApplicationContext context) {
    CONTEXTS.add(context);
    if (context instanceof ConfigurableApplicationContext) {
        // 注册 Spring 的 ShutdownHook
        ((ConfigurableApplicationContext) context).registerShutdownHook();
        // 在 Spring 环境下将 Dubbo 的 ShutdownHook 取消掉
        DubboShutdownHook.getDubboShutdownHook().unregister();
    }
}
} 

而在非 Spring 环境下,我们依然保留 Dubbo 的 ShutdownHook 。

 public class DubboBootstrap {  private DubboBootstrap() {
    configManager = ApplicationModel.getConfigManager();
    environment = ApplicationModel.getEnvironment();
     DubboShutdownHook.getDubboShutdownHook().register();
    ShutdownHookCallbacks.INSTANCE.addCallback(DubboBootstrap.this::destroy);
}

} 

以上内容就是 Dubbo 的整个优雅关闭主线流程,以及优雅关闭 Bug 产生的原因和修复方案。

在 Dubbo 的优雅关闭流程中最终会通过 DubboShutdownHook.destroyProtocols() 关闭底层服务。

 public class DubboBootstrap extends GenericEventListener {  private final Lock destroyLock = new ReentrantLock();
 public void destroy() {
    if (destroyLock.tryLock()) {
        try {
            DubboShutdownHook.destroyAll();
             if (started.compareAndSet(true, false)
                    && destroyed.compareAndSet(false, true)) {
                
                    .......dubbo应用的优雅关闭.......
                //关闭服务
                DubboShutdownHook.destroyProtocols();
                     .......dubbo应用的优雅关闭.......
             }
        } finally {
            destroyLock.unlock();
        }
    }
}

} 

在 Dubbo 服务的销毁过程中,会通过调用 server.close 关闭底层的 Netty 服务。

 public class DubboProtocol extends AbstractProtocol { @Override
public void destroy() {
    for (String key : new ArrayList<>(serverMap.keySet())) {
        ProtocolServer protocolServer = serverMap.remove(key);
        RemotingServer server = protocolServer.getRemotingServer();
        server.close(ConfigurationUtils.getServerShutdownTimeout());
         ...........省略........
    }
      ...........省略........
} 

最终触发 Netty 的优雅关闭。

 public class NettyServer extends AbstractServer implements RemotingServer {  @Override
protected void doClose() throws Throwable {
    ..........关闭底层Channel......
    try {
        if (bootstrap != null) {
            // 关闭 Netty 的主从 Reactor 线程组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(), e);
    }
    .........清理缓存Channel数据.......
}

} 

6. Netty 的优雅关闭

通过上小节介绍 dubbo 优雅关闭的相关内容,我们很自然的引出了 Netty 的优雅关闭触发时机,那么在本小节中笔者将为大家详细介绍下 Netty 是如何优雅地装……….优雅地谢幕的~~

image.png

在之前的系列文章中,我们围绕下图所展示的 Netty 整个核心框架的运转流程介绍了主从 ReactorGroup 的创建, 启动 , 运行 , 接收网络连接 , 接收网络数据 ,发送网络数据,以及 如何在pipeline中处理相关IO事件 的整个源码实现。

netty中的reactor.png

本小节就到了 Netty 优雅谢幕的时刻了,在这谢幕的过程中,Netty 会对它的主从 ReactorGroup ,以及对应 ReactorGroup 中的 Reacto r进行优雅的关闭。下面让我们一起来看下这个优雅关闭的过程~~~

6.1 ReactorGroup 的优雅谢幕

 
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {  static final long DEFAULT_SHUTDOWN_QUIET_PERIOD =;
static final long DEFAULT_SHUTDOWN_TIMEOUT =;
@Override
public Future<?> shutdownGracefully() {
    return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}

} 

在 Netty 进行优雅关闭的整个过程中,这里涉及到了两个非常重要的控制参数:

  • gracefulShutdownQuietPeriod :优雅关闭静默期,默认为 2s 。这个参数主要来保证 Netty 整个关闭过程中的 优雅 。在关闭流程开始后,如果 Reactor 中还有遗留的异步任务需要执行,那么 Netty 就不能关闭,需要把所有异步任务执行完毕才可以。当所有异步任务执行完毕后,Netty 为了实现更加优雅的关闭操作,一定要保障业务无损,这时候就引入了静默期这个概念,如果在这个静默期内,用户没有新的任务向 Reactor 提交那么就开始关闭。如果在这个静默期内,还有用户继续提交异步任务,那么就不能关闭,需要把静默期内用户提交的异步任务执行完毕才可以放心关闭。
  • gracefulShutdownTimeout :优雅关闭超时时间,默认为 15s 。这个参数主要来保证 Netty 整个关闭过程的 可控 。我们知道一个生产级的优雅关闭方案既要保证优雅做到业务无损,更重要的是要保证关闭流程的可控,不能无限制的优雅下去。导致长时间无法完成关闭动作。于是 Netty 就引入了这个参数,如果优雅关闭超时,那么无论此时有无异步任务需要执行都要开始关闭了。

这两个控制参数是非常重要核心的两个参数,我们在后面介绍 Netty 关闭细节的时候还会为大家详细剖析,这里大家先从概念上大概理解一下。

在介绍完这两个重要核心参数之后,我们接下来看下 ReactorGroup 的关闭流程:

我们都知道 Netty 为了保证整个系统的吞吐量以及保证 Reactor 可以线程安全地,有序地处理各个 Channel 上的 IO 事件。基于这个目的 Netty 将其承载的海量连接分摊打散到不同的 Reactor 上处理。

ReactorGroup 中包含多个 Reactor ,每个 Channel 只能注册到一个固定的 Reactor 上,由这个固定的 Reactor 负责处理该 Channel 上整个生命周期的事件。

一个 Reactor 上注册了多个 Channel ,负责处理注册在其上的所有 Channel 的 IO 事件以及异步任务。

ReactorGroup 的结构如下图所示:

image.png

ReactorGroup 的关闭流程本质上其实是 ReactorGroup 中包含的所有 Reactor 的关闭,当 ReactorGroup 中的所有 Reactor 完成关闭后,ReactorGroup 才算是真正的关闭。

 
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {  // Reactor线程组中的Reactor集合
private final EventExecutor[] children;
 // 关闭future
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
 @Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
    for (EventExecutor l: children) {
        l.shutdownGracefully(quietPeriod, timeout, unit);
    }
    return terminationFuture();
}
 @Override
public Future<?> terminationFuture() {
    return terminationFuture;
}

} 
  • EventExecutor[] children :数组中存放的是当前 ReactorGroup 中包含的所有 Reactor,类型为 EventExecutor。
  • Promise<?> terminationFuture :ReactorGroup 中的关闭 Future ,用户线程通过这个 terminationFuture 可以知道 ReactorGroup 完成关闭的时机,也可以向 terminationFuture 注册一些 listener 。当 ReactorGroup 完成关闭动作后,会回调用户注册的这些 listener 。大家可以根据各自的业务场景灵活运用。

在 ReactorGroup 的关闭过程中,会挨个触发它所包含的所有 Reactor 的关闭流程。并返回 terminationFuture 给用户线程。

当 ReactorGroup 中的所有 Reactor 完成关闭之后,这个 terminationFuture 会被设置为 success,这样一来用户线程可以感知到 ReactorGroup 已经完成关闭了。

这一点笔者也在 《Reactor在Netty中的实现(创建篇)》 一文中的第四小节《4. 向Reactor线程组中所有的Reactor注册terminated回调函数》强调过。

在 ReactorGroup 创建的最后一步,会定义 Reactor 关闭的 terminationListener。在 Reactor 的 terminationListener 中会判断当前 ReactorGroup 中的 Reactor 是否全部关闭,如果已经全部关闭,则会设置 ReactorGroup的 terminationFuture 为 success 。

 //记录关闭的Reactor个数,当Reactor全部关闭后,ReactorGroup才可以认为关闭成功
private final AtomicInteger terminatedChildren = new AtomicInteger();
//ReactorGroup的关闭future
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
 protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
     ........挨个创建Reactor............
     final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                //当所有Reactor关闭后 ReactorGroup才认为是关闭成功
                terminationFuture.setSuccess(null);
            }
        }
    };
     for (EventExecutor e: children) {
        //向每个Reactor注册terminationListener
        e.terminationFuture().addListener(terminationListener);
    }
} 

从以上 ReactorGroup 的关闭流程我们可以看出,ReactorGroup 的关闭逻辑只是挨个去触发它所包含的所有 Reactor 的关闭,Netty 的整个优雅关闭核心其实是在单个 Reactor 的关闭逻辑上。毕竟 Reactor 才是真正驱动 Netty 运转的核心引擎。

6.2 Reactor 的优雅谢幕

Reactor的优雅谢幕流程.png

Reactor 的状态特别重要,从 《一文聊透Netty核心引擎Reactor的运转架构》 一文中我们知道 Reactor 是在一个 for (;;) {….} 死循环中 996 不停地工作。比如轮询 Channel 上的 IO 就绪事件,处理 IO 就绪事件,执行异步任务就是在这个死循环中完成的。

而 Reactor 在每一次循环任务结束之后,都会先去判断一下当前 Reactor 的状态,如果状态变为准备关闭状态 ST_SHUTTING_DOWN 后,Reactor 就会开启优雅关闭流程。

所以在介绍 Reactor 的关闭流程之前,笔者先来为大家捋一捋 Reactor 中的各种状态。

  • ST_NOT_STARTED = 1 :Reactor 的初始状态。在 Reactor 刚被创建出来的时候,状态为 ST_NOT_STARTED 。
  • ST_STARTED = 2 :Reactor 的启动状态。当向 Reactor 提交第一个异步任务的时候会触发 Reactor 的启动。启动之后状态变为 ST_STARTED 。

相关细节可在回顾下 《详细图解Netty Reactor启动全流程》 一文。

  • ST_SHUTTING_DOWN = 3 :Reactor 准备开始关闭状态。当 Reactor 的 shutdownGracefully 方法被调用的时候,Reactor 的状态就会变为ST_SHUTTING_DOWN。在这个状态下,用户仍然可以向 Reactor 提交任务。
  • ST_SHUTDOWN = 4 :Reactor 停止状态。表示 Reactor 的优雅关闭流程已经结束, 此时用户不能在向 Reactor 提交任务 ,Reactor 会在这个状态下最后一次执行剩余的异步任务。
  • ST_TERMINATED = 5 :Reactor 真正的终结状态,该状态表示 Reactor 已经完全关闭了。在这个状态下 Reactor 会设置自己的 terminationFuture 为 Success。进而开始回调上小节末尾提到的 terminationListener 。

在我们了解了 Reactor 的各种状态之后,下面就该来正式开始介绍 Reactor 的关闭流程了:

 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {  //Reactor的状态  初始为未启动状态
private volatile int state = ST_NOT_STARTED;
  //Reactor的初始状态,未启动
private static final int ST_NOT_STARTED =;
//Reactor启动后的状态
private static final int ST_STARTED =;
//准备正在进行优雅关闭,此时用户仍然可以提交任务,Reactor仍可以执行任务
private static final int ST_SHUTTING_DOWN =;
//Reactor停止状态,表示优雅关闭结束,此时用户不能在提交任务,Reactor最后一次执行剩余的任务
private static final int ST_SHUTDOWN =;
//Reactor中的任务已被全部执行完毕,且不在接受新的任务,真正的终止状态
private static final int ST_TERMINATED =;
 //优雅关闭的静默期
private volatile long gracefulShutdownQuietPeriod;
//优雅关闭超时时间
private volatile long gracefulShutdownTimeout;
 //Reactor的关闭Future
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
 @Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
     ......省略参数校验.......
     //此时Reactor的状态为ST_STARTED
    if (isShuttingDown()) {
        return terminationFuture();
    }
     boolean inEventLoop = inEventLoop();
    boolean wakeup;
    int oldState;
    for (;;) {
        if (isShuttingDown()) {
            return terminationFuture();
        }
        int newState;
        //需要唤醒Reactor去执行关闭流程
        wakeup = true;
        oldState = state;
        if (inEventLoop) {
            newState = ST_SHUTTING_DOWN;
        } else {
            switch (oldState) {
                case ST_NOT_STARTED:
                case ST_STARTED:
                    newState = ST_SHUTTING_DOWN;
                    break;
                default:
                    //Reactor正在关闭或者已经关闭
                    newState = oldState;
                    wakeup = false;
            }
        }
        if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
            break;
        }
    }
    //优雅关闭静默期,在该时间内,用户还是可以向Reactor提交任务并且执行,只要有任务在Reactor中,就不能进行关闭
    //每隔ms检测是否有任务提交进来,如果在静默期内没有新的任务提交,那么才会进行关闭 保证关闭行为的优雅
    gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
    //优雅关闭的最大超时时间,优雅关闭行为不能超过该时间,如果超过的话 不管当前是否还有任务 都要进行关闭
    //保证关闭行为的可控
    gracefulShutdownTimeout = unit.toNanos(timeout);
     //这里需要保证Reactor线程是在运行状态,如果已经停止,那么就不在进行后续关闭行为,直接返回terminationFuture
    if (ensureThreadStarted(oldState)) {
        return terminationFuture;
    }
     //将正在监听IO事件的Reactor从Selector上唤醒,表示要关闭了,开始执行关闭流程
    if (wakeup) {
        //确保Reactor线程在执行完任务之后 不会在selector上停留
        taskQueue.offer(WAKEUP_TASK);
        if (!addTaskWakesUp) {
            //如果此时Reactor正在Selector上阻塞,则可以确保Reactor被及时唤醒
            wakeup(inEventLoop);
        }
    }
     return terminationFuture();
}
 @Override
public Future<?> terminationFuture() {
    return terminationFuture;
}

} 

首先在开启关闭流程之前,需要调用 isShuttingDown() 判断一下当前 Reactor 是否已经开始关闭流程或者已经完成关闭。如果已经开始关闭了,这里会直接返回 Reactor 的 terminationFuture 。

  @Override
public boolean isShuttingDown() {
    return state >= ST_SHUTTING_DOWN;
} 

剩下的逻辑就是不停的在一个 for 循环中通过 CAS 不停的尝试将 Reactor 的当前 ST_STARTED 状态改为 ST_SHUTTING_DOWN 正在关闭状态。

如果通过 inEventLoop() 判断出当前执行线程是 Reactor 线程,那么表示当前 Reactor 的状态只会是 ST_STARTED 运行状态,那么就可以直接将 newState 设置为 ST_SHUTTING_DOWN 。因为只有 Reactor 处于 ST_STARTED 状态的时候才会运行到这里。否则在前边就直接返回 terminationFuture了。

如果当前执行线程为用户线程并不是 Reactor 线程的话,那么此时 Reactor 的状态可能是正在关闭状态或者已经关闭状态,用户线程在重复发起 Reactor 的关闭流程。所以这些异常场景的处理会在 switch(oldState){….} 语句中完成。

     switch (oldState) {
            case ST_NOT_STARTED:
            case ST_STARTED:
                newState = ST_SHUTTING_DOWN;
                break;
            default:
                //Reactor正在关闭或者已经关闭
                newState = oldState;
                //当前Reactor已经处于关闭流程中,则无需在唤醒Reactor了
                wakeup = false;
        } 

如果当前 Reactor 还未发起关闭流程,比如状态为 ST_NOT_STARTED 或者 ST_STARTED ,那么直接可以放心的将 newState 设置为 ST_SHUTTING_DOWN 。

如果当前 Reactor 已经处于关闭流程中或者已经完成关闭,比如状态为 ST_SHUTTING_DOWN ,ST_SHUTDOWN 或者 ST_TERMINATED 。则没有必要在唤醒 Reactor 重复执行关闭流程了 wakeup = false。Reactor 的状态维持当前状态不变。

当 Reactor 的状态确定完毕后,则在 for 循环中不断的通过 CAS 修改 Reactor 的当前状态。此时 oldState = ST_STARTED ,newState = ST_SHUTTING_DOWN 。

    if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
        break;
    } 

随后在 Reactor 中设置我们在《6.1 ReactorGroup 的优雅谢幕》小节开始处介绍的控制 Netty 优雅关闭的两个非常重要的核心参数:

  • gracefulShutdownQuietPeriod :优雅关闭静默期,默认为 2s 。当 Reactor 中已经没有异步任务需要在执行时,该静默期开始触发,Netty 在这里会每隔 100ms 检测一下是否有任务提交进来,如果在静默期内没有新的任务提交,那么才会进行关闭,保证关闭行为的优雅。
  • gracefulShutdownTimeout :优雅关闭超时时间,默认为 15s 。优雅关闭行为不能超过该时间,如果超过的话不管当前是否还有任务都要进行关闭,保证关闭行为的可控。

流程走到这里,Reactor 就开始准备执行关闭流程了,那么在进行关闭操作之前,我们需要确保 Reactor 线程此时应该是运行状态,如果此时 Reactor 线程还未开始运行那么就需要让它运行起来执行关闭操作。

  //这里需要保证Reactor线程是在运行状态,如果已经停止,
//那么就不在进行后续关闭行为,直接返回terminationFuture
if (ensureThreadStarted(oldState)) {
    return terminationFuture;
} 
 
    private boolean ensureThreadStarted(int oldState) { if (oldState == ST_NOT_STARTED) {
    try {
        doStartThread();
    } catch (Throwable cause) {
        STATE_UPDATER.set(this, ST_TERMINATED);
        terminationFuture.tryFailure(cause);
         if (!(cause instanceof Exception)) {
            // Also rethrow as it may be an OOME for example
            PlatformDependent.throwException(cause);
        }
        return true;
    }
}
return false;
    } 

如果此时 Reactor 线程刚刚执行完异步任务或者正在 Selector 上阻塞,那么我们需要确保 Reactor 线程被及时的唤醒,从而可以直接进入关闭流程。wakeup == true。

这里的 addTaskWakesUp 默认为 false 。表示并不是只有 addTask 方法才能唤醒 Reactor 线程 还有其他方法可以唤醒 Reactor 线程,比如 SingleThreadEventExecutor#execute 方法还有本小节介绍的 SingleThreadEventExecutor#shutdownGracefully 方法都会唤醒 Reactor 线程。

关于 addTaskWakesUp 字段的详细含义和作用,大家可以回顾下 《一文聊透 Netty 核心引擎 Reactor 的运转架构》 一文中的《1.2.2 Reactor 开始轮询 IO 就绪事件》小节。

 
     //将正在监听IO事件的Reactor从Selector上唤醒,表示要关闭了,开始执行关闭流程 if (wakeup) {
    //确保Reactor线程在执行完任务之后 不会在selector上停留
    taskQueue.offer(WAKEUP_TASK);
    if (!addTaskWakesUp) {
        //如果此时Reactor正在Selector上阻塞,则可以确保Reactor被及时唤醒
        wakeup(inEventLoop);
    }
} 
  • 通过 taskQueue.offer(WAKEUP_TASK) 向 Reactor 中添加 WAKEUP_TASK,可以确保 Reactor 在执行完异步任务之后不会在 Selector 上做停留,直接执行关闭操作。
  • 如果此时 Reactor 线程正在 Selector 上阻塞,那么直接调用 wakeup(inEventLoop) 唤醒 Reactor 线程,直接来到关闭流程。
 public final class NioEventLoop extends SingleThreadEventLoop {
    @Override
    protected void wakeup(boolean inEventLoop) { if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
    selector.wakeup();
}
    }
} 

6.3 Reactor 线程的优雅关闭

我们先来通过一张 Reactor 优雅关闭整体流程图来从总体上俯撼一下关闭流程:

Reactor线程优雅关闭流程.png

通过 《一文聊透Netty核心引擎Reactor的运转架构》 一文的介绍,我们知道 Reacto r是在一个 for 循环中 996 不停地处理 IO 事件以及执行异步任务。如下面笔者提取的 Reactor 运行框架所示:

 public final class NioEventLoop extends SingleThreadEventLoop {

    @Override
    protected void run() { for (;;) {
    try {
          ........监听Channel上的IO事件.......
          ........处理Channel上的IO事件.......
          ........执行异步任务..........
    } finally {
        try {
            if (isShuttingDown()) {
                //关闭Reactor上注册的所有Channel,停止处理IO事件,触发unActive以及unRegister事件
                closeAll();
                //注销掉所有Channel停止处理IO事件之后,剩下的就需要执行Reactor中剩余的异步任务了
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Error e) {
            throw (Error) e;
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}
    }

} 

在 Reactor 在每次 for 循环的末尾 finally{….} 语句块中都会通过 isShuttingDown() 方法去检查当前 Reactor 的状态是否是关闭状态,如果是关闭状态则开始正式进入 Reactor 的优雅关闭流程。

我们在本文前边《1.2 优雅关闭》小节中在讨论优雅关闭方案的时候提到,我们要着重从以下两个方面来实施优雅关闭:

  1. 首先需要切走程序承担的现有流量。
  2. 保证现有剩余的任务可以执行完毕,保证业务无损。

Netty 这里实现的优雅关闭同样也遵从这两个要点。

  1. 在优雅关闭流程开始之前首先会调用 closeAll() 方法,将 Reactor 上注册的所有 Channel 全部关闭掉,切掉现有流量。
  2. 随后会调用 confirmShutdown() 方法,将剩余的异步任务执行完毕。在该方法中只要有异步任务需要执行,就不能关闭,保证业务无损。该方法返回值为 true 时表示可以进行关闭。返回 false 时表示不能马上关闭。

6.3.1 切走流量

 private void closeAll() {
    //这里的目的是清理selector中的一些无效key
    selectAgain();
    //获取Selector上注册的所有Channel
    Set<SelectionKey> keys = selector.keys();
    Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
    for (SelectionKey k: keys) {
        //获取NioSocketChannel
        Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            channels.add((AbstractNioChannel) a);
        } else {
            .........省略......
        }
    }
     for (AbstractNioChannel ch: channels) {
        //关闭Reactor上注册的所有Channel,并在pipeline中触发unActive事件和unRegister事件
        ch.unsafe().close(ch.unsafe().voidPromise());
    }
} 

首先会通过 selectAgain() 最后一次在 Selector 上执行一次非阻塞轮询操作,目的是清除 Selector 上的一些无效 Key 。

关于无效 Key 的清除,详细细节大家可以回看下 《一文聊透Netty核心引擎Reactor的运转架构》 一文中的《3.1.3 从Selector中移除失效的SelectionKey》小节。

随后通过 selector.keys() 获取在 Selector 上注册的所有 SelectionKey 。进而获取到 Netty 中的 NioSocketChannel 。SelectionKey 与 NioSocketChannel 的对应关系如下图所示:

channel与SelectionKey对应关系.png

最后将注册在 Reactor 上的这些 NioSocketChannel 挨个进行关闭。

Channel 的关闭流程可以回看下笔者的这篇文章 《且看 Netty 如何应对 TCP 连接的正常关闭,异常关闭,半关闭场景》

6.3.2 保证业务无损

该方法中的逻辑是保证 Reactor 进行优雅关闭的核心,Netty 这里为了保证业务无损,采取的是只要有异步任务 Task 或者 ShutdwonHooks 需要执行,就不能关闭,需要等待所有 tasks 或者 ShutdownHooks 执行完毕,才会考虑关闭的事情。

 protected boolean confirmShutdown() {
    if (!isShuttingDown()) {
        return false;
    }
     if (!inEventLoop()) {
        throw new IllegalStateException("must be invoked from an event loop");
    }
     //取消掉所有的定时任务
    cancelScheduledTasks();
     if (gracefulShutdownStartTime ==) {
        //获取优雅关闭开始时间,相对时间
        gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
    }
     //这里判断只要有task任务需要执行就不能关闭
    if (runAllTasks() || runShutdownHooks()) {
        if (isShutdown()) {
            // Executor shut down - no new tasks anymore.
            return true;
        }
         /**
         * gracefulShutdownQuietPeriod表示在这段时间内,用户还是可以继续提交异步任务的,Reactor在这段时间内
         * 是会保证这些任务被执行到的。
         *
         * gracefulShutdownQuietPeriod = 表示 没有这段静默时期,当前Reactor中的任务执行完毕后,无需等待静默期,执行关闭
         * */            if (gracefulShutdownQuietPeriod ==) {
            return true;
        }
        //避免Reactor在Selector上阻塞,因为此时已经不会再去处理IO事件了,专心处理关闭流程
        taskQueue.offer(WAKEUP_TASK);
        return false;
    }
     //此时Reactor中已经没有任务可执行了,是时候考虑关闭的事情了
    final long nanoTime = ScheduledFutureTask.nanoTime();
     //当Reactor中所有的任务执行完毕后,判断是否超过gracefulShutdownTimeout
    //如果超过了 则直接关闭
    if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
        return true;
    }
     //即使现在没有任务也还是不能进行关闭,需要等待一个静默期,在静默期内如果没有新的任务提交,才会进行关闭
    //如果在静默期内还有任务继续提交,那么静默期将会重新开始计算,进入一轮新的静默期检测
    if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
        taskQueue.offer(WAKEUP_TASK);
        try {
            //gracefulShutdownQuietPeriod内每隔ms检测一下 是否有任务需要执行
            Thread.sleep();
        } catch (InterruptedException e) {
            // Ignore
        }
         return false;
    }
     // 在整个gracefulShutdownQuietPeriod期间内没有任务需要执行或者静默期结束 则无需等待gracefulShutdownTimeout超时,直接关闭
    return true;
} 

在关闭流程开始之前,Netty 首先会调用 cancelScheduledTasks() 方法将 Reactor 中剩余需要执行的定时任务全部取消掉。

记录优雅关闭开始时间 gracefulShutdownStartTime ,这是为了后续判断优雅关闭流程是否超时。

调用 runAllTasks() 方法将 Reactor 中 TaskQueue 里剩余的异步任务全部取出执行。

运行剩余tasks和hooks.png

调用 runShutdownHooks() 方法将用户注册在 Reactor 上的 ShutdownHook 取出执行。

我们可以在用户线程中通过如下方式向 Reactor 中注册 ShutdownHooks :

 NioEventLoop reactor = (NioEventLoop) ctx.channel().eventLoop();
reactor.addShutdownHook(new Runnable() {
    @Override
    public void run() {
        .....关闭逻辑....
    }
}); 

在 Reactor 进行关闭的时候,会取出用户注册的这些 ShutdownHooks 进行运行。

 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

   //可以向Reactor添加shutdownHook,当Reactor关闭的时候会被调用
   private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();

   private boolean runShutdownHooks() { boolean ran = false;
while (!shutdownHooks.isEmpty()) {
    List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
    shutdownHooks.clear();
    for (Runnable task: copy) {
        try {
            //Reactor线程挨个顺序同步执行
            task.run();
        } catch (Throwable t) {
            logger.warn("Shutdown hook raised an exception.", t);
        } finally {
            ran = true;
        }
    }
}
 if (ran) {
    lastExecutionTime = ScheduledFutureTask.nanoTime();
}
 return ran;
    }

} 

需要注意的是这里的 ShutdownHooks 是 Netty 提供的一种机制并不是我们在《3. JVM 中的 ShutdownHook》小节中介绍的 JVM 中的 ShutdownHooks 。

JVM 中的 ShutdownHooks 是一个 Thread ,JVM 在关闭之前会 并发无序 地运行。而 Netty 中的 ShutdownHooks 是一个 Runnable ,Reactor 在关闭之前,会由 Reactor 线程 同步有序 地执行。

这里需要注意的是只要有 tasks 和 hooks 需要执行 Netty 就会一直执行下去直到这些任务全部执行完为止。

当 Reactor 没有任何任务需要执行时,这时就会判断当前关闭流程所用时间是否超过了我们前边设定的优雅关闭最大超时时间 gracefulShutdownTimeout 。

 nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout 

如果关闭流程因为前边这些任务的执行导致已经超时,那么就直接关闭 Reactor ,退出 Reactor 的工作循环。

如果没有超时,那么这时就会触发前边介绍的优雅关闭的静默期 gracefulShutdownQuietPeriod 。

在静默期中 Reactor 线程会每隔 100ms 检查一下是否有用户提交任务请求,如果有的话,就需要保证将用户提交的这些任务执行完毕。然后静默期将会重新开始计算,进入一轮新的静默期检测。

如果在整个静默期内,没有任何任务提交,则无需等待 gracefulShutdownTimeout 超时,直接关闭 Reactor ,退出 Reactor 的工作循环。

从以上过程我们可以看出 Netty 的优雅关闭至少需要等待一个静默期的时间。还有一点是 Netty 优雅关闭的时间可能会超出 gracefulShutdownTimeout ,因为 Netty 需要保证遗留剩余的任务被执行完毕。当所有任务执行完毕之后,才会去检测是否超时。

6.4 Reactor 的最终关闭流程

当在静默期内没有任何任务提交或者关闭流程超时时,上小节中介绍的 confirmShutdown() 就会返回 true 。随即 Reactor 线程就会退出工作循环。

 public final class NioEventLoop extends SingleThreadEventLoop {

    @Override
    protected void run() { for (;;) {
    try {
          ........监听Channel上的IO事件.......
          ........处理Channel上的IO事件.......
          ........执行异步任务..........
    } finally {
        try {
            if (isShuttingDown()) {
                //关闭Reactor上注册的所有Channel,停止处理IO事件,触发unActive以及unRegister事件
                closeAll();
                //注销掉所有Channel停止处理IO事件之后,剩下的就需要执行Reactor中剩余的异步任务了
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Error e) {
            throw (Error) e;
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}
    }

} 

我们在 《详细图解 Netty Reactor 启动全流程》 一文中的《1.3.3 Reactor 线程的启动》小节中的介绍中提到,Reactor 线程的启动是通过第一个异步任务被提交到 Reactor 中的时候被触发的。在向 Reactor 提交任务的方法 SingleThreadEventExecutor#execute(java.lang.Runnable, boolean) 中会触发下面 doStartThread() 方法的调用,在这里会调用前边提到的 Reactor 工作循环 run() 方法。

在 doStartThread() 方法的 finally{…} 语句块中会完成 Reactor 的最终关闭流程,也就是 Reactor 在退出 run 方法中的 for 循环之后的后续收尾流程。

最终 Reactor 的优雅关闭完整流程如下图所示:

Reactor优雅关闭全流程.png

 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    private void doStartThread() { assert thread == null;
executor.execute(new Runnable() {
    @Override
    public void run() {
         ..........省略.........
         try {
            //Reactor线程开始轮询处理IO事件,执行异步任务
            SingleThreadEventExecutor.this.run();
            //后面的逻辑为用户调用shutdownGracefully关闭Reactor退出循环 走到这里
            success = true;
        } catch (Throwable t) {
            logger.warn("Unexpected exception from an event executor: ", t);
        } finally {
            //走到这里表示在静默期内已经没有用户在向Reactor提交任务了,或者达到优雅关闭超时时间,开始对Reactor进行关闭
            //如果当前Reactor不是关闭状态则将Reactor的状态设置为ST_SHUTTING_DOWN
            for (;;) {
                int oldState = state;
                if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                        SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                    break;
                }
            }
             try {
                for (;;) {
                    //此时Reactor线程虽然已经退出,而此时Reactor的状态为shuttingdown,但任务队列还在
                    //用户在此时依然可以提交任务,这里是确保用户在最后的这一刻提交的任务可以得到执行。
                    if (confirmShutdown()) {
                        break;
                    }
                }
                 for (;;) {
                    // 当Reactor的状态被更新为SHUTDOWN后,用户提交的任务将会被拒绝
                    int oldState = state;
                    if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
                        break;
                    }
                }
                 // 这里Reactor的状态已经变为SHUTDOWN了,不会在接受用户提交的新任务了
                // 但为了防止用户在状态变为SHUTDOWN之前,也就是Reactor在SHUTTINGDOWN的时候 提交了任务
                // 所以此时Reactor中可能还会有任务,需要将剩余的任务执行完毕
                confirmShutdown();
            } finally {
                try {
                    //SHUTDOWN状态下,在将全部的剩余任务执行完毕后,则将Selector关闭
                    cleanup();
                } finally {
                    // 清理Reactor线程中的threadLocal缓存,并通知相应future。
                    FastThreadLocal.removeAll();
                     //ST_TERMINATED状态为Reactor真正的终止状态
                    STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                    
                    //使得awaitTermination方法返回
                    threadLock.countDown();
                     //统计一下当前reactor任务队列中还有多少未执行的任务,打出日志
                    int numUserTasks = drainTasks();
                    if (numUserTasks > && logger.isWarnEnabled()) {
                        logger.warn("An event executor terminated with " +
                                "non-empty task queue (" + numUserTasks + ')');
                    }
                     /**
                     * 通知Reactor的terminationFuture成功,在创建Reactor的时候会向其terminationFuture添加Listener
                     * 在listener中增加terminatedChildren个数,当所有Reactor关闭后 ReactorGroup关闭成功
                     * */                            terminationFuture.setSuccess(null);
                }
            }
        }
    }
});
    }
} 

流程走到 doStartThread 方法中的 finally{…} 语句块中的时候,这个时候表示在优雅关闭的静默期内,已经没有任务继续向 Reactor 提交了。或者关闭耗时已经超过了设定的优雅关闭最大超时时间。

现在正式来到了 Reactor 的关闭流程。在流程开始之前需要确保当前 Reactor 的状态为 ST_SHUTTING_DOWN 正在关闭状态。

注意此刻用户线程依然可以向 Reactor 提交任务。当 Reactor 的状态变为 ST_SHUTDOWN 或者 ST_TERMINATED 时,用户向 Reactor 提交的任务就会被拒绝,但是此时 Reactor 的状态为 ST_SHUTTING_DOWN ,依然可以接受用户提交过来的任务。

 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
  @Override
  public boolean isShutdown() { return state >= ST_SHUTDOWN;
  }

  private void execute(Runnable task, boolean immediate) { boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
    startThread();
    //当Reactor的状态为ST_SHUTDOWN时,拒绝用户提交的异步任务,但是在优雅关闭ST_SHUTTING_DOWN状态时还是可以接受用户提交的任务的
    if (isShutdown()) {
        boolean reject = false;
        try {
            if (removeTask(task)) {
                reject = true;
            }
        } catch (UnsupportedOperationException e) {
        }
        if (reject) {
            reject();
        }
    }
}
 .........省略........
    }
} 

所以 Reactor 从工作循环 run 方法中退出随后流程一路走到这里来的这段时间,用户仍然有可能向 Reactor 提交任务,为了确保关闭流程的优雅,这里会在 for 循环中不停的执行 confirmShutdown() 方法直到所有的任务全部执行完毕。

随后会将 Reactor 的状态改为 ST_SHUTDOWN 状态,此时用户就不能在向 Reactor 提交任务了。如果此时在提交任务就会收到 RejectedExecutionException 异常。

大家这里可能会有疑问,Netty 在 Reactor 的状态变为 ST_SHUTDOWN 之后,又一次调用了 confirmShutdown() 方法,这是为什么呢?

其实这样做的目的是为了防止 Reactor 状态在变为 SHUTDOWN 之前,在这个极限的时间里,用户又向 Reactor 提交了任务,所以还需要最后一次调用 confirmShutdown() 将在这个极限时间内提交的任务执行完毕。

以上逻辑步骤就是真正优雅关闭的精髓所在,确保任务全部执行完毕,保证业务无损。

在我们优雅处理流程介绍完了之后,下面就是关闭 Reactor 的流程了:

Reactor 会在 SHUTDOWN 状态下,将 Selector 进行关闭。

 @Override
protected void cleanup() {
    try {
        selector.close();
    } catch (IOException e) {
        logger.warn("Failed to close a selector.", e);
    }
} 

清理 Reactor 线程中遗留的所有 ThreadLocal 缓存。

 FastThreadLocal.removeAll(); 

将 Reactor 的状态由 SHUTDOWN 改为 ST_TERMINATED 状态。 此时 Reactor 就算真正的关闭了

  STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); 

用户线程可能会调用 Reactor 的 awaitTermination 方法阻塞等待 Reactor 的关闭,当 Reactor 关闭之后会调用 threadLock.countDown() 使得用户线程从 awaitTermination 方法返回。

 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {  private final CountDownLatch threadLock = new CountDownLatch();
 @Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    
     ........省略.......
     //等待Reactor关闭
    threadLock.await(timeout, unit);
    return isTerminated();
}
 @Override
public boolean isTerminated() {
    return state == ST_TERMINATED;
}
} 

当这一切处理完毕之后,最后就会设置 Reactor 的 terminationFuture 为 success 。此时注册在 Reactor 的 terminationFuture 上的 listener 就会被回调。

这里还记得我们在 《Reactor 在 Netty 中的实现(创建篇)》 一文中介绍的,在 ReactorGroup 中的所有 Reactor 被挨个全部创建成功之后,会向所有 Reactor 的 terminationFuture 注册一个 terminationListener 。

在 terminationListener 中检测当前 ReactorGroup 中的所有 Reactor 是否全部完成关闭,如果已经全部关闭,则设置 ReactorGroup 的 terminationFuture 为Success。此刻 ReactorGroup 关闭流程结束,Netty 正式优雅谢幕完毕~~

 
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {  //Reactor线程组中的Reactor集合
private final EventExecutor[] children;
//记录关闭的Reactor个数,当Reactor全部关闭后,才可以认为关闭成功
private final AtomicInteger terminatedChildren = new AtomicInteger();
//ReactorGroup关闭future
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
 protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
  
    ........挨个创建Reactor........
     final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                //当所有Reactor关闭后 才认为是关闭成功
                terminationFuture.setSuccess(null);
            }
        }
    };
     for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
     ........省略........
}

} 

到现在为止,Netty 的整个优雅关闭流程,笔者就为大家详细介绍完了,下图为整个优雅关闭的完整流程图,大家可以对照下面这副总体流程图在回顾下我们前面介绍的源码逻辑。

Reactor优雅关闭总流程.png

6.5 Reactor 的状态变更流转

在本文的最后,笔者再来带着大家回顾下 Reactor 的状态变更流程。

Reactor的状态变更.png

  • 在 Reactor 被创建出来之后状态为 ST_NOT_STARTED。
  • 随着第一个异步任务的提交 Reactor 开始启动随后状态为 ST_STARTED 。
  • 当调用 shutdownGracefully 方法之后,Reactor 的状态变为 ST_SHUTTING_DOWN 。表示正在进行优雅关闭。此时用户仍可向 Reactor 提交异步任务。
  • 当 Reactor 中遗留的任务全部执行完毕之后,Reactor 的状态变为 ST_SHUTDOWN 。此时如果用户继续向 Reactor 提交异步任务,会被拒绝,并收到 RejectedExecutionException 异常。
  • 当 Selector 完成关闭,并清理掉 Reactor 线程中所有的 TheadLocal 缓存之后,Reactor 的状态变为 ST_TERMINATED 。

总结

到这里关于优雅关闭的前世今生笔者就位大家全部交代完毕了,信息量比较大,需要好好消化一下,很佩服大家能够一口气看到这里。

本文我们从进程优雅启停方案开始聊起,以优雅关闭的实现方案为起点,先是介绍了优雅关闭的底层基石-内核的信号量机制,从内核又聊到了 JVM 的 ShutdownHook 原理以及执行过程,最后通过三个知名的开源框架为案例,分别从 Spring 的优雅关闭机制聊到了 Dubbo 的优雅关闭,最后通过 Dubbo 的优雅关闭引出了 Netty 优雅关闭的详细实现方案,前后呼应。

好了,本文的内容就到这里了,大家辛苦了,相信大家认真看完之后一定会收获很大,我们下篇文章见~~~