1 相关代码,请见/下载于
https://github.com/Wasabi1234/concurrency
1 基本概念
1.1 并发
同时拥有两个或者多个线程,如果程序在单核处理器上运行多个线程将交替地换入或者换出内存,这些线程是同时“存在"的,每个线程都处于执行过程中的某个状态,如果运行在多核处理器上,此时,程序中的每个线程都将分配到一个处理器核上,因此可以同时运行.
1.2 高并发( High Concurrency)
互联网分布式系统架构设计中必须考虑的因素之一,通常是指,通过设计保证系统能够同时并行处理很多请求.
1.3 区别与联系
- 并发: 多个线程操作相同的资源,保证线程安全,合理使用资源
- 高并发:服务能同时处理很多请求,提高程序性能
2 CPU
2.1 CPU 多级缓存
- 为什么需要CPU ***
CPU的频率太快了,快到主存跟不上
如此,在处理器时钟周期内,CPU常常需要等待主存,浪费资源。所以***的出现,是为了缓解CPU和内存之间速度的不匹配问题(结构:cpu-> ***-> memory ). - CPU ***的意义
1) 时间局部性
如果某个数据被访问,那么在不久的将来它很可能被再次访问
2) 空间局部性
如果某个数据被访问,那么与它相邻的数据很快也可能被访问 - 2.2 缓存一致性(MESI)
- 用于保证多个 CPU *** 之间缓存共享数据的一致
- M-modified被修改
该缓存行只被缓存在该 CPU 的缓存中,并且是被修改过的,与主存中数据是不一致的,需在未来某个时间点写回主存,该时间是允许在其他CPU 读取主存中相应的内存之前,当这里的值被写入主存之后,该缓存行状态变为 E - E-exclusive独享
缓存行只被缓存在该 CPU 的缓存中,未被修改过,与主存中数据一致
可在任何时刻当被其他 CPU读取该内存时变成 S 态,被修改时变为 M态 - S-shared共享
该缓存行可被多个 CPU 缓存,与主存中数据一致 - I-invalid无效
- 乱序执行优化
处理器为提高运算速度而做出违背代码原有顺序的优化
并发的优势与风险

3 项目准备
3.1 项目初始化



3.2 并发模拟-Jmeter压测





3.3 并发模拟-代码
CountDownLatch

Semaphore(信号量)

以上二者通常和线程池搭配
下面开始做并发模拟
package com.mmall.concurrency;
import com.mmall.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* @author shishusheng
* @date 18/4/1
*/
@Slf4j
@NotThreadSafe
public class ConcurrencyTest {
/**
* 请求总数
*/
public static int clientTotal = 5000;
/**
* 同时并发执行的线程数
*/
public static int threadTotal = 200;
public static int count = 0;
public static void main(String[] args) throws Exception {
//定义线程池
ExecutorService executorService = Executors.newCachedThreadPool();
//定义信号量,给出允许并发的线程数目
final Semaphore semaphore = new Semaphore(threadTotal);
//统计计数结果
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
//将请求放入线程池
for (int i = 0; i < clienttotal i executorservice.execute -> {
try {
//信号量的获取
semaphore.acquire();
add();
//释放
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
//关闭线程池
executorService.shutdown();
log.info("count:{}", count);
}
/**
* 统计方法
*/
private static void add() {
count++;
}
}
运行发现结果随机,所以非线程安全
4线程安全性
4.1 线程安全性
当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些进程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的
4.2 原子性
4.2.1 Atomic 包
- AtomicXXX:CAS,Unsafe.compareAndSwapInt
提供了互斥访问,同一时刻只能有一个线程来对它进行操作
package com.mmall.concurrency.example.atomic;
import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author shishusheng
*/
@Slf4j
@ThreadSafe
public class AtomicExample2 {
/**
* 请求总数
*/
public static int clientTotal = 5000;
/**
* 同时并发执行的线程数
*/
public static int threadTotal = 200;
/**
* 工作内存
*/
public static AtomicLong count = new AtomicLong(0);
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clienttotal i executorservice.execute -> {
try {
System.out.println();
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
//主内存
log.info("count:{}", count.get());
}
private static void add() {
count.incrementAndGet();
// count.getAndIncrement();
}
}
package com.mmall.concurrency.example.atomic;
import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author shishusheng
* @date 18/4/3
*/
@Slf4j
@ThreadSafe
public class AtomicExample4 {
private static AtomicReference count = new AtomicReference<>(0);
public static void main(String[] args) {
// 2
count.compareAndSet(0, 2);
// no
count.compareAndSet(0, 1);
// no
count.compareAndSet(1, 3);
// 4
count.compareAndSet(2, 4);
// no
count.compareAndSet(3, 5);
log.info("count:{}", count.get());
}
}
- AtomicReference,AtomicReferenceFieldUpdater
- AtomicBoolean
- AtomicStampReference : CAS的 ABA 问题
4.2.2 锁
synchronized:依赖 JVM
- 修饰代码块:大括号括起来的代码,作用于调用的对象
- 修饰方法: 整个方法,作用于调用的对象
- 修饰静态方法:整个静态方法,作用于所有对象
package com.mmall.concurrency.example.count;
import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* @author shishusheng
*/
@Slf4j
@ThreadSafe
public class CountExample3 {
/**
* 请求总数
*/
public static int clientTotal = 5000;
/**
* 同时并发执行的线程数
*/
public static int threadTotal = 200;
public static int count = 0;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clienttotal i executorservice.execute -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
private synchronized static void add() {
count++;
}
}
synchronized 修正计数类方法
- 修饰类:括号括起来的部分,作用于所有对象
子类继承父类的被 synchronized 修饰方法时,是没有 synchronized 修饰的!!!
Lock: 依赖特殊的 CPU 指令,代码实现
4.2.3 对比
- synchronized: 不可中断锁,适合竞争不激烈,可读性好
- Lock: 可中断锁,多样化同步,竞争激烈时能维持常态
- Atomic: 竞争激烈时能维持常态,比Lock性能好; 只能同步一
个值
4.3 可见性
一个线程对主内存的修改可以及时的被其他线程观察到
###4.3.1 导致共享变量在线程间不可见的原因
- 线程交叉执行
- 重排序结合线程交叉执行
- 共享变量更新后的值没有在工作内存与主存间及时更新
4.3.2 可见性之synchronized
JMM关于synchronized的规定
- 线程解锁前,必须把共享变量的最新值刷新到主内存
- 线程加锁时,将清空工作内存***享变量的值,从而使
用共享变量时需要从主内存中重新读取最新的值(加锁与解锁是同一把锁)
4.3.3 可见性之volatile
通过加入内存屏障和禁止重排序优化来实现
- 对volatile变量写操作时,会在写操作后加入一条store
屏障指令,将本地内存中的共享变量值刷新到主内存 - 对volatile变量读操作时,会在读操作前加入一条load
屏障指令,从主内存中读取共享变量 - volatile使用
volatile boolean inited = false;
//线程1:
context = loadContext();
inited= true;
// 线程2:
while( !inited ){
sleep();
}
doSomethingWithConfig(context)
4.4 有序性
一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序
JMM允许编译器和处理器对指令进行重排序,但是重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性
4.4.1 happens-before 规则
5发布对象
5.1 安全发布对象
package com.mmall.concurrency.example.singleton;
import com.mmall.concurrency.annoations.NotThreadSafe;
/**
* 懒汉模式 -》 双重同步锁单例模式
* 单例实例在第一次使用时进行创建
* @author shishusheng
*/
@NotThreadSafe
public class SingletonExample4 {
/**
* 私有构造函数
*/
private SingletonExample4() {
}
// 1、memory = allocate() 分配对象的内存空间
// 2、ctorInstance() 初始化对象
// 3、instance = memory 设置instance指向刚分配的内存
// JVM和cpu优化,发生了指令重排
// 1、memory = allocate() 分配对象的内存空间
// 3、instance = memory 设置instance指向刚分配的内存
// 2、ctorInstance() 初始化对象
/**
* 单例对象
*/
private static SingletonExample4 instance = null;
/**
* 静态的工厂方法
*
* [@return](/profile/547241) */
public static SingletonExample4 getInstance() {
// 双重检测机制 // B
if (instance == null) {
// 同步锁
synchronized (SingletonExample4.class) {
if (instance == null) {
// A - 3
instance = new SingletonExample4();
}
}
}
return instance;
}
}
7 AQS
7.1 介绍
- 使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架
- 利用了一个int类型表示状态
- 使用方法是继承
- 子类通过继承并通过实现它的方法管理其状态{acquire 和release} 的方法操纵状态
- 可以同时实现排它锁和共享锁模式(独占、共享)
同步组件
CountDownLatch
package com.mmall.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author shishusheng
* /
@Slf4j
public class CountDownLatchExample1 {
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadcount i final int threadnum='i;' exec.execute -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
}
package com.mmall.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 指定时间内处理任务
* @author shishusheng
* /
@Slf4j
public class CountDownLatchExample2 {
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadcount i final int threadnum='i;' exec.execute -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await(10, TimeUnit.MILLISECONDS);
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
}
}
Semaphore用法



CycliBarrier
package com.mmall.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* [@author shishusheng](/profile/5)
* [/
@Slf4j
public class CyclicBarrierExample1 {](/profile/5)
[private static CyclicBarrier barrier = new CyclicBarrier](/profile/5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10 i final int threadnum='i;' thread.sleep1000 executor.execute -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
package com.mmall.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* [@author shishusheng](/profile/5)
* [/
@Slf4j
public class CyclicBarrierExample2 {](/profile/5)
[private static CyclicBarrier barrier = new CyclicBarrier](/profile/5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10 i final int threadnum='i;' thread.sleep1000 executor.execute -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
try {
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("BarrierException", e);
}
log.info("{} continue", threadNum);
}
}
package com.mmall.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* @author shishusheng
* /
@Slf4j
public class SemaphoreExample3 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
```
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadcount i final int threadnum='i;' exec.execute -> {
try {
// 尝试获取一个许可
if (semaphore.tryAcquire()) {
test(threadNum);
// 释放一个许可
semaphore.release();
}
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
9 线程池
9.1 newCachedThreadPool
9.2 newFixedThreadPool
9.3 newSingleThreadExecutor
看出是顺序执行的
9.4 newScheduledThreadPool
10 死锁
Java并发 - (并发基础)
1、什么是共享资源
- 堆是被所有线程共享的一块内存区域。在虚拟机启动时创建。此内存区域的唯一目的就是存放对象实例
- Java中几乎所有的对象实例都在这里分配内存。方法区与堆一样,也是各个线程共享的一块内存区域,它用于存储已被虚拟机加载的类型信息、常量、静态变量、即时编译器编译后的代码缓存等数据。
- 光看文字,会让我们觉得很抽象。如下图:
2、并发编程的难点
原子性问题
- 操作系统做任务切换(CPU切换),可以发生在任何一条CPU指令执行完成后;
- CPU能保证的原子操作是指令级别的,而不是高级语言的操作符(例如:n++)。
- 如下图,是n++编译后,被编译CPU执行的指令。
可见性问题
- 可见性是指一个线程对共享变量的修改,另外一个线程能够立刻看到。
- 可见性问题是由CPU的缓存导致的,多核CPU均有各自的缓存,这些缓存均要与内存进行同步。
有序性问题
- 在执行程序时。为了提高性能,编译器和处理器常常会对指令做重排序;
- 重排序不会影响单线程的执行结果,但是在并发情况下,可能会出现诡异的BUG。
- 参考地址:https://zhuanlan.zhihu.com/p/298448987
3、JMM
并发编程的关键目标
并发编程需要处理两个关键问题,即线程之间如何通信和同步。
- 通信:指线程之间以何种机制来交换信息;
- 同步:指程序中用于控制不同线程之间的操作发生的相对顺序的机制。
并发编程的内存模型
共有两种并发编程模型:共享内存模型、消息传递模型,Java采用的是前者。
- 在共享内存模型下,线程之间共享程序的公共状态,通过写-读内存中的公共状态进行隐式通信;
- 在共享内存模型下,同步是显示进行的,程序员必须显示指定某段代码需要在线程之间互斥执行。
这个内存模型:JMM
- 参考地址:https://javaguide.cn/java/concurrent/jmm.html#从-cpu-缓存模型说起
- 参考地址:https://xie.infoq.cn/article/739920a92d0d27e2053174ef2
- 参考地址:http://concurrent.redspider.group/RedSpider.html
JMM 是Java Memory Model的缩写,Java线程之间的通信由 JMM 控制,即 JMM决定一个线程对共享变量的写入何时对另一个线程可见。JMM定义了线程和主内存之间的抽象关系,通过控制主内存与每个本地内存(抽象概念)之间的交互,JMM为Java程序员提供了内存可见性的保证。
源代码与指令间的重排序
为了提高性能,编译器和处理器常常会对指令做重排序。重排序有3种类型,其中后2种都是处理器重排序。这些重排序可能会导致多线程程序出现内存可见性问题。
- 编译器优化重排序:编译器在不改变单线程程序语义的前提下可以重新安排语句的执行顺序。
- 指令级并行重排序︰现代处理器采用了指令级并行技术来将多条指令重叠执行,如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。
- 内存系统的重排序:由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。
重排序对可见性的影响
参考下图,虽然处理器执行的顺序是A1->A2,但是从内存角度来看,实际发生的顺序是A2->A1。这里的关键是,由于写缓冲区仅对自己的处理器可见,它会导致处理器执行内存操作的顺序可能会与实际的操作执行顺序不一致。由于现代的处理器都会使用写缓冲区,因此它们都会允许对写 - 读操作执行重排序。
如何解决重排序带来的问题
对于编译器,JMM的编译器重排序规则会禁止特定类型的编译器重排序(比如volatile)。
对于处理器重排序,JMM的处理器重排序规则会要求编译器在生成指令序列时,插入特定类型的内存屏障(Memcry Barries /Memory Fence)指令,通过内存屏障指令来禁止特定类型的处理器重排序。
由于常见的处理器内存模型比JMM要弱, Java编译器在生成字节码时,会在执行指令序列的适当位置插入内存屏障来限制处理器的重排序。同时,由于各种处理器内存模型的强弱不同,为了在不同的处理器平台向程序员展示一个一致的内存模型,JMM在不同的处理器中需要插入的内存屏障的数量和种类也不同。
CPU内存屏障:
- LoadLoad: 禁止读和读的重排序;
- StoreStore:禁止写和写的重排序;
- LoadStore:禁止读和写的重排序;
- StoreLoad:禁止写和读的重排序。
Java内存屏障
java
public final class Unsafe {
public native void loadFerice();// LoadLoad + LoadStore
public native void storeFence();// StoreStore + LoadStore
public native void fullFence(); // loadFence() + storeFence() + StoreLoad
}
happens-before
JMM使用happens-before规则来阐述操作之间的内存可见性,以及什么时候不能重排序。在JMM中。如果一个操作执行的结果需要对另一个操作可见,那么这两个操作之间必须要存在happens-before关系。换个角度来说.如果A happens-before B,则意味着A的执行结果必须对B可见,也就是保证跨线程的内存可见性。其中,前4条规则与程序员密切相关。
- 1、程序顺序规则:一个线程中的每个操作, happens-before于该线程中的任意后续操作;
- 2、volatile变量规则:对一个volatile域的写, happens-before于任意后续对这个volatile域的读;
- 3、synchronized规则:对一个锁的解锁, happens-before于随后对这个锁的加锁;
- 4、传递性:若A happens-before B,且B happens-before C,则A happens-before C;
- 5、start()规则:若线程A执行Thread.start(),则线程A的start()操作happens-before于线程B中的任意操作;
- 6、 join()规则︰若线程A执行ThreadB.join()并成功返回,那么线程B中的任意操作happens-before于线程A从ThreadB.join()的成功返回。
4、Volatile
volatile的基本特性
- 可见性:对一个volatile变量的读,总是能看到对这个volatile变量最后的写入;
- 原子性:对任意单个volatile变量的读/写具有原子性,但类似vclatile++这种复合操作不具有原子性。
volatile的内存语义
- 写内存语义:当写一个volatile变量时,JMM会把该线程本地内存中的共享变量的值刷新到主内存;
- 读内存语义:当读一个volatile变量时,JMM会把该线程本地内存置为无效,使其从主内存中读取共享变量。
volatile的实现机制
为了实现volatile的内存语义,编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。内存屏障插入策略非常保守,但它可以保证在任意处理器平台,任意的程序中都能得到正确的volatile内存语义。
- 在每个volatile写操作的前面插入一个StoreStore屏障;
- 在每个volatile写操作的后面插入一个StoreLoad屏障;
- 在每个volatile读操作的后面插入—个LoadLoad屏障;
- 在每个volatile读操作的后面插入一个LoadStore屏障。
volatile与锁的对比
volatile仅仅保证对单个volatile变量的读/写具有原子性,而锁的互斥执行的特性可以确保对整个临界区代码的执行具有原子性。在功能上锁比volatile更强大,在可伸缩性和执行性能上volatile更有优势。
5、锁
锁的内存语义
- 当线程释放锁时,JMM会把该线程对应的本地内存中的共享变量刷新到主内存中;
- 当线程获取锁时,JMM会把该线程对应的本地内存置为无效。
锁的实现机制
- synchronized:采用CAS + Mark Word实现。存在锁升级的情况;
- Lock:采用CAS + volatile实现。存在锁降级的情况核心是AQS 。
本文暂时没有评论,来添加一个吧(●'◡'●)