Java教程:dubbo源码解析-网络通信(1)

Java
360
0
0
2023-07-23
标签   Dubbo

在之前的内容中,我们讲解了消费者端服务发现与提供者端服务暴露的相关内容,同时也知道消费者端通过内置的负载均衡算法获取合适的调用invoker进行远程调用。那么,本章节重点关注的就是远程调用过程即 网络通信 。

网络通信位于 Remoting 模块:

  • Remoting 实现是 Dubbo 协议的实现,如果你选择 RMI 协议,整个 Remoting 都不会用上;
  • Remoting 内部再划为 Transport 传输层 Exchange 信息交换层
  • Transport 层只负责单向消息传输,是对 Mina, netty , Grizzly 的抽象,它也可以扩展 UDP 传输;
  • Exchange 层是在传输层之上封装了 Request -Response 语义;

网络通信的问题:

  • 客户端与服务端连通性问题
  • 粘包拆包问题
  • 异步多线程数据一致问题

通信协议

dubbo内置,dubbo协议 ,rmi协议, Hessian 协议,http协议, WebService 协议, Thrift 协议, rest 协议,grpc协议, memcached 协议, Redis 协议等10种 通讯协议 。各个协议特点如下

dubbo协议

Dubbo 缺省协议采用单一长连接和 NIO 异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。

缺省协议,使用基于 mina 1.1.7 和 hessian 3.2.1 的 tbremoting 交互。

  • 连接个数:单连接
  • 连接方式:长连接
  • 传输协议: TCP
  • 传输方式:NIO 异步传输
  • 序列化:Hessian 二进制 序列化
  • 适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用 dubbo 协议传输大文件或超大字符串。
  • 适用场景:常规远程服务方法调用

rmi协议

RMI 协议采用 JDK 标准的 Java .rmi.* 实现,采用阻塞式短连接和 JDK 标准序列化方式。

  • 连接个数:多连接
  • 连接方式:短连接
  • 传输协议:TCP
  • 传输方式: 同步传输
  • 序列化 :Java 标准二进制序列化
  • 适用范围:传入传出参数数据包大小混合,消费者与提供者个数差不多,可传文件。
  • 适用场景:常规远程服务方法调用,与原生RMI服务互操作

hessian协议

Hessian 协议用于集成 Hessian 的服务,Hessian 底层采用 Http 通讯,采用 Servlet 暴露服务,Dubbo 缺省内嵌 Jetty 作为服务器实现。

Dubbo 的 Hessian 协议可以和原生 Hessian 服务互操作,即:

  • 提供者用 Dubbo 的 Hessian 协议暴露服务,消费者直接用标准 Hessian 接口调用
  • 或者提供方用标准 Hessian 暴露服务,消费方用 Dubbo 的 Hessian 协议调用。
  • 连接个数:多连接
  • 连接方式:短连接
  • 传输协议:HTTP
  • 传输方式:同步传输
  • 序列化:Hessian二进制序列化
  • 适用范围:传入传出参数数据包较大,提供者比消费者个数多,提供者压力较大,可传文件。
  • 适用场景:页面传输,文件传输,或与原生hessian服务互操作

http协议

基于 HTTP 表单的远程调用协议,采用 Spring 的 HttpInvoker 实现

  • 连接个数:多连接
  • 连接方式:短连接
  • 传输协议:HTTP
  • 传输方式:同步传输
  • 序列化:表单序列化
  • 适用范围:传入传出参数数据包大小混合,提供者比消费者个数多,可用浏览器查看,可用表单或URL传入参数,暂不支持传文件。
  • 适用场景:需同时给应用程序和浏览器 JS 使用的服务。

webservice协议

基于 WebService 的远程调用协议,基于 Apache CXF 实现](#fn2)。

可以和原生 WebService 服务互操作,即:

  • 提供者用 Dubbo 的 WebService 协议暴露服务,消费者直接用标准 WebService 接口调用,
  • 或者提供方用标准 WebService 暴露服务,消费方用 Dubbo 的 WebService 协议调用。
  • 连接个数:多连接
  • 连接方式:短连接
  • 传输协议:HTTP
  • 传输方式:同步传输
  • 序列化: SOAP 文本序列化(http + xml)
  • 适用场景:系统集成,跨语言调用

thrift协议

当前 dubbo 支持 [1]的 thrift 协议是对 thrift 原生协议 [2] 的扩展,在原生协议的基础上添加了一些额外的头信息,比如 service name,magic number 等。

rest协议

基于标准的Java REST API——JAX-RS 2.0(Java API for RESTful Web Services的简写)实现的REST调用支持

g Rpc 协议

Dubbo 自 2.7.5 版本开始支持 gRPC 协议,对于计划使用 HTTP/2 通信,或者想利用 gRPC 带来的 Stream、反压、Reactive 编程等能力的开发者来说, 都可以考虑启用 g RPC 协议。

  • 为期望使用 gRPC 协议的用户带来服务治理能力,方便接入 Dubbo 体系
  • 用户可以使用 Dubbo 风格的,基于接口的编程风格来定义和使用远程服务

memcached协议

基于 memcached实现的 RPC 协议

redis协议

基于 Redis 实现的 RPC 协议

序列化

序列化就是将对象转成字节流,用于网络传输,以及将字节流转为对象,用于在收到字节流数据后还原成对象。序列化的优势有很多,例如安全性更好、可跨平台等。我们知道dubbo基于netty进行网络通讯,在 NettyClient.doOpen() 方法中可以看到Netty的相关类

 bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() {
        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addLast("decoder", adapter.get decode r());
        pipeline.addLast("encoder", adapter.getEncoder());
        pipeline.addLast("handler", nettyHandler);
        return pipeline;
    }
}); 

然后去看NettyCodecAdapter 类最后进入ExchangeCodec类的encodeRequest方法,如下:

  protected  void  encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IO Exception  {
         serialization  serialization = getSerialization(channel);
        // header.
         byte [] header = new byte[HEADER_LENGTH]; 

是的,就是Serialization接口,默认是Hessian2Serialization序列化接口。

Java教程:dubbo源码解析-网络通信(1)

Dubbo序列化支持java、compactedjava、nativejava、 fastjson 、dubbo、fst、hessian2、kryo,protostuff其中默认hessian2。其中java、compactedjava、nativejava属于原生java的序列化。

  • dubbo序列化: 阿里 尚未开发成熟的高效java序列化实现,阿里不建议在生产环境使用它。
  • hessian2序列化:hessian是一种跨语言的高效二进制序列化方式。但这里实际不是原生的hessian2序列化,而是阿里修改过的,它是dubbo RPC默认启用的序列化方式。
  • json 序列化:目前有两种实现,一种是采用的阿里的fastjson库,另一种是采用dubbo中自己实现的简单json库,但其实现都不是特别成熟,而且json这种文本序列化性能一般不如上面两种二进制序列化。
  • java序列化:主要是采用JDK自带的Java序列化实现,性能很不理想。

最近几年,各种新的高效序列化方式层出不穷,不断刷新序列化性能的上限,最典型的包括:

  • 专门针对Java语言的:Kryo,FST等等
  • 跨语言的:Protostuff, ProtoBuf ,Thrift,Avro,MsgPack等等

这些序列化方式的性能多数都显著优于 hessian2 (甚至包括尚未成熟的dubbo序列化)。所以我们可以为 dubbo 引入 Kryo 和 FST 这两种高效 Java 来优化 dubbo 的序列化。

使用Kryo和FST非常简单,只需要在dubbo RPC的XML配置中添加一个属性即可:

 <dubbo:protocol name="dubbo" serialization="kryo"/> 

网络通信

dubbo中数据格式

解决 socket 中数据粘包拆包问题,一般有三种方式

  • 定长协议(数据包长度一致)
  • 定长的协议是指协议内容的长度是固定的,比如协议byte长度是50,当从网络上读取50个byte后,就进行decode解码操作。定长协议在读取或者写入时,效率比较高,因为数据缓存的大小基本都确定了,就好比数组一样,缺陷就是适应性不足,以RPC场景为例,很难估计出定长的长度是多少。
  • 特殊结束符(数据尾:通过特殊的字符标识#)
  • 相比定长协议,如果能够定义一个特殊字符作为每个协议单元结束的标示,就能够以变长的方式进行通信,从而在数据传输和高效之间取得平衡,比如用特殊字符 n 。特殊结束符方式的问题是过于简单的思考了协议传输的过程,对于一个协议单元必须要全部读入才能够进行处理,除此之外必须要防止用户传输的数据不能同结束符相同,否则就会出现紊乱。
  • 变长协议(协议头+payload模式)
  • 这种一般是自定义协议,会以定长加不定长的部分组成,其中定长的部分需要描述不定长的内容长度。
  • dubbo就是使用这种形式的数据传输格式

Dubbo 框架定义了私有的RPC协议,其中请求和响应协议的具体内容我们使用表格来展示。

Java教程:dubbo源码解析-网络通信(1)

Dubbo 数据包分为消息头和消息体,消息头用于存储一些元信息,比如 魔数 (Magic),数据包类型(Request/Response),消息体长度(Data Length)等。消息体中用于存储具体的调用消息,比如方法名称,参数列表等。下面简单列举一下消息头的内容。

偏移量 (Bit)


字段


取值


0 ~ 7


魔数高位


0xda00


8 ~ 15


魔数低位


0xbb


16


数据包类型


0 – Response, 1 – Request


17


调用方式


仅在第16位被设为1的情况下有效,0 – 单向调用,1 – 双向调用


18


事件标识


0 – 当前数据包是请求或响应包,1 – 当前数据包是 心跳包


19 ~ 23


序列化器编号


2 – Hessian2Serialization 3 – JavaSerialization 4 – CompactedJavaSerialization 6 – FastJsonSerialization 7 – NativeJavaSerialization 8 – KryoSerialization 9 – FstSerialization


24 ~ 31


状态


20 – OK 30 – CLIENT_TIMEOUT 31 – SERVER_TIMEOUT 40 – BAD_REQUEST 50 – BAD_RESPONSE ……


32 ~ 95


请求编号


共8字节,运行时生成


96 ~ 127


消息体长度


运行时计算


消费方发送请求

(1)发送请求

为了便于大家阅读代码,这里以 DemoService 为例,将 sayHello 方法的整个调用路径贴出来。

 proxy#sayHello(String)
  —> InvokerInvocationHandler#invoke(Object, Method, Object[])
    —>  mock ClusterInvoker#invoke(Invocation)
      —>  abstract ClusterInvoker#invoke(Invocation)
        —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
          —> Filter#invoke(Invoker, Invocation)  // 包含多个 Filter 调用
            —> ListenerInvokerWrapper#invoke(Invocation) 
              —> AbstractInvoker#invoke(Invocation) 
                —> DubboInvoker#doInvoke(Invocation)
                  —> ReferenceCountExchangeClient#request(Object, int)
                    —> HeaderExchangeClient#request(Object, int)
                      —> HeaderExchangeChannel#request(Object, int)
                        —> AbstractPeer#send(Object)
                          —> AbstractClient#send(Object,  Boolean )
                            —> NettyChannel#send(Object, boolean)
                              —> NioClientSocketChannel#write(Object) 

dubbo消费方,自动生成代码对象如下

 public class proxy implements ClassGenerator.DC, EchoService, DemoService {

     private  InvocationHandler handler;

    public String sayHello(String string) {
        // 将参数存储到 Object 数组中
        Object[] arrobject = new Object[]{string};
        // 调用 InvocationHandler 实现类的 invoke 方法得到调用结果
        Object object = this.handler.invoke(this, methods[], arrobject);
        // 返回调用结果
        return (String)object;
    }
} 

InvokerInvocationHandler 中的 invoker 成员变量类型为 MockClusterInvoker,MockClusterInvoker 内部封装了服务降级逻辑。下面简单看一下:

 public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    // 获取 mock 配置值
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    if (value.length() == || value.equalsIgnoreCase("false")) {
         // 无 mock 逻辑,直接调用其他 Invoker 对象的 invoke 方法,
        // 比如 FailoverClusterInvoker
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        // force:xxx 直接执行 mock 逻辑,不发起远程调用
        result = doMockInvoke(invocation, null);
    } else {
         // fail:xxx 表示消费方对调用服务失败后,再执行 mock 逻辑,不抛出异常
        try {
            result = this.invoker.invoke(invocation);
        } catch (RpcException e) {
             // 调用失败,执行 mock 逻辑
            result = doMockInvoke(invocation, e);
        }
    }
    return result;
} 

考虑到前文已经详细分析过 FailoverClusterInvoker,因此本节略过 FailoverClusterInvoker,直接分析 DubboInvoker。

 public abstract class AbstractInvoker<T> implements Invoker<T> {
    
    public Result invoke(Invocation inv) throws RpcException {
        if (destroyed.get()) {
            throw new RpcException("Rpc invoker for service ...");
        }
        RpcInvocation invocation = (RpcInvocation) inv;
        // 设置 Invoker
        invocation.setInvoker(this);
        if (attachment != null && attachment.size() >) {
            // 设置 attachment
            invocation.addAttachmentsIfAbsent(attachment);
        }
         Map <String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() !=) {
            // 添加 contextAttachments 到 RpcInvocation#attachment 变量中
            invocation.addAttachments(contextAttachments);
        }
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
            // 设置异步信息到 RpcInvocation#attachment 中
            invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        try {
            // 抽象方法,由子类实现
            return doInvoke(invocation);
        } catch (InvocationTargetException e) {
            // ...
        } catch (RpcException e) {
            // ...
        } catch (Throwable e) {
            return new RpcResult(e);
        }
    }

    protected abstract Result doInvoke(Invocation invocation) throws Throwable;
    
    // 省略其他方法
} 

上面的代码来自 AbstractInvoker 类,其中大部分代码用于添加信息到 RpcInvocation#attachment 变量中,添加完毕后,调用 doInvoke 执行后续的调用。doInvoke 是一个抽象方法,需要由子类实现,下面到 DubboInvoker 中看一下。

   @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        //将目标方法以及版本好作为参数放入到Invocation中
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        //获得客户端连接
        ExchangeClient currentClient; //初始化invoker的时候,构建的一个远程通信连接
        if (clients.length ==) { //默认
            currentClient = clients[];
        } else {
            //通过取模获得其中一个连接
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            //表示当前的方法是否存在返回值
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
            //isOneway 为 true,表示“单向”通信
            if (isOneway) {//异步无返回值
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else { //存在返回值
                //是否采用异步
                AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
                responseFuture.whenComplete((obj, t) -> {
                    if (t != null) {
                        asyncRpcResult.completeExceptionally(t);
                    } else {
                        asyncRpcResult.complete((AppResponse) obj);
                    }
                });
                RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
                return asyncRpcResult;
            }
        } 
        //省略无关代码
    } 

最终进入到HeaderExchangeChannel#request方法,拼装Request并将请求发送出去

 public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed tosend request " + request + ", cause: The channel " + this + " is closed!");
        }
        // 创建请求对象
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
        try {
            //NettyClient
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    } 

(2)请求编码

在netty启动时,我们设置了编解码器,其中通过ExchangeCodec完成编解码工作如下:

 public class ExchangeCodec extends TelnetCodec {

    // 消息头长度
    protected static final int HEADER_LENGTH =;
    // 魔数内容
    protected static final short MAGIC = (short)xdabb;
    protected static final byte MAGIC_HIGH = Bytes.shortbytes(MAGIC)[0];
    protected static final byte MAGIC_LOW = Bytes.shortbytes(MAGIC)[1];
    protected static final byte FLAG_REQUEST = (byte)x80;
    protected static final byte FLAG_TWOWAY = (byte)x40;
    protected static final byte FLAG_EVENT = (byte)x20;
    protected static final int SERIALIZATION_MASK =x1f;
    private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);

    public Short getMagicCode() {
        return MAGIC;
    }

    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            // 对 Request 对象进行编码
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            // 对 Response 对象进行编码,后面分析
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }

    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        Serialization serialization = getSerialization(channel);

        // 创建消息头字节数组,长度为
        byte[] header = new byte[HEADER_LENGTH];

        // 设置魔数
        Bytes.shortbytes(MAGIC, header);

        // 设置数据包类型(Request/Response)和序列化器编号
        header[] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

        // 设置通信方式(单向/双向)
        if (req.isTwoWay()) {
            header[] |= FLAG_TWOWAY;
        }
        
        // 设置事件标识
        if (req.isEvent()) {
            header[] |= FLAG_EVENT;
        }

        // 设置请求编号,个字节,从第4个字节开始设置
        Bytes.longbytes(req.getId(), header, 4);

        // 获取 buffer 当前的写位置
        int savedWriteIndex = buffer.writerIndex();
        // 更新 writerIndex,为消息头预留 个字节的空间
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        // 创建序列化器,比如 HessianObjectOutput
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {
            // 对事件数据进行序列化操作
            encodeEventData(channel, out, req.getData());
        } else {
            // 对请求数据进行序列化操作
            encodeRequestData(channel, out, req.getData(), req.getVersion());
        }
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();
        
        // 获取写入的字节数,也就是消息体长度
        int len = bos.writtenBytes();
        checkPayload(channel, len);

        // 将消息体长度写入到消息头中
        Bytes.intbytes(len, header, 12);

        // 将 buffer 指针移动到 savedWriteIndex,为写消息头做准备
        buffer.writerIndex(savedWriteIndex);
        // 从 savedWriteIndex 下标处写入消息头
        buffer.writeBytes(header);
        // 设置新的 writerIndex,writerIndex = 原写下标 + 消息头长度 + 消息体长度
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }
    
    // 省略其他方法
} 

以上就是请求对象的编码过程,该过程首先会通过位运算将消息头写入到 header 数组中。然后对 Request 对象的 data 字段执行序列化操作,序列化后的数据最终会存储到 ChannelBuffer 中。序列化操作执行完后,可得到数据序列化后的长度 len,紧接着将 len 写入到 header 指定位置处。最后再将消息头字节数组 header 写入到 ChannelBuffer 中,整个编码过程就结束了。本节的最后,我们再来看一下 Request 对象的 data 字段序列化过程,也就是 encodeRequestData 方法的逻辑,如下:

 public class DubboCodec extends ExchangeCodec implements Codec {
    
	protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
        RpcInvocation inv = (RpcInvocation) data;

        // 依次序列化 dubbo version、path、version
        out.writeUTF(version);
        out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
        out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

        // 序列化调用方法名
        out.writeUTF(inv.getMethodName());
        // 将参数类型转换为字符串,并进行序列化
        out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
        Object[] args = inv.getArguments();
        if (args != null)
            for (int i =; i < args.length; i++) {
                // 对运行时参数进行序列化
                out.writeObject(encodeInvocationArgument(channel, inv, i));
            }
        
        // 序列化 attachments
        out.writeObject(inv.getAttachments());
    }
} 

至此,关于服务消费方发送请求的过程就分析完了,接下来我们来看一下服务提供方是如何接收请求的。