Dubbo-聊聊通信模块设计( 二 )


TransporterTransporter接口是Dubbo在Client和Server上又封装的一层,我们可以看到改接口被@SPI以及@Adaptive注解修饰,因此这个是个可扩展的接口,默认使用Netty的扩展,@Adaptive表示可以动态生成该适配的类,根据设置的值确定具体实现的类 。
@SPI("netty")public interface Transporter {    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})    RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})    Client connect(URL url, ChannelHandler handler) throws RemotingException;}Transporter的实现类有主要有以下几种,每个对应的具体的NIO的实现都在其各自的包中,这样可以通过灵活配置来进行切换不同的实现 。为了验证是否正确,我们简单再来看一下RemotingServer的实现,RemotingServer的实现中,包含每个具体NIO框架的实现,因此这里更加印证Transporter的的抽象,让我们可以通过Dubbo SPI修改具体Transporter扩展实现,从而切换到不同的Client和 RemotingServer实现,从而达到NIO库切换,这里我们无需修改任何代码,真正的做到开放-闭合的原则 。
Transporters
Transporters该类是一种门面模式的设计,主要是解决和多个不同子模块直接进行交互的问题,通过该类设计,将公共的行为Transporter对象的创建以及ChannelHandler的处理,大家可以直接依赖Transporters类,这部分调用是在Dubbo协议初始化时候发起的,这部分我们到时候在细讲,这个章节暂时先不讲解 。但是这里需要在这个看一下关于ChannelHandler的处理,此处传入了多个ChannelHandler,将多个ChannelHandler包装成为ChannelHandlerDispatcher,ChannelHandlerDispatcher实现ChannelHandler,内部维护了一个 CopyOnWriteArraySet,对外提供操作ChannelHandler方法,此处主要是为了引出后续Handler的处理流程,后续一层处理模型的源头都在这里 。
到这里我们大概对Dubbo的通讯模型有了一个轮廓,我们来进行一个简单的总结,可以参考下图:

  1. 上层通过会Transporters获取到具体的Transporter扩展实现,然后通过Transporter获取Client和 RemotingServer实现;
  2. Client与RemotingServer都是通过Channel进行交互,Channel使用ChannelHandler进行数据传输,此外通过Codec2进行编解码;
Buffer设计
image.png
接口设计ChannelBuffer的设计类似于Netty的Buffer的设计,大致可以分为五类,对于具体的实现我们在后面AbstractChannelBuffer等实现类里面进行讲解 。接下来我们来看一下ChannelBufferFactory,该接口都是用来创建ChannelBuffer的,并且每个具体的实现都是单例的,可以理解为一个简单工厂的设计,可以有不同类型的ChannelBuffer的实现 。
AbstractChannelBufferAbstractChannelBuffer维护两类索引,一类用于读写,另外一类用于读写标记;关于读写类索引就是记录当前读到什么位置以及写到什么位置了,标记类索引就是为了做数据备份和回滚使用,为了对缓冲区重复利用 。该类的方法都主要是利用四个属性来操作,用来检测是否有数据可读或者还是否有空间可写等方法,做一些前置条件的校验以及索引的设置,具体的实现都是需要子类来实现 。
    @Override    public void readBytes(byte[] dst, int dstIndex, int length) {        //检查位置是否足够        checkReadableBytes(length);        //此处可以理解为将readerIndex后移length个字节读取到dst数组中        //也就是数组dst的dstIndex~dstIndex+length位置        getBytes(readerIndex, dst, dstIndex, length);        //readerIndex后移length个字节        readerIndex += length;    }    @Override    public void readBytes(byte[] dst, int dstIndex, int length) {        //检查位置是否足够        checkReadableBytes(length);        //此处可以理解为将readerIndex后移length个字节读取到dst数组中        //也就是数组dst的dstIndex~dstIndex+length位置        getBytes(readerIndex, dst, dstIndex, length);        //readerIndex后移length个字节        readerIndex += length;    }    @Override    public void writeBytes(byte[] src, int srcIndex, int length) {        //将src数组中srcIndex~srcIndex+length位置的数据写到当前的buffer中        setBytes(writerIndex, src, srcIndex, length);        //将当前的writerIndex后移length        writerIndex += length;    }

经验总结扩展阅读