Disruptor是一个支持多线程高并发的高性能的有界队列,且无锁和线程安全;
(这里和ArrayBlockingQueue作区分,虽然同为有界队列,但是ABQ是加锁来实现线程安全的)
内部采用一个ringbuff的数据结构,其实就是个环形数组结构。数组长度2^n,通过位运算,加快定位的速度。数组下标index采用long类型,下标采取递增的形式。所以这个环形数组中每个位置都是一直变化的,通过下标对环形数组取模运算来找到实际操作的位置。不删除ringbuff中的元素,只进行覆盖。
比如说:ringbuff的长度为1024,下标为5和下标为1029所对应的位置都是 数组为5的位置,即 index mod 1024 。
Disruptor采用无锁设计,对于每个生产者和消费者会先申请可以操作的元素在数组中的位置(ringbuff.next()),申请到之后,直接在该位置写入或者读取数据。下面详细介绍disruptor的无锁设计:
对于单线程:
1、申请写入m个元素;2、若是有m个元素可以入,则返回最大的序列号。这儿主要判断是否会覆盖未读的元素;3、若是返回的正确,则生产者开始写入元素。
对于多线程:
为防止多个生产者往同一个位置写入数据,Disruptor的解决方法是,每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。如果失败则重新申请空间。
如何防止读取的时候,读到还未写的元素。Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。元素就绪状态才会去进行读取。
解决伪共享:
数据在缓存中不是以独立的项来存储的,如不是一个单独的变量,也不是一个单独的指针。缓存是由缓存行组成的,通常是64字节。伪共享问题,指在多线程(多核)环境下,如果一个变量共享了多个缓存行,或者说一个变量存在于多个CPU内核的缓存行中,那么在一个线程修改了这一变量后,其余共享这个变量的线程的缓存行都会失效。
Disruptor框架是如何解决伪共享问题的?Disruptor 为了解决伪共享问题,使用的方法是 缓存行填充 。这是一种以空间换时间的策略,主要思想就是通过往对象中填充无意义的变量,来保证整个对象 独占缓存行 。在Disruptor中有一个重要的类Sequence,该类包装了一个volatile修饰的long类型数据value,无论是Disruptor中的基于数组实现的缓冲区RingBuffer,还是生产者,消费者,都有各自独立的Sequence,RingBuffer缓冲区中,Sequence标示着写入进度,例如每次生产者要写入数据进缓冲区时,都要调用RingBuffer.next()来获得下一个可使用的相对位置。对于生产者和消费者来说,Sequence标示着它们的事件序号。Sequence类中的value它的前后空间都由8个long型的变量填补了,对于一个大小为64字节的缓存行,它刚好被填补满。这样做每次把变量value读进高速缓存中时,都能把缓存行填充满(对于大小为64个字节的缓存行来说,如果缓存行大小大于64个字节,那么还是会出现伪共享问题),保证每次处理数据时都不会与其他变量发生冲突。
本文暂时没有评论,来添加一个吧(●'◡'●)