高性能进阶之路——Java异步调用实现原理详解

Java
429
0
0
2023-06-14

接下来进入到大家比较喜欢的 高性能系列 ,主题内容包括, 消息队列 缓存 分布式部署架构 等,在上一篇文章- # 秒杀系统架构图该怎么画?手把手教你! ,讲解了博主 凄惨 的经历,因此在学习相关技术的时候,我们要将其运用到我们实际的项目中,在 高性能篇 结束后,将进入 架构图2.0版本

什么是异步

同步调用:调用方在调用过程中,持续等待返回结果。

异步调用 :调用方在调用过程中,不直接等待返回结果,而是执行其他任务,结果返回形式通常为回调函数。

脱离 IO ,单独讨论 同步 异步 ,我们更加容易去理解它的原理,同步和异步其实属于一种 通信机制 ,表达的是,我们在通信过程中,是主动去询问,还是等对方主动反馈。体现在同步 (主动) 和异步 (被动) 上。

进程内异步调用

1、Thread

Java进程 内最简单的异步调用方式,就是通过 new Thread().start() 的方式,启动新的线程进行任务的执行( CPU调度 )。

 public static void main(String[] args) {
    System.out.println("煲水");
    //创建新的线程
    Thread  thread 1= new  Thread (()->{
        try {
            Thread.sleep();
            System.out.println("水开了,"+Thread.currentThread().getName());
        }catch (Exception e){
            e.printStackTrace();
        }
    });
    thread.start();
    System.out.println("运动");
}

1.1、start() 和 run()

在上述实例代码中,我们虽然采用了实现 Runnable 接口的方式,进行新线程的实现,但是在方法启动时,并没有使用 run() 方法,而是使用了 start() 方法。

run():使用当前线程执行 run()方法调用,可以理解时同步调用

start() 方法在调用时,在代码逻辑中,会调用到一个本地方法 start0

下载 JDK源码 后,可以看到 Thread 类 有个 registerNatives 本地方法,该方法主要的作用就是注册一些本地方法供 Thread 类使用,如 start0(),stop0() 等等,可以说,所有操作本地线程的本地方法都是由它注册的。

可以看出 Java 线程 调用 start->start0 的方法,实际上会调用到 jvm _StartThread 方法,通过调用 new JavaThread(&thread_entry,sz) 完成线程的创建。

jvm.cpp 中,有如下代码段:

在创建完线程后,通过 thread_entry 完成 run() 方法的调用

1.2、Future

Future 的调用方式,属于 同步非阻塞 , 主要原因在于,在获取异步线程处理结果时,需要主线程主动去获取,异步线程并没有通过 主动通知 的方式,将数据结构进行 更新 回调

 public static void main(String[] args) throws Exception {
    System.out.println("煲水");
    FutureTask<String> futureTask = new FutureTask(()->{
        try {
            Thread.sleep();
            System.out.println("水开了,"+Thread.currentThread().getName());
        }catch (Exception e){
            e.printStackTrace();
        }
        return "water";
    });
    //创建新的线程
    Thread thread= new Thread(futureTask);
    thread.start();
    System.out.println("运动");
    Thread.sleep();
    //阻塞等待数据
    String result= futureTask.get(, TimeUnit.SECONDS);
    System.out.println("喝水," + result);
}
Future 的实现原理

类的继承关系图如下,可以看到 FutureTask ,实现了 Runnable 接口,那么在重写的 run() 方法中,可以看到,在调用 call() 方法获取到结果后,通过 CAS 的方式,更新到 成员变量 中。

任务调用结果更新:

1.3、ThreadPoolExecutor

 public static void main(String[] args) throws Exception {
    ExecutorService executors = Executors.newFixedThreadPool();
    System.out.println("煲水");
    Future<String> future = executors.submit(() -> {
        try {
            Thread.sleep();
            System.out.println("水开了," + Thread.currentThread().getName());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "water";
    });
    System.out.println("运动");

    String result = future.get(, TimeUnit.SECONDS);
    System.out.println("喝水," + result);
}

上面讲解了 FutureTask 的实现原理后,这里再对比 submit() execute() ,就比较容易理解了,在 submit() 方法中,将 Callable<T> 实现类,封装成了 FutureTask , 然后再进行实际的调用:

高性能进阶之路——Java异步调用实现原理详解

高性能进阶之路——Java异步调用实现原理详解

1.4、总结

核心区别在于 start() run() , start()是启动一条 新的线程 的同时,完成 run() 方法,这时候是一个 异步操作 ;如果直接执行 run() 方法, 则会在 当前线程 直接执行,是一个 同步阻塞操作

Future 的调用方式,则是一个 同步非阻塞 处理,在提交了 任务 后,不阻塞主线程的继续执行,在到了一定时间后,主线程可以通过 get() 方法,获取 异步任务 处理结果。

ThreadPoolExecutor 则是维护了一个可复用的线程池,解决了 资源复用 性能耗时 的问题, Java线程 默认大小为 1MB ,线程的 创建 销毁 都会占用内存和GC耗时;而线程的无限制创建, 则会带来 CPU负载过高 ,每个线程分配的时间都很少,导致处理效率低。

2、EventBus

 public class JiulingTest {
    public static void main(String[] args) throws Exception {
        System.out.println("开始");
        //使用异步事件总线
        EventBus eventBus  = new AsyncEventBus(Executors.newFixedThreadPool());
        // 向上述EventBus对象中注册一个监听对象
        eventBus.register(new EventListener());
        // 使用EventBus发布一个事件,该事件会给通知到所有注册的监听者
        eventBus.post(new Event("煲水"));
        System.out.println("运动");
    }
}
// 事件,监听者监听的事件的包装对象
class Event {
    //事件动作
    public String action;

    Event(String action) {
        this.action = action;
    }
}

// 监听者
 class EventListener {
    // 监听的方法,必须使用注解声明,且只能有一个参数,实际触发一个事件的时候会根据参数类型触发方法
    @Subscribe
    public void listen(Event event) {
        try {
            System.out.println("Event listener receive message:  " + event.action + " threadName:" + Thread.currentThread().getName());
            Thread.sleep();
            System.out.println("水开了!");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

2.1、 观察者模式

EventBus 中,通过 @Subscribe 定义了抽象观察者的行为, 通过 入口 区分不同的事件 监听动作 ,如上述的示例代码中, listen(Event event) 只会观察这个类的事件。

 /**
 * Returns all subscribers for the given listener grouped by the type of event they subscribe to.
 */ private  Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
  Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
  Class<?> clazz = listener.getClass();
  //遍历 @Subscribe 的方法
  for (Method method : getAnnotatedMethods(clazz)) {
    Class<?>[] parameterTypes = method.getParameterTypes();
    Class<?> eventType = parameterTypes[];
    //然后根据 参数类型,也就是事件类型,进行归类
    methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
  }
  return methodsInListener;
}

然后在进行事件发布的时候,通过调用 EventBus.post() 方法,便利找到所有的监听方法:

 public void post(Object event) {
//从上述归类的Map 中,找到所有的观察者方法
  Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
  if (eventSubscribers.hasNext()) {
  //事件分发,具体调用
    dispatcher.dispatch(event, eventSubscribers);
  } else if (!(event instanceof DeadEvent)) {
    // the event had no subscribers and was not itself a DeadEvent
    post(new DeadEvent(this, event));
  }
}

2.2、AsyncEventBus

在示例代码中,我们使用的是 new AsyncEventBus(Executors.newFixedThreadPool(10)) 构建的异步事件总线。

由下往上倒推,我们先看 Listern ,是如何执行事件处理方法的,这里比较好 理解 ,通过 线程池 完成任务的调用,具体实现是 通过反射的方式调用 @Subscribe 注解的方法。

那么这里的 executor 是怎么来的呢?

this.executor = bus.executor(); //从事件总线传递过来

回到 EventBus 中,我们可以看到 构造函数 并没有提供初始化线程池的入口,那么默认线程池的创建,可以跟踪到

这个线程池的 execute 方法,并没有创建新的线程执行 Runnable 方法 ,而是使用 当前线程 执行(具体逻辑参考 1.1 )。 因此 EventBus 是不支持异步事件处理的!

dispatchEvent 方法中,比较直接可以看到整体设计中,是支持异步事件的,我们需要做的就是将 Executor 修改成一个合理的线程池, 而 AsyncEventBus 恰恰提供了这个能力。

 /**
 * Creates a new AsyncEventBus that will use {@code executor} to dispatch events.
 *
 * @param executor Executor to use to dispatch events. It is the caller's responsibility to shut
 *     down the executor after the last event has been posted to this event bus.
 */public AsyncEventBus(Executor executor) {
  super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}

3、Spring Event

Spring Event Event Bus 默认都是同步执行,支持通过设置 Executors 的方式修改成异步事件。

高性能进阶之路——Java异步调用实现原理详解

核心组件:

  • 事件类:定义事件,继承 ApplicationEvent 的类成为一个事件类。
  • 发布者:发布事件,通过 ApplicationEventPublisher 发布事件。
  • 监听者:监听并处理事件,实现 ApplicationListener 接口或者使用 @EventListener 注解。

由于代码过多,可以直接 github 下载 进行阅读,这里只贴部分关键代码:

在发布事件方法: AbstractApplicationContext#publishEvent

会走到 下图中的 SimpleApplicationEventMulticaster#multicastEvent 执行具体任务的调度。 这里的设计与 上面的 EventBus 如出一辙,在执行时,通过区分线程池进行实际的调度,从而决定 同步|异步 !

3.1、异步之ApplicationEventMulticaster

修改 ApplicationEventMulticaster 设置初始线程池, 和 EventBus 的解决思路一致:

 @Order()
@ bean 
public ApplicationEventMulticaster applicationEventMulticaster() {
    SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();
    eventMulticaster.setTaskExecutor(Executors.newFixedThreadPool());
    return eventMulticaster;
}

在Spring 上下文初始化的时候,会将这一个bean,加载到上下文中,

高性能进阶之路——Java异步调用实现原理详解

存在的问题: 由于将整个上下文的 ApplicationEventMulticaster 都替换了,那么在事件处理的流程上,所有的事件都会以异步的方式进行,那么风险的把控就很难做好。不建议,但能用( 毕竟经受过考验

3.2、异步之@Async

通过实现 AsyncConfigurer 接口,自定义线程池,对切面方法,执行反射代理

高性能进阶之路——Java异步调用实现原理详解

高性能进阶之路——Java异步调用实现原理详解

org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke

核心原理

高性能进阶之路——Java异步调用实现原理详解

进程间异步调用

Dubbo 异步调用

rpc 框架中,我们普遍使用的都是 同步调用 模式,但是在 Dubbo 的底层实现中,反而是以 异步调用 的方式实现的。先来简单看看 调用链路

高性能进阶之路——Java异步调用实现原理详解

核心代码在

com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke

消息队列异步 解耦

在介绍 EventBus 的时候, 我查看了很多文章,都将 EventBus 设计模式 描述为 发布-订阅 模式。首先这个描述是错误的,然后我们来对比一下他们的区别:

从表面上看:

  • 观察者模式里,只有两个角色 —— 观察者 + 被观察者
  • 而发布订阅模式里,却不仅仅只有发布者和订阅者两个角色,还有一个经常被我们忽略的 —— 经纪人 Broker

往更深层次讲:

  • 观察者和被观察者,是松耦合的关系
  • 发布者和订阅者,则完全不存在耦合

发布-订阅 模式:

消息队列 能够帮我们做到 解耦 的效果,通过消息中间件,如 RocketMQ kafka rabbitMQ 等; 完成消息的接收和推送,从而达到 异步处理 的效果。