环境
并发编程基本概念
为了利用多核处理器,采用多线程编程能节省大量运行时间,带来的缺点是会导致程序复杂度的提升。
并行、并发
并行:两个任务同时运行,如启动多个线程,每个线程运行各自的任务,在多核处理器中,不同的任务同时运行,可视为并行。(单核系统不存在并行)
并发:多个线程同时访问一个任务。
同步、异步
同步:调用一个方法,必须等待该方法返回后,才能继续下一步。
异步:调用方法后,必须等待方法的返回,继续执行下一步。在Java中被调用的方法,一般会在另一个线程里面继续执行。
临界区
用于多线程的互斥访问。如果有多个线程试图同时访问临界区,那么在有一个线程进入临界区后,其他试图访问的线程将被挂起,直到进入临界区的线程离开。临界区在被释放后,其他线程可以继续抢占,并以此达到对临界区的互斥访问。
阻塞、非阻塞
阻塞:在一个线程占用了临界区的资源,其他线程想要进入临界区需等进入临界区的线程释放资源,这时是其他等待的线程被阻塞挂起。
非阻塞:没有线程妨碍其他线程执行。
死锁、饥饿、活锁
死锁:单两个线程持有自己当前的锁,同时等待获取对方的锁,导致死锁。
饥饿:在非公平模式下,如果存在一个低优先级任务,同时存在大量高优先级任务,会导致任务低优先级任务长期获取不到资源,称之为饥饿。不过当高优先级任务完成后,低优先级任务还是有机会执行。
活锁:活锁、死锁本质上是一样的,原因是在获取临界区资源时,并发多个进程/线程声明资源占用(加锁)的顺序不一致,死锁是加不上就死等,活锁是加不上就放开已获得的资源重试(tryLock)。
死锁举例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import java.util.concurrent.TimeUnit;
class TaskA{ public synchronized void method(TaskB b) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } b.method(); } public synchronized void method(){ System.out.println("任务A"); } }
class TaskB{ public synchronized void method(){ System.out.println("任务B"); } public synchronized void method(TaskA a){ a.method(); } }
public class Main { public static void main(String[] args) throws Exception{ TaskA taskA = new TaskA(); TaskB taskB = new TaskB();
new Thread(() -> taskA.method(taskB)).start(); new Thread(()-> taskB.method(taskA)).start(); } }
|
打开jvisualvm(JAVA_HOME/bin/jvisualvm)可以看到线程死锁了。
饥饿举例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| import javax.swing.*; import java.awt.*; import java.util.concurrent.TimeUnit;
class ProcessThread implements Runnable{ JProgressBar progressBar;
public JProgressBar getProgressBar() { return progressBar; }
public ProcessThread(String name) { progressBar = new JProgressBar(); progressBar.setString(name); progressBar.setStringPainted(true); }
@Override public void run() { int c = 0; while (true){ synchronized (Main.shareObj){ if (c == 100){ c = 0; } c = c+ 10; progressBar.setValue(c); try { TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) { e.printStackTrace(); } } } } }
public class Main { public static final Object shareObj = new Object();
public static void main(String[] args) throws Exception{ JFrame starvation = new JFrame("Starvation"); starvation.setDefaultCloseOperation(WindowConstants.EXIT_ON_CLOSE); starvation.setSize(300, 200); starvation.setLayout(new FlowLayout(FlowLayout.LEFT));
for (int i = 0; i < 5; i++) { ProcessThread processThread = new ProcessThread("Thread-" + i); starvation.add(processThread.getProgressBar()); Thread thread = new Thread(processThread); thread.start(); }
starvation.setLocationRelativeTo(null); starvation.setVisible(true); } }
|
- 该例子可能并不恰当,不过饥饿很好理解。
- 通过图形化展示比较有趣,参考:http://www.logicbig.com/tutorials/core-java-tutorial/java-multi-threading/thread-starvation/
活锁举例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
|
class BankAccount{ private String name; private double balance;
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public double getBalance() { return balance; }
public void setBalance(double balance) { this.balance = balance; }
public BankAccount(String name, double balance) { this.name = name; this.balance = balance; }
private Lock lock = new ReentrantLock();
boolean drawMoney(double amount){ if (this.lock.tryLock()){ try { TimeUnit.MILLISECONDS.sleep(100); balance -= amount; } catch (InterruptedException e) { e.printStackTrace(); return false; } finally { lock.unlock(); } return true; } return false; }
boolean saveMoney(double amount){ if (this.lock.tryLock()){ try { TimeUnit.MILLISECONDS.sleep(100); balance += amount; } catch (InterruptedException e) { e.printStackTrace(); return false; } finally { lock.unlock(); } return true; } return false; }
public boolean transfer(BankAccount destinationAccount, double amount){
if (this.drawMoney(amount)){ if (destinationAccount.saveMoney(amount)){ return true; } else { this.saveMoney(amount); } } return false; } }
class TransferMoney implements Runnable{ private BankAccount sourceAccount, destinationAccount; private double amount;
public TransferMoney(BankAccount sourceAccount, BankAccount destinationAccount, double amount) { this.sourceAccount = sourceAccount; this.destinationAccount = destinationAccount; this.amount = amount; }
@Override public void run() { while (!sourceAccount.transfer(destinationAccount, amount)){ System.out.println("转账失败!!!重试"); } System.out.printf("%s 余额 %.2f\t", sourceAccount.getName(), sourceAccount.getBalance()); System.out.printf("%s 余额 %.2f\t", destinationAccount.getName(), destinationAccount.getBalance()); System.out.println("转账成功!!!"); }
}
public class Main { public static void main(String[] args) throws Exception{ BankAccount accountMing = new BankAccount("小明", 10000.00); BankAccount accountRed = new BankAccount("小红", 20000.00); new Thread(new TransferMoney(accountMing, accountRed, 5000), "明->红").start(); TimeUnit.MILLISECONDS.sleep(50); new Thread(new TransferMoney(accountRed, accountMing, 5000), "红->明").start(); } }
|
- 活锁不易察觉,活锁是有可能自己解开的,一般情况下存在活锁时,CPU会增高,可以通过日志记录来排查活锁。
阿姆达尔定律、Gustafson
阿姆达尔定律:http://ifeve.com/amdahls-law/
参考https://www.cnblogs.com/756623607-zhang/p/6850848.html
Java线程
在Java中使用线程有两种方式
- 实现Runnable接口
- 继承Thread重写run方法
因为Java不支持多继承,多数情况下使用第一种方法。
Thread小析
Thread实际也实现了Runnable,同时拥有一个Runnable属性target,Thread实现的run方法实际是调用target的run方法。
1 2 3 4 5 6 7 8
| Thread.java
@Override public void run() { if (target != null) { target.run(); } }
|
在new Thread的时候,会调用init初始化:
- 设置线程名称
- 设置线程组(如未设置,默认为安全管理器SecurityManager所在的线程组,如果SecurityManager线程组不存在,则设置为当前线程所在的线程组)
- 权限校验(创建的线程对线程组的权限)
- 设置新线程预计堆栈大小(默认0表示忽略堆栈大小)
- 设置优先级、是否守护线程(默认同父线程)、目标类等相关操作
线程创建完成后,通过start启动线程。在启动线程前,可以通过setDaemon设置当前线程为守护线程。通过start启动后,最后会调用私有native方法start0,由虚拟机启动线程,调用run方法。在run方法执行完毕后,线程会自动关闭。
- 守护线程,当正在运行的线程都是守护线程时,Java 虚拟机退出。且守护线程创建的线程默认为守护线程,可在start前修改。
线程在创建到结束,总共有6个状态:
- NEW:线程创建完成,但还未启动。
- RUNNABLE:线程正在运行,在该状态时表示当前线程正在JVM中执行,但它可能正在等待操作系统中的其他资源,等待获取处理器调用
- BLOCKED:受阻塞并且正在等待监视器锁的某一线程的线程状态。处于受阻塞状态的某一线程正在等待监视器锁,以便进入一个同步的块/方法
- WAITING:不带超时线程等待状态,(不带超时的Object.wait、Thread.join、LockSupport.park都会导致线程进入该状态)
- TIMED_WAITING:带超时的等待状态(Thread.sleep、Object.wait、Thread.join、LockSupport.parkNanos、LockSupport.parkUntil都会导致线程进入该状态)
- TERMINATED:线程终止状态,表示线程已经执行完成。
Thread.interrupt()线程中断:
调用该方法后,并不是直接中断异常,如果线程内部未对中断进行处理,实际上中断无效。
1 2 3 4 5 6 7 8 9
| Thread thread = new Thread(() -> { int i = 1; for (int j = 1; j <= 10; j++) { i = i * j; } System.out.println(i); }); thread.start(); thread.interrupt();
|
上诉例子中,线程永远都会计算完成后才结束。
1 2 3 4 5 6 7 8 9 10 11 12 13
| Thread thread = new Thread(() -> { int i = 1; while (true){ if (Thread.interrupted()){ break; } i++; } System.out.println(i); }); thread.start(); TimeUnit.MILLISECONDS.sleep(1); thread.interrupt();
|
在该例子中,线程内部有对中断状态进行处理,当线程发出中断信号后,会退出循环。Thread.interrupted方法会清除当前线程的中断状态。
需要注意的是,当线程为休眠(sleep或者wait)状态时,如果发出中断信号,会导致抛出InterruptedException异常,必须捕获处理。
Thread.join()等待该线程终止:
当线程之间需要协同操作时,当前线程需要等待其他完成后才能继续操作,可以采用join方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| Thread[] threads = new Thread[10]; threads[0] = Thread.currentThread(); for (int i = 1; i < threads.length; i++) { int j = i; threads[i] = new Thread(()->{ try { threads[j - 1].join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }, "Thread-" + i); } for (int i = 1; i < threads.length; i++) { threads[i].start(); }
|
在该例子中,每个线程都是等上一个线程完成后(join),在执行后续的操作。输出结果如下
1 2 3 4 5 6 7 8 9
| Thread-1 Thread-2 Thread-3 Thread-4 Thread-5 Thread-6 Thread-7 Thread-8 Thread-9
|
如果不使用join,每个线程执行顺序是被打乱的。实际上join内部是通过wait来实现。
Thread.yield()暂停当前正在执行的线程对象,并执行其他线程:
调用该方法后,表示使当前线程让出CPU,让CPU重新分配执行线程,该线程会与其他线程继续竞争CPU资源。
线程返回Callable
之前实现的Runnable没有返回值。如果需要线程执行完毕后由返回值,可以使用Callable。
通常Callable和Futura一起使用,Callable计算处理返回结果,通过Futura获取Callable的返回值。举例:
1 2 3 4 5 6 7
| FutureTask<String> futureTask = new FutureTask<>(() -> "call"); new Thread(futureTask).start();
TimeUnit.SECONDS.sleep(1);
System.out.println(futureTask.get());
|
采用FutureTask的原因是因为,FutureTask实现了Runnable、Future两个接口。
小析FutureTask
FutureTask有个Callable属性,由构造方法传入,或者由Executors生成Callable。outcome属性,存储Callable返回值
FutureTask拥有7个状态:
- NEW:新建
- COMPLETING:运行中
- NORMAL:正常
- EXCEPTIONAL:异常
- CANCELLED:取消
- INTERRUPTING:中断中
- INTERRUPTED:被中断
状态变化有4种情况:
- NEW -> COMPLETING -> NORMAL
- NEW -> COMPLETING -> EXCEPTIONAL
- NEW -> CANCELLED
- NEW -> INTERRUPTING -> INTERRUPTED
因为FutrueTask实现了Runnable接口,实际是通过JVM启动线程执行run方法。
FutrueTask的run方法实际上就是调用Callable的call方法,获取到方法返回值后,修改状态为COMPLETING,设置outcome为返回值,修改状态为NORMAL,通知其他等待的线程(在FutrueTask中有个链表维护等待的线程)。
FutrueTask通过get获取返回值,可以设置等待时间,之后会放入FutrueTask线程等待链表中。
FutrueTask其他方法如cancel之类的,可查看相关文档。
- 注:FutrueTask中状态的变化都是通过UNSAFE操作的,保证了线程的安全。
多线程异常
在使用Runnable的时候,如果线程内发生异常,并不会向主线程抛出异常,这样导致主线程无法感知子线程中异常。如果需要处理子线程异常,需要在run方法中try catch代码块。举例:
1 2 3 4 5 6 7 8 9
| try { new Thread(()->{ int i = 1/0; }).start(); }catch (Exception e){
System.out.println("发生异常"); }
|
1 2 3 4 5 6 7 8
| new Thread(() -> { try { int i = 1 / 0; } catch (Exception e) { System.out.println("发生异常"); } }).start();
|
在FutrueTask.run方法中,调用Callable.call被try catch包裹
1 2 3 4 5 6 7 8 9
| try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); }
|
catch后设置outcome为异常的值,同时设置当前状态为EXCEPTIONAL,当用户调用get时,判断当前状态,如果是异常状态,抛出该异常。
在开发过程中,经常需要统一处理相关异常。这可在Thread.setUncaughtExceptionHandler设置线程异常处理。如下:
1 2 3
| Thread thread = new Thread(() -> String.valueOf(1 / 0)); thread.setUncaughtExceptionHandler((t, e) -> System.out.println("发生了异常")); thread.start();
|