Java 线程的实现
Java 线程在 JDK1.2 之前,是基于称为“绿色线程”的用户线程实现的,而在 JDK 1.2 中,线程模型替换为基于操作系统原生线程模型来实现,因此,在目前的 JDK 版本中,操作系统支持怎样的线程模型,在很大程度上决定了 Java 虚拟机的线程是怎样映射的,这点在不同平台上没有办法达成一致,虚拟机规范中也并未限定 Java 线程需要使用哪种线程模型来实现。
举个例子,对于 Sun JDK 来说,它的 Windows 版与 Linux 版都是使用一对一的线程模型实现的,一条 Java 线程就是映射到一条轻量级进程之中,因为 Windows 和 Linux 系统提供的线程模型就是一对一的。
总结:java 线程与操作系统线程是一对一的,在 linux 上通过调用 pthread 库创建线程。
参考:
线程的创建
- 实现 Runnable 接口
- 继承 Thread 类
- 实现 Callable
- 线程池
1 public static void main(String[] args) {
2 // 实现Runnable接口
3 // new 一个接口并实现,相当于new一个实现Runnable接口的类对象放进去
4 Thread t = new Thread(new Runnable() {
5 @Override
6 public void run() {
7 while (true) {
8 System.out.println("hello world");
9 try {
10 Thread.sleep(1000);
11 } catch (InterruptedException e) {
12 e.printStackTrace();
13 }
14 }
15 }
16 });
17
18 t.start();
19 }
1 static class MyThread extends Thread {
2 @Override
3 public void run() {
4 while (true) {
5 System.out.println("hello world");
6 try {
7 Thread.sleep(1000);
8 } catch (InterruptedException e) {
9 e.printStackTrace();
10 }
11 }
12 }
13 }
14
15 public static void main(String[] args) {
16 // 继承Thread类并覆盖run方法
17 MyThread t = new MyThread();
18 t.start();
19 }
1 static class CallableImpl implements Callable {
2 @Override
3 public Object call() throws Exception {
4 for (int i = 0; i < 10; i++) {
5 System.out.println("hello world - " + (i + 1));
6 }
7 return "call 执行完毕.";
8 }
9 }
10
11 public static void main(String[] args) throws ExecutionException, InterruptedException {
12 // 通过Callable创建
13 FutureTask ft = new FutureTask(new CallableImpl());
14 Thread t = new Thread(ft, "callable 线程");
15 t.start();
16 String str = (String)ft.get();
17 System.out.println(str);
18 }
1 // 通过线程池创建线程
2 public static void main(String[] args) {
3 ExecutorService es = Executors.newFixedThreadPool(1);
4 // ()->{} 可以是一个实现Runnable或Callable对象,此处使用lambda
5 es.submit(()->{
6 while (true){
7 System.out.println("hello world");
8 Thread.sleep(1000);
9 }
10 });
11 }
线程状态
线程状态切换图
状态标识:
状态 | 说明 |
---|---|
NEW | 刚创建, 还没调用 start 未启动 |
RUNNABLE | 运行(JVM 中)或者处在等待操作系统分配资源 |
BLOCKED | 阻塞状态,在等待 synchronized 锁时处在该状态,Object#wait()也处在该状态 |
WAITING | 等待,调用 sleep 方法,join 时处理器无时间执行,LockSupport#park 方法会使线程 |
TIMED_WAITING | 计时等待,执行方法:Thread#sleep(times)、Object#wait(times)、Thread#join()没有执行时间、LockSupport#parkNanos(times)、ockSupport#parkUntil(times) |
TERMINATED | 终止,线程被终止或自然结束 |
线程状态获取示例
1 static class MyThread extends Thread {
2 @Override
3 public void run() {
4 System.out.println(this.getState());
5
6 for (int i = 0; i < 10; i++) {
7 try {
8 Thread.sleep(500);
9 } catch (InterruptedException e) {
10 e.printStackTrace();
11 }
12 System.out.println(i);
13 }
14 }
15 }
16
17 public static void main(String[] args) {
18 Thread t = new MyThread();
19
20 System.out.println(t.getState());
21
22 t.start();
23 try {
24 t.join();
25 } catch (InterruptedException e) {
26 e.printStackTrace();
27 }
28
29 System.out.println(t.getState());
30 }
线程的优先级
每一个线程都有优先级,优先级的范围是 1~10,默认为 5。 线程的优先级在调度过程可能会起到作用,在一些操作系统上甚至会忽略线程的优先级。
1 /**
2 * The minimum priority that a thread can have.
3 */
4 public final static int MIN_PRIORITY = 1;
5
6 /**
7 * The default priority that is assigned to a thread.
8 */
9 public final static int NORM_PRIORITY = 5;
10
11 /**
12 * The maximum priority that a thread can have.
13 */
14 public final static int MAX_PRIORITY = 10;
守护线程
守护线程是为其他线程服务的线程。
设置方法
1setPriority(int);
原子性 & 同步
原子意为“不能够被进一步分割的最小粒子”,原子操作做不可被中断的一个或一组操作。
简单理解就是一个操作不可分割,如果执行就一次性执行完,中间不能有其他人操作,否则就会有问题。在 java 中++
运算符不是原子性的,因为它可以被分为三个操作,读取变量,为遍历赋值,写入变量,多线程环境下同时访问这个变量就会出现错误数据,在不加锁的情况下,一个变量++
操作就是非原子性的。
同步的关键是保证操作的原子性,以及线程之间操作数据的可见性,进而保证了线程安全。
synchronized
synchronized 的英意是:已同步、同步的。
通常把 synchronized 称为一把“锁”,这是一把重锁,为什么称为重锁?
因为 synchronized 保证互斥访问时是在操作系统“内核态”使用操作系统进行加锁的,锁的释放和线程的来回切换开销比较大。
synchronized 是可重入锁,可重入的意思是当一个线程持有了一个代码块的锁,如果在这个代码块中在调用了另一个同步方法,这个线程仍然可以这个方法(这两个代码块使用同一个对象进行加锁,如 this)。可重入使用计数方式,如果再次持有某个对象的锁就让计数值加一,待退出方法后值减一,退出最外层的方法锁就释放了。
1public class T_ReentrantSynchnorized {
2 // lock by this
3 synchronized static void innerMethod() {
4 System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());
5 System.out.println("内部调用方法");
6 }
7
8 // lock by this
9 synchronized static void method() {
10 System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());
11 System.out.println("method_1");
12 //
13 innerMethod();
14 System.out.println("method_2");
15 }
16
17 public static void main(String[] args) {
18 // 证明synchronized是可重入的
19 // 说明:启动10个线程执行加锁方法,方法加的是对象的锁,因此10个线程抢夺一个锁,强到后执行
20 // 如果synchronized不能重入,那么innerMethod不会执行
21 for (int i = 0; i < 10; i++) {
22 new Thread(() -> {
23 T_ReentrantSynchnorized.method();
24 System.out.println("--------------------");
25 },(i+1)+"").start();
26 }
27 }
28}
synchronized 在 JVM 是基于进入退出 Monitor 对象来实现的,方法和代码块的实现有差别,在字节码上表现出了差异。
synchronized 编译后的字节码后, 如果 synchronized 是加载方法上,那编译后的字节码 Monitor 监控的是整个方法,字节码方法上有 synchronized 标记。
如果是在方法内部使用 synchronized 对一部分代码加锁,编译后会在这个代码块前后添加monitorenter
monitorexit
,保证在访问这块代码是是原子操作,一个线程进入后其他线程需要等待。
编译后的字节码如下:
上个类 T 对方法直接加锁后的字节码。
1// java code
2 synchronized static void innerMethod() {
3 System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());
4 System.out.println("内部调用方法");
5 }
6
7// bate code
8 static synchronized void innerMethod();
9 Code:
10 0: getstatic #2 // Field java/lang/System.out:Ljava/io/PrintStream;
11 3: new #3 // class java/lang/StringBuilder
12 6: dup
13 7: invokespecial #4 // Method java/lang/StringBuilder."<init>":()V
14 10: ldc #5 // String Thread.currentThread().getName() =
使用对象对方法内的一部分代码加锁。
1// java code
2public class T {
3 private int count = 10;
4 private Object o = new Object();
5
6 public void m() {
7 synchronized(o) { //使用对象o对代码块加锁
8 count--;
9 System.out.println(Thread.currentThread().getName() + " count = " + count);
10 }
11 }
12}
13
14// m方法的byte code
15 public void m();
16 Code:
17 0: aload_0
18 ...
19 5: astore_1
20 6: monitorenter
21 7: aload_0
22 ...
23 54: aload_1
24 55: monitorexit
25 ...
26 60: aload_1
27 61: monitorexit
synchronized 的使用示例
1static class T{
2 // 一把锁,Java中任何一个对象都可以作为synchronized使用的锁
3 Object lock = new Object();
4
5 // 把的实例作为锁
6 synchronized void m(){
7 System.out.println("hello world");
8 }
9
10 // 把lock对象做为锁 ↑
11 void n(){
12 System.out.println("top");
13 synchronized (lock){
14 System.out.println("inner");
15 }
16 System.out.println("bottom");
17 }
18}
19
20static class Y{
21 // static方法加锁,类必须也是静态的,应为静态方法synchronized加锁使用的是类对象 即 Y.class
22 static synchronized void X(){
23 System.out.println("hello world");
24 }
25}
sychronized 使用总结:
- 在非静态类的方法加锁,则锁是该类的实例
- 在方法内部加锁要显示式指定一个对象,可以是自己创建的对象(如上边的 lock)或者 this,this 即实例对象,在任意时刻只能由一个线程访问一个实例
- 在静态类方法上加锁,则锁是类的对象(Y.class),在任意时刻只能由一个线程访问该类
基本数据类型包装类和字符串不要作为锁对象,他们在 jvm 中都有缓存,这样锁就会暴露,有可能这把锁在别处使用,结果可想而知。
JDK6 对 synchronized 的优化
JDK6 对 synchronized 加锁的方式优化后,synchronized 锁并不是那么重了,它加入了锁升级的方式来减少加锁、释放锁的开销。
java 中每个对象都可以做为锁,这是因为锁的信息就在对象的头信息上。
对象的头部信息称为 mark word,它的 Hotspot 的结构如下。
锁升级过程
无锁 -> 偏向锁 -> 轻量级锁/自旋锁 -> 重量级锁
- 无锁,一个对象被创建出来,它处于无锁的状态
- 当有一个线程访问这个锁对象会升级为偏向锁,这个线程的指针会被记录在这个锁对象的 mark word 里
- 当有两个两个线程同时竞争这个锁对象时,这个锁对象的锁就会变成轻量级锁,进行的操作如下:
- 在线程栈中创建 lock record 对象
- 将锁对象的 mark work 拷贝到 lock record
- 锁对象的 mark work 记录 lock record 指针
- 释放竞争线程 CAS 修改 lock record 中的线程指针指向只记线程的指针
- 修改成功的获得锁,其他线程进入自旋状态
- 当轻量级锁竞争过于激烈(到达阈值时),升级为重量级锁,锁对象 mark work 指向系统互斥量指针,获得锁的线程执行,其他线程阻塞
锁的对比
名称 | 加锁方式 | 优点 | 缺点 | 使用场景 |
---|---|---|---|---|
偏向锁 | mark work 标记线程指针 | 快,仅修改锁对象的 mark work,纳秒及开销 | 多线线程竞争会存在撤销开销 | 适用在只有一个线程同步快的场景 |
轻量级锁 | mark work 记录 lock record | 快,竞争线程不阻塞 | 自旋占用 CPU 资源 | 同步代码执行快,响应时间快的场景 |
重量级锁 | 操作系统锁 | 吞吐量高(阻塞线程不消耗 CPU) | 线程阻塞,响应时间慢 | 同步代码执行慢的场景 |
CPU 常见术语
- 内存屏障 - memory barriers,是一组 CPU 指令,用于实现对内存操作的顺序限制
- 缓存行 - cache line,是 CPU 高速缓存存储的单位,CPU 从内存中读取数据每次读取一行并缓存(在 L1 L2 L3 等 cache 中),缓存行通常大小为 64byte
- 原子操作 - auto operations,不可中断的一个或一组操作
- 缓存行填充 - cache line fill,假设缓存行大小为 64byte,当不足 64byte 是进行补充(补齐),正好填充为 64byte
- 缓存命中 - cache hit, CPU 读取数据,先在 cache 中查看,如果有就直接读取,而不是再到内存中取
- 写命中 - write hit,CPU 将操作数写回到内存时,先检查缓存中是否有这块数据,如果存在这个缓存行,则将操作数写入缓存而不是内存,这个操作称为写命中
volatile
volatile(易变的、不稳定的),这也是这个关键字在 Java、JVM、CPU 一些操作的语义。
volatile 并发编程中常用的一个关键字,但它并不是锁,因为 volatile 不能保证操作的原子性,它仅具有让变量在多个线程间具有可见性。
volatile 有两个作用:
- 阻止指令重排序
- 使共享变量在多个线程间可见 —— 增加可见性
注意点:volatile 只能保证其修饰的变量可见,比如引用或基本数据类型,并不能保证引用的内部数据可见。
可见性与不可见
在多核处理器上,线程并发执行,在访问变量时会把变量复制一份到当前线程(线程私有空间,缓存),待处理完再写回,当此线程在操作共享变量的时候他对外部变量的修改感知不到,因为它读取的是自己拷贝的(CPU 的缓存),这是线程对共享变量不可见。可见性,就是共享变量被修改后,其他线程能够及时读取到变量的新值。
对线程共享变量加 volatile、synchronized 都会使线程对共享变量可见。
volatile 如何保证可见性的?
依靠 CPU 缓存一致性协议和对总线加锁操作。
- 将处理器的缓存行写回内存(缓存一致性协议)
- 写回操作在其他 CPU 里缓存了该地址的数据无效(缓存一致性协议)
- 锁总线,只允许一个处理器访问缓存和内存
将处理器缓存写入内存是 JVM 向 CPU 发出的一个 Lock 指令,将变量所在的缓存行写入内存;另外 CPU 包保障缓存一致性是依靠 CPU 缓存协议的,具体操作为处理器通过嗅探总线上传过来的数据来检查缓存值是否过期,当处理器发现对应的行过期或对应内存地址被修改,就会将缓存设为无效,当处理器对数据进行操作是会重新从把对应内存的数据读取到缓存里。
CPU 如何保证缓存一致的?
- 锁总线(处理器 Lock 信号),屏蔽其他处理器,只允许一个处理器访问缓存和内存
- 锁缓存(缓存一致性协议),阻止其他处理器修改缓存区数据在内存的区域,其他处理器回写内存数据使之无效
通过 volatile 提升性能。缓存行填充可以提高 CPU 读写内存的效率,可以参考这篇文章: 剖析 Disruptor:为什么会这么快?(二)神奇的缓存行填充
在老版本 JDK7 里的 LinkedTransferQueue 就使用了缓存行填充来提升效率,但在 JDK8 版本中不再使用缓存行填充了。
ThreadLocal
先看 ThreadLocal 的使用示例。
1public class T {
2 public static void main(String[] args) {
3 final ThreadLocal<String> threadLocal = new ThreadLocal<>();
4 threadLocal.set("我是主线程main的保存的数据");
5
6 Thread t = new Thread(() -> {
7 threadLocal.set("这是线程t保存的数据");
8 System.out.println("threadLocal.get() = " + threadLocal.get());
9 }, "t");
10
11 t.start();
12 System.out.println("threadLocal.get() = " + threadLocal.get());
13 }
14}
ThreadLocal 的作用。本地线程 ThreadLocal 可以将当前线程(在哪个线程执行的方法里就是哪个线程)与一个变量相关联,可以简单的理解为 Map<K,V>,当前线程就是 K,设置和获取的数据就是 V。
基于 ThreadLocal 可以与变量绑定的特性,可以将线程的上下文存放进去,这样就可以随时拿到这个数据。典型的应用是 Spring 的@Transactional 的实现,想一下为什么我们没有指定 Connection 为什么事务就能回滚了?因为这里面用到了 ThreadLocal,并把 Connection 绑定到了当前线程,发生异常从中取出来然后撤销事务。
ThreadLocal 只是一个操作变量的工具,真正的数据不在 ThreadLocal 而是 Thread 对象上。
一个线程可以有多个 ThreadLocal,多个 ThreadLocal 维护(增/删)Thread 对象上的一个 Map 结构;一个 ThreadLocal 代表一个本地变量,因为存放变量的键是 ThreadLocal 对象。看完源码后这点会更清晰,先看张图。
ThreadLocal 源码分析。
1public class ThreadLocal<T> {
2
3 public ThreadLocal() {
4 }
5
6 /**
7 * 获取与当前线程相关联的对象
8 */
9 public T get() {
10 // 获取当前线程
11 Thread t = Thread.currentThread();
12 // 获取线程的threadLocals,threadLocals是一个ThreadLocalMap
13 ThreadLocalMap map = getMap(t);
14 if (map != null) {
15 // 从map中拿到数据
16 ThreadLocalMap.Entry e = map.getEntry(this);
17 if (e != null) {
18 @SuppressWarnings("unchecked")
19 T result = (T)e.value;
20 return result;
21 }
22 }
23 // 创建空一个ThreadLocalMap,并放入一个null
24 return setInitialValue();
25 }
26
27 // 初始化方法,初始创建一个ThreadLocalMap,然后添加个null
28 private T setInitialValue() {
29 T value = initialValue(); // null
30 Thread t = Thread.currentThread();
31 ThreadLocalMap map = getMap(t);
32 if (map != null)
33 map.set(this, value);
34 else
35 createMap(t, value);
36 return value;
37 }
38
39 // 设置
40 public void set(T value) {
41 // 获取当前线程
42 Thread t = Thread.currentThread();
43 // 获取当前线程的threadLocals,threadLocals是一个ThreadLocalMap
44 ThreadLocalMap map = getMap(t);
45 if (map != null)
46 map.set(this, value);
47 else
48 createMap(t, value);
49 }
50
51 public void remove() {
52 ThreadLocalMap m = getMap(Thread.currentThread());
53 if (m != null)
54 m.remove(this);
55 }
56
57 // threadLocals是一个ThreadLocalMap
58 ThreadLocalMap getMap(Thread t) {
59 return t.threadLocals;
60 }
61
62 /**
63 * Create the map associated with a ThreadLocal. Overridden in
64 * InheritableThreadLocal.
65 */
66 void createMap(Thread t, T firstValue) {
67 t.threadLocals = new ThreadLocalMap(this, firstValue);
68 }
69
70 /**
71 * Thread的对象中的ThreadLocals对象,是一个Map,键是ThreadLocal对象
72 */
73 static class ThreadLocalMap {
74
75 static class Entry extends WeakReference<ThreadLocal<?>> {
76 Object value;
77 Entry(ThreadLocal<?> k, Object v) {
78 super(k);
79 value = v;
80 }
81 }
82
83 /**
84 * 默认容量
85 */
86 private static final int INITIAL_CAPACITY = 16;
87
88 /**
89 * 存储数据数组
90 */
91 private Entry[] table;
92
93 private int size = 0;
94
95 /**
96 * resize、rehash时的容量,达到这个值时会进行resize
97 */
98 private int threshold; // Default to 0
99
100
101 ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
102 table = new Entry[INITIAL_CAPACITY];
103 int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
104 table[i] = new Entry(firstKey, firstValue);
105 size = 1;
106 setThreshold(INITIAL_CAPACITY);
107 }
108
109 /**
110 * Set the value associated with key.
111 */
112 private void set(ThreadLocal<?> key, Object value) {
113
114 Entry[] tab = table;
115 int len = tab.length;
116 int i = key.threadLocalHashCode & (len-1);
117
118 for (Entry e = tab[i];
119 e != null;
120 e = tab[i = nextIndex(i, len)]) {
121 ThreadLocal<?> k = e.get();
122
123 if (k == key) {
124 e.value = value;
125 return;
126 }
127
128 if (k == null) {
129 replaceStaleEntry(key, value, i);
130 return;
131 }
132 }
133
134 tab[i] = new Entry(key, value);
135 int sz = ++size;
136 if (!cleanSomeSlots(i, sz) && sz >= threshold)
137 rehash();
138 }
139
140 /**
141 * Remove the entry for key.
142 */
143 private void remove(ThreadLocal<?> key) {
144 Entry[] tab = table;
145 int len = tab.length;
146 int i = key.threadLocalHashCode & (len-1);
147 for (Entry e = tab[i];
148 e != null;
149 e = tab[i = nextIndex(i, len)]) {
150 if (e.get() == key) {
151 e.clear();
152 expungeStaleEntry(i);
153 return;
154 }
155 }
156 }
157 }
158
159}
Thread 中有一个 ThreadLocal 的 ThreadLocalMap,用来存储 ThreadLocal 放置(set)的数据。
1// 线程类
2public class Thread implements Runnable {
3
4 ...
5 private volatile String name;
6 private int priority;
7 private Thread threadQ;
8 private long eetop;
9
10 ...
11
12 /* ThreadLocal中的map */
13 ThreadLocal.ThreadLocalMap threadLocals = null;
14 ...
15}
(完)