一般使用Thread创建线程,如果频繁的创建销毁线程,系统资源比较浪费,线程启动调用的native方法,任务不能立即执行。 采用线程池可以通过重复利用已经创建的线程降低线程创建和销毁赵成的系统消耗,提高任务的响应速度,还可以对线程池中线程进行统一分配、调优和监控。
ThreadPoolExecutor 手动通过创建ThreadPoolExecutor创建线程池,其构造方法参数有:
corePoolSize:池中所保存的线程数,包括空闲线程。(不能小于0)
maximumPoolSize:池中允许的最大线程数。(不能小于等于0,不能小于corePoolSize)
keepAliveTime:当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。(不能小于0)
unit:keepAliveTime 参数的时间单位
workQueue:执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。(不能为null)
threadFactory:执行程序创建新线程时使用的工厂。(不能为null)
handler:由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。(不能为null)
threadFactory可以默认使用Executors.defaultThreadFactory()、handler可以默认使用ThreadPoolExecutor.defaultHandler(使用的是内部类ThreadPoolExecutor.AbortPolicy)。
ThreadPoolExecutor有几个关键常量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private final AtomicInteger ctl = new AtomicInteger (ctlOf(RUNNING, 0 ));private static final int COUNT_BITS = Integer.SIZE - 3 ;private static final int CAPACITY = (1 << COUNT_BITS) - 1 ;private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;private static int ctlOf (int rs, int wc) { return rs | wc; }
提交任务使用execute:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); } private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker (firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException (); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
execute其实就是修改ctl值,通过设置的值处理应该新启动线程还是放入队列中。
在创建Worker后,线程工厂创建线程启动调用Worker.run,之后调用的是runWorker方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error (x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
总结线程池新增任务: 在线程池中新增任务时:当前工作线程数量小于corePoolSize,启动新线程;如果当前线程数量大于corePoolSize,把线程放入任务队列中,放入成功后,验证当前工作线程数是否为0,如果为0启动一个新线程处理任务;如果线程放入队列中失败,表示任务放入速度大于任务完成速度,这个时候启动新线程,但是总数不能大于maximumPoolSize。
对于submit,其实就是把Callback封装成FutureTask,然后调用execute,返回封装的FutureTask
线程池队列 ArrayBlockingQueue:数组阻塞队列,需要指定大小 LinkedBlockingQueue:链表队列,默认为Integer.MAX_VALUE PriorityBlockingQueue:优先级阻塞队列 SynchronousQueue:同步阻塞队列
线程池拒绝策略RejectedExecutionHandler AbortPolicy(默认):直接抛出异常 CallerRunsPolicy:通过调用者所在线程执行任务,其实就是传递过来的Runnable.run,直接调用run方法 DiscardOldestPolicy:它放弃最旧的未处理请求(下一个任务),然后重试 execute;如果执行程序已关闭,则会丢弃该任务。也就是直接获取线程池中下一个任务然后不处理,重新执行execute DiscardPolicy:不处理,丢弃当前未处理任务
在使用工具类Executors,基本上也是直接使用线程池,需要注意使用的队列。
在使用线程池的时候需要注意如果在线程中放入了数据也就是使用了ThreadLocal,因为线程在执行完任务后并不会销毁,可能其他任务获取到了当前线程设置的值,建议在使用完毕后删除ThreadLocal中的数据。
参考:
http://blog.csdn.net/xiaoxufox/article/details/52278508
http://cmsblogs.com