netty源码解析
① Netty 源码解析 ——— ChannelConfig 和 Attribute
嗯,本文与其说是ChannelConfig、Attribute源码解析,不如说是对ChannelConfig以及Attribute结构层次的分析。因为这才是它们在Netty中使用到的重要之处。
在 Netty 源码解析 ——— 服务端启动流程 (下) 中说过,当我们在构建NioServerSocketChannel的时候同时会构建一个NioServerSocketChannelConfig对象赋值给NioServerSocketChannel的成员变量config。
而这一个NioServerSocketChannelConfig是当前NioServerSocketChannel配置属性的集合。NioServerSocketChannelConfig主要用于对NioServerSocketChannel相关配置的设置(如,网络的相关参数配置),比如,配置Channel是否为非阻塞、配置连接超时时间等等。
NioServerSocketChannelConfig其实是一个ChannelConfig实例。ChannelConfig表示为一个Channel相关的配置属性的集合。所以NioServerSocketChannelConfig就是针对于NioServerSocketChannel的配置属性的集合。
ChannelConfig是Channel所需的公共配置属性的集合,如,setAllocator(设置用于channel分配buffer的分配器)。而不同类型的网络传输对应的Channel有它们自己特有的配置,因此可以通过扩展ChannelConfig来补充特有的配置,如,ServerSocketChannelConfig是针对基于TCP连接的服务端ServerSocketChannel相关配置属性的集合,它补充了针对TCP服务端所需的特有配置的设置setBacklog、setReuseAddress、setReceiveBufferSize。
DefaultChannelConfig作为ChannelConfig的默认实现,对ChannelConfig中的配置提供了默认值。
接下来,我们来看一个设置ChannelConfig的流程:
serverBootstrap.option(ChannelOption.SO_REUSEADDR, true);
我们可以在启动服务端前通过ServerBootstrap来进行相关配置的设置,该选项配置会在Channel初始化时被获取并设置到Channel中,最终会调用底层ServerSocket.setReuseAddress方法来完成配置的设置。
ServerBootstrap的init()方法:
首先对option和value进行校验,其实就是进行非空校验。
然后判断对应的是哪个常量属性,并进行相应属性的设置。如果传进来的ChannelOption不是已经设定好的常量属性,则会打印一条警告级别的日志,告知这是未知的channel option。
Netty提供ChannelOption的一个主要的功能就是让特定的变量的值给类型化。因为从’ChannelOption<T> option’和’T value’可以看出,我们属性的值类型T,是取决于ChannelOption的泛型的,也就属性值类型是由属性来决定的。
这里,我们可以看到有个ChannelOption类,它允许以类型安全的方式去配置一个ChannelConfig。支持哪一种ChannelOption取决于ChannelConfig的实际的实现并且也可能取决于它所属的传输层的本质。
可见ChannelOption是一个Consant扩展类,Consant是Netty提供的一个单例类,它能安全去通过’==’来进行比较操作。通过ConstantPool进行管理和创建。
常量由一个id和name组成。id:表示分配给常量的唯一数字;name:表示常量的名字。
如上所说,Constant是由ConstantPool来进行管理和创建的,那么ConstantPool又是个什么样的类了?
首先从constants中get这个name对应的常量,如果不存在则调用newConstant()来构建这个常量tempConstant,然后在调用constants.putIfAbsent方法来实现“如果该name没有存在对应的常量,则插入,否则返回该name所对应的常量。(这整个的过程都是原子性的)”,因此我们是根据putIfAbsent方法的返回来判断该name对应的常量是否已经存在于constants中的。如果返回为null,则说明当前创建的tempConstant就为name所对应的常量;否则,将putIfAbsent返回的name已经对应的常量值返回。(注意,因为ConcurrentHashMap不会允许value为null的情况,所以我们可以根据putIfAbsent返回为null则代表该name在此之前并未有对应的常量值)
正如我们前面所说的,这个ConstantPool<ChannelOption<Object>> pool(即,ChannelOption常量池)是ChannelOption的一个私有静态成员属性,用于管理和创建ChannelOption。
这些定义好的ChannelOption常量都已经存储数到ChannelOption的常量池(ConstantPool)中了。
注意,ChannelOption本身并不维护选项值的信息,它只是维护选项名字本身。比如,“public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");”👈这只是维护了“SO_RCVBUF”这个选项名字的信息,同时泛型表示选择值类型,即“SO_RCVBUF”选项值为Integer。
好了,到目前为止,我们对Netty的ChannelOption的设置以及底层的实现已经分析完了,简单的来说:Netty在初始化Channel时会构建一个ChannelConfig对象,而ChannelConfig是Channel配置属性的集合。比如,Netty在初始化NioServerSocketChannel的时候同时会构建一个NioServerSocketChannelConfig对象,并将其赋值给NioServerSocketChannel的成员变量config,而这个config(NioServerSocketChannelConfig)维护了NioServerSocketChannel的所有配置属性。比如,NioServerSocketChannelConfig提供了setConnectTimeoutMillis方法来设置NioServerSocketChannel连接超时的时间。
同时,程序可以通过ServerBootstrap或Boostrap的option(ChannelOption<T> option, T value)方法来实现配置的设置。这里,我们通过ChannelOption来实现配置的设置,ChannelOption中已经将常用的配置项预定义为了常量供我们直接使用,同时ChannelOption的一个主要的功能就是让特定的变量的值给类型化。因为从’ChannelOption<T> option’和’T value’可以看出,我们属性的值类型T,是取决于ChannelOption的泛型的,也就属性值类型是由属性来决定的。
一个attribute允许存储一个值的引用。它可以被自动的更新并且是线程安全的。
其实Attribute就是一个属性对象,这个属性的名称为AttributeKey<T> key,而属性的值为T value。
我们可以通过程序ServerBootstrap或Boostrap的attr方法来设置一个Channel的属性,如:
serverBootstrap.attr(AttributeKey.valueOf("userID"), UUID.randomUUID().toString());
当Netty底层初始化Channel的时候,就会将我们设置的attribute给设置到Channel中:
如上面所说,Attribute就是一个属性对象,这个属性的名称为AttributeKey<T> key,而属性的值为T value。
而AttributeKey也是Constant的一个扩展,因此也有一个ConstantPool来管理和创建,这和ChannelOption是类似的。
Channel类本身继承了AttributeMap类,而AttributeMap它持有多个Attribute,这些Attribute可以通过AttributeKey来访问的。所以,才可以通过channel.attr(key).set(value)的方式将属性设置到channel中了(即,这里的attr方法实际上是AttributeMap接口中的方法)。
AttributeKey、Attribute、AttributeMap间的关系:
AttributeMap相对于一个map,AttributeKey相当于map的key,Attribute是一个持有key(AttributeKey)和value的对象。因此在map中我们可以通过AttributeKey key获取Attribute,从而获取Attribute中的value(即,属性值)。
Q:ChannelHandlerContext和Channel都提供了attr方法,那么它们设置的属性作用域有什么不同了?
A:在Netty 4.1版本之前,它们两设置的属性作用域确实存在着不同,但从Netty 4.1版本开始,它们两设置的属性的作用域已经完全相同了。
若文章有任何错误,望大家不吝指教:)
圣思园《精通并发与Netty》
② Netty核心组件之NioEventLoop(一)
在接下来几篇文章,我会通过Netty的源码深入讲解NioEventLoop的实现机制。
特别说明:基于4.1.52版本的源码
先来看下NioEventLoop的类关系图和重要的属性,对其有一个整体的感知,便于后面详细分析。
首先来看NioEventLoop的构造函数
默认情况下,会创建MPSC,即多生产者单消费者的队列,这里最终会用到JCTools库,这里不过多介绍,感兴趣的可以自己去了解。
如果设置了优化开关(默认优化选项是开启的),则通过反射的方式从Selector中获取selectedKeys和publicSelectedKeys,将这两个成员设置为可写,通过反射,使用Netty构造的selectedKeySet将原生JDK的selectedKeys替换掉。
我们知道使用Java原生NIO接口时,需要先调Selector的select方法,再调selectedKeys方法才可以获得有IO事件准备好的SelectionKey集合。这里优化过后,只通过一步select调用,就可以从selectedKeySet获得需要的SelectionKey集合。
另外,原生Java的SelectionKey集合是一个HashSet,这里优化过后的SelectedSelectionKeySet底层是一个数组,效率更高。
EventLoop的职责可以用下面这张图形象的表示
下面详细解析:
每次循环,都会检测任务队列和IO事件,如果任务队列中没有任务,则直接返回SelectStrategy.SELECT;如果任务队列中有任务,则会调用非阻塞的 selectNow 检测有IO事件准备好的Channel数。
nextScheledTaskDeadlineNanos 方法返回下一个将要被执行的定时任务的截止时间
NioEventLoop的定时任务队列是一个优先级队列,队列中存储的是ScheledFutureTask对象
通过ScheledFutureTask的 compareTo 方法可以看出,优先级队列中的元素是以任务的截止时间来排序的,队首元素的截止时间最小,当截止时间相同时,以任务ID排序,ID小的排在前面。
当定时任务ScheledFutureTask执行后,会根据 periodNanos 的取值决定是否要将任务重新放回队列。从netty的注释可以清晰看到:
看下ScheledFutureTask的 run 方法
当任务的执行时间还未到,则判断任务是否已经取消,如果已取消则移除任务,否则重新加入队列。对于只执行一次的任务,执行完了不会再放回队列。其他的任务,则根据 periodNanos 的类型,重新计算截止时间,重新放回队列,等待下次调度。
定时任务的优先级队列到此介绍完毕,接着看NioEventLoop的 run 方法
在调用 select 之前,再次调用 hasTasks() 判断从上次调用该方法到目前为止是否有任务加入,多做了一层防护,因为调用 select 时,可能会阻塞,这时,如果任务队列中有任务就会长时间得不到执行,所以须小心谨慎。
如果任务队列中还是没有任务,则会调用 select 方法。在这个方法中会根据入参 deadlineNanos 来选择调用NIO的哪个select方法:
到这里,可能有人要问了: 在上面的方法中,如果调用了Java NIO的无参的 select 方法,就会进入阻塞,除非检测到Channel的IO事件,那么在检测到IO事件之前,加入到任务队列中的任务怎么得到执行呢?
好,你想,在检测到IO事件之前,可以退出阻塞的方法是什么?对,调用 wakeup 方法。那么我们来搜一下NioEventLoop中有调用Selector的 wakeup 方法的地方吗:
还真搜到了,再看一下这个方法被调用的地方
看到SingleThreadEventExecutor的 execute 方法了吗,就是说在调 execute 方法,向EventLoop提交任务时,会将EventLoop线程从Java NIO的select阻塞中唤醒。
到这里,NioEventLoop的run方法的职责之一:检测Channel的IO事件就讲解完毕。
至于IO事件的处理以及任务队列中任务的处理会在后面的文章中解析,敬请期待。
在本文中,对Netty的NioEventLoop进行了深入的解读,并且详细讲解了它的三大职责之一:检测Channel的IO事件的机制。
NioEventLoop是Netty最核心的概念,内部运行机制很复杂,在接下来的两篇文章中会继续分析。
③ Netty源码-内存泄漏检测toLeakAwareBuffer
Netty在实现 ByteBuf 时采用了引用计数法进行 ByteBuf 的回收,使用引用计数法进行回收的 ByteBuf 都扩展了 类,在使用 时需要调用 .retain 方法递增引用计数器,在使用完毕时则需要调用 .release 方法递减引用计数器,当计数器为 0 时,会进行 ByteBuf 的回收工作:池化的 ByteBuf 不会进行实际的内存释放,会将占用的内存归还给内存池,非池化的 ByteBuf 则会直接释放内存(为了叙述简单,后面释放内存则指真正释放内存或者将内存归还给内存池)。
通过上面的描述可知, ByteBuf 的正确回收依赖 retain 和 release 方法的正确调用,内存提前释放(即在使用 ByteBuf 时没有调用 retain 方法,导致提前释放)应用会报错,用户也能及时感知到;但是如果使用完 ByteBuf 忘了调用 release 则会导致内存不能及时得到回收,造成内存泄漏,且内存泄漏用户无法及时感知,久而久之就会发生OOM。为了解决这种问题,Netty采用了内存泄漏检测机制,发生内存泄漏时会通过日志将内存泄漏信息打印出来,报告给用户。
Netty的内存泄漏检测使用了 WeakReference ,即弱引用,了解过Java四种引用类型(强、软、弱、虚)和引用队列( ReferenceQueue )的读者知道,弱引用持有的对象会在虚拟机触发GC时(不管回收之后内存是否够用)被回收掉,如果使用具有引用队列参数的构造函数实例化 WeakReference 时,弱引用持有的对象在GC被回收时,弱引用自身会被放入引用队列。
为了后面能更好的理解Netty内存泄漏检测的细节,下面先看几个弱引用的例子,在下面的几个例子中,我们使用的数据类和自定义的弱引用类子类如下:
好了,三个例子已经介绍完毕,后面在介绍Netty内存泄漏检测时就使用了这里的例子结果,在具体介绍时会和这里的例子一一对应。
Netty中将普通 ByteBuf 转为具有内存泄漏检测功能的 ByteBuf 是通过 AbstractByteBufAllocator.toLeakAwareBuffer 方法实现的,我们直接在Eclipse中看该方法的调用层次即可知道Netty在哪里对 ByteBuf 进行了转换,该方法调用如下图所示:
可见池化内存分配器在分配heap或者direct ByteBuf 时都进行了转换,非池化内存分配器仅在分配direct ByteBuf 时进行了转换。个人理解时采用池化内存需要特别关注内存释放,否则为了实现池化内存预先分配的一大块内存会因为没有释放被很快分配完,造成后面没有内存进行分配。非池化分配的直接内存也需要特别注意释放,放置内存泄漏;非池化分配的heap内存(其实就是一个 byte 数组)则可以在对象被回收时同时被回收掉,发生内存泄漏的可能性较小。
本节介绍Netty中内存泄漏检测相关的类,仅做一个大致介绍,类中的重要方法我们放在后面介绍。
主要负责使用 track 方法对指定的 ByteBuf 进行内存检测泄漏进行追踪,并返回负责追踪的 ResourceLeakTracker 类实例,同时在调用 track 方法时,也会根据指定的检测级别汇报最近的内存泄漏检测结果。该类由工厂类 ResourceLeakDetectorFactory 负责实例化,默认的实现为 ResourceLeakDetector ,在 ResourceLeakDetectorFactory 类的默认实现 中,也会根据用户是否配置了 io.netty.customResourceLeakDetector 来决定采用默认实现 ResourceLeakDetector 还是使用用户自定义的 ResourceLeakDetector ,用户自定义的 ResourceLeakDetector 必须是其子类。
默认实现为 DefaultResourceLeak , DefaultResourceLeak 实现了 ResourceLeakTracker 和 ResourceLeak 接口,同时也继承了类 WeakReference ,是一个弱引用实现。首先,同上面 例2 的结果一样,如果在使用 ByteBuf 时忘了调用 .release 方法,那么将不会调用 DefaultResourceLeak.clear 方法去手动清空该弱引用持有的实际对象,在发生GC时,会由垃圾收集器对弱引用持有的实际对象进行回收,即发生了内存泄漏,同时该弱引用自身也会被加入到引用队列中,该引用队列是 ResourceLeakDetector 的成员域,上面介绍 ResourceLeakDetector 类时说到该类会在用户 track 指定 ByteBuf 是汇报检测结果,该类的汇报数据来源就是引用队列。 DefaultResourceLeak 同时还提供了 record 方法可以让用户在指定时机选择调用,这个方法可以记录用户的调用轨迹(堆栈)。 Record 同时也是一种单链表,在 DefaultResourceLeak 中就使用单链表记录用户的调用轨迹。
DefaultResourceLeak 供用户记录程序调用轨迹的类,也就是 DefaultResourceLeak.record 方法返回的对象,继承自 Throwable ,因此可以使用 Throwable.getStackTrace 方法获得调用轨迹信息,打印在内存泄漏报告中可以让用户更好的排除内存泄漏问题。
在上面介绍 ResourceLeakTracker 时,说到其默认实现为 DefaultResourceLeak , DefaultResourceLeak 提供了 record 方法记录用户的调用轨迹,用户可在调用 ByteBuf 方法时调用 record 方法记录调用轨迹,调用的频率越多,后面在汇报内存泄漏情况时就能打印出越详细的信息,这样也能更方便的排查问题。
Netty提供了两个 ByteBuf 的封装类供选择,就对应不同的 record 调用频率,每个封装类都持有 ResourceLeakTracker 对象,Netty根据配置的内存检测级别(下一节介绍相关配置参数)使用不同的 ByteBuf 封装类。
Netty提供的两个 ByteBuf 封装类就是 和 , 是 的子类, 类仅仅持有 ResourceLeakTracker 对象,但是看其源码,发现没有调用过 record 方法,所以只能知道是否发生了内存泄漏时,无法打印出任何调用轨迹信息。 作为 的子类,在 ByteBuf 的多个方法中调用了 record 方法,所以在发生内存泄漏时,能够打印出比较详细的调用轨迹信息。
在 类中使用了配置参数 io.netty.leakDetection.acquireAndReleaseOnly 来控制是否只是在调用增加或减少引用计数器的方法时才调用 record 方法记录调用轨迹,默认为false。 中 retain 和 release 方法因为改变了引用计数器就直接调用了 record 方法,而该类中的其他方法则根据 io.netty.leakDetection.acquireAndReleaseOnly 的配置决定是否调用 record 方法,这里为了节省篇幅就不列出 类中调用 record 的方法了,读者可自行查看。
在介绍相关配置参数之前,我们先看下Netty提供的内存泄漏检测级别:
Level.ADVANCED 和 Level.PARANOID 使用的 ByteBuf 包装类都是 ,我们上面介绍 ResourceLeakDetector 类时提到该类使用 track 方法对指定的 ByteBuf 进行内存检测泄漏进行追踪,并返回负责追踪的 ResourceLeakTracker 类实例,同时在调用 track 方法时,也会根据指定的检测级别汇报最近的内存泄漏检测结果。如果内存泄漏检测级别为 Level.PARANOID 时则每次调用 track 方法都会进行内存泄漏报告;如果级别为 Level.ADVANCED 或者 Level.SIMPLE 则会以一定频率进行内存泄漏报告,而不是每次 track 都进行报告。
是否关闭Netty内存泄漏检测功能,默认为false。如果该参数配置为false,则默认的内存泄漏检测级别根据此参数的配置为 Level.DISABLED ,否则默认的级别为 Level.SIMPLE 。
配置内存泄漏检测级别的参数,用于老版本的配置参数。
新的内存泄漏检测级别参数,如果没有配置,则会采用老版本参数配置的级别作为最终配置。
在第4节介绍内存泄漏检测相关类时,我们介绍过 DefaultResourceLeak 提供了 record 方法记录用户的调用轨迹,如果当前保存的调用轨迹记录数 Record 大于参数 io.netty.leakDetection.targetRecords 配置的值,那么会以一定的概率(1/2^n)删除头结点之后再加入新的记录,当然也有可能不删除头结点直接新增新的记录。
该参数的默认为4。
上面介绍过,在 类中使用了配置参数 io.netty.leakDetection.acquireAndReleaseOnly 来控制是否只是在调用增加或减少引用计数器的方法时才调用 record 方法记录调用轨迹,默认为false。
在介绍 ResourceLeakDetector 类时提到过,默认的 ResourceLeakDetector 类就是 ResourceLeakDetector ,但是用户可以使用参数 io.netty.customResourceLeakDetector 来决定采用默认实现 ResourceLeakDetector 还是使用用户自定义的 ResourceLeakDetector 。
我们在第二节介绍了Netty中将普通 ByteBuf 转为具有内存泄漏检测功能的 ByteBuf 是通过 AbstractByteBufAllocator.toLeakAwareBuffer 方法实现的。
这里我们先看下该方法的源码:
上面的源码中是调用 AbstractByteBuf.leakDetector.track(buf) 返回 ResourceLeakTracker 类对象的,这里我们看下默认的 ResourceLeakDetector 中 track 方法实现:
我们看到 AbstractByteBufAllocator.toLeakAwareBuffer 对 ResourceLeakDetector.track 返回的 DefaultResourceLeak 和传入的 ByteBuf 对象进行封装,返回了具有内存泄漏检测功能的 ByteBuf 封装类 或其子类 。如果应用程序在使用 ByteBuf 正确调用了 retain 和 release 方法,则在引用计数器为0时,则会清除弱引用持有的实际对象,发生GC时, DefaultResourceLeak 也不会被放入引用队列中(见前面第2节 例3 结果)。
但是如果应用程序在使用 ByteBuf 没有正确调用 retain 和 release 方法,则不会清除弱引用持有的实际对象,此时如果实际上已经没有强引用指向该 ByteBuf ,那么在发生GC时,垃圾收集器会回收该 ByteBuf ,而弱引用 DefaultResourceLeak 会被放入引用队列中(见前面第2节 例2 结果),加入到引用队列中的就是识别到的发生内存泄漏的 ByteBuf 。在 ResourceLeakDetector.track 方法中调用的 reportLeak 输出的就是引用队列中的弱引用 DefaultResourceLeak :
到这里,已经基本上介绍完Netty内存检测的实现原理,下面我们再看下 DefaultResourceLeak.record 是如何记录调用轨迹的:
最后我们再看下 Record 是如何输出调用轨迹的,前面我们说到 Record 继承自类 Throwable ,因此可使用 getStackTrace 方法获取实例化该对象时的调用轨迹,所以上面在输出内存泄漏报告时就调用了 Record.toString 方法:
④ Netty源码_UnpooledDirectByteBuf详解
本篇文章我们讲解缓存区 ByteBuf 八大主要类型中两种,未池化直接缓冲区 UnpooledDirectByteBuf 和 未池化不安全直接缓冲区 UnpooledUnsafeDirectByteBuf 。
UnpooledDirectByteBuf 一个基于 NIO ByteBuffer 的缓冲区。
建议使用 UnpooledByteBufAllocator.directBuffer(int, int) , Unpooled.directBuffer(int) 和 Unpooled.wrappedBuffer(ByteBuffer) ;而不是显式调用构造函数。
有四个成员属性:
通过 allocateDirect(initialCapacity) 方法创建一个新的 NIO 缓存区实例来初始化此缓存区对象。
利用现有的 NIO 缓存区创建此缓存区。
通过 NIO 缓存区 buffer 对应方法获取基本数据类型数据。
根据目标缓存区 dst 类型不同,处理的方式也不同。
你会发现这些方法都是获取此缓存区对应 NIO 缓存区 ByteBuffer 对象,调用 ByteBuffer 对象的方法,与 IO 流的交互,进行数据传输
和 get 系列方法一样, set 系列的实现也是靠 NIO 缓存区 ByteBuffer 对应方法。
剩余方法也几乎都是和 NIO 缓存区 ByteBuffer 有关,而且也不难,就不做过多介绍了。
UnpooledDirectByteBuf 主要是通过 NIO 缓存区 buffer 来存储数据。而它获取和设置数据,也都是通过 NIO 缓存区对应方法实现的。
光看介绍,和 UnpooledDirectByteBuf 没有任何区别。它也是 UnpooledDirectByteBuf 的子类。
那么 UnpooledUnsafeDirectByteBuf 和 UnpooledDirectByteBuf 不同处在那里呢?
通过复习 setByteBuffer 方法,获取 NIO 缓存区 buffer 对应的直接内存地址。
通过 UnsafeByteBufUtil 对应方法,直接从内存地址获取对应基本类型数据。
通过 UnsafeByteBufUtil 对应方法,直接向内存地址设置对应基本类型数据。
只有这个类型 hasMemoryAddress() 方法才会返回 true 。
UnpooledUnsafeDirectByteBuf 就是通过直接从内存地址中获取和设置数据的方式,提高性能。