WHH

MY BLOG

Eureka Server (服务中心)

Eureka是一个服务注册和发现模块,基于 REST 的服务,用于定位服务,以实现云端中间层服务发现和故障转移。

阅读全文 »

一般使用Thread创建线程,如果频繁的创建销毁线程,系统资源比较浪费,线程启动调用的native方法,任务不能立即执行。
采用线程池可以通过重复利用已经创建的线程降低线程创建和销毁赵成的系统消耗,提高任务的响应速度,还可以对线程池中线程进行统一分配、调优和监控。

ThreadPoolExecutor

手动通过创建ThreadPoolExecutor创建线程池,其构造方法参数有:

  • corePoolSize:池中所保存的线程数,包括空闲线程。(不能小于0)
  • maximumPoolSize:池中允许的最大线程数。(不能小于等于0,不能小于corePoolSize)
  • keepAliveTime:当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。(不能小于0)
  • unit:keepAliveTime 参数的时间单位
  • workQueue:执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。(不能为null)
  • threadFactory:执行程序创建新线程时使用的工厂。(不能为null)
  • handler:由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。(不能为null)

threadFactory可以默认使用Executors.defaultThreadFactory()、handler可以默认使用ThreadPoolExecutor.defaultHandler(使用的是内部类ThreadPoolExecutor.AbortPolicy)。

ThreadPoolExecutor有几个关键常量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//默认为RUNNING(11100000000000000000000000000000)
//通过计算3高位表示线程运行状态,低29位位线程池中线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//32 - 3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
//00011111111111111111111111111111(后续用于计算线程状态以及线程数量)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
//运行状态存储在高阶
//11100000000000000000000000000000(32位)
private static final int RUNNING = -1 << COUNT_BITS;//接收新的任务并且也会处理已经提交等待的任务
//00000000000000000000000000000000 关闭状态
private static final int SHUTDOWN = 0 << COUNT_BITS;//不会接收新的任务,但会处理已经提交等待的任务
//00100000000000000000000000000000 停止状态
private static final int STOP = 1 << COUNT_BITS;//不接受新任务,不处理已经提交等待的任务,而且还会中断处理中的任务
//01000000000000000000000000000000 整理状态
private static final int TIDYING = 2 << COUNT_BITS;//所有的任务被终止,workCount为0,为此状态时将会调用terminated()方法
//01100000000000000000000000000000 结束状态
private static final int TERMINATED = 3 << COUNT_BITS;//terminated()调用完成
private static int ctlOf(int rs, int wc) { return rs | wc; }

提交任务使用execute:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 1. 如果运行的线程数corePoolSize,尝试开始一个新的线程,
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
//获取线程池存储的值
int c = ctl.get();
//比较线程池中运行的线程数量与corePoolSize的值
if (workerCountOf(c) < corePoolSize) {
//如果少于corePoolSize,新增worker,成功直接返回,如果新增启动成功,直接返回
if (addWorker(command, true))
return;
//如果新增失败获取最新的ctl值
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
//如果当前线程状态为运行状态,把当前任务放入队列中

//获取最新状态
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//如果当前线程池不是运行状态,删除任务成功
//拒绝任务、删除队列中的任务
reject(command);
else if (workerCountOf(recheck) == 0)
//如果线程池中工作线程数量为0
//添加null,不占用corePoolSize,添加null的目的是为了启动一个新的worker线程用来可以消费队列中的任务
addWorker(null, false);
}
//如果队列已经满了无法放入
else if (!addWorker(command, false))
//尝试新增worker通过maximumPoolSize验证是否超过上限,这个时候启动新线程,但是不能超过maximumPoolSize
reject(command);
}
//core:true表示用corePoolSize作为范围,否则使用maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//获取线程池存储的值
int c = ctl.get();
//通过位运行计算当前线程池运行状态默认为111
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 在线程正常运行时二进制值为111,此时首位为1表示负数,所以rs小于等于SHUTDOWN(0)表示线程池此时为非RUNNING状态
// 如果状态不为SHUTDOWN或者firstTask不为空或者工作队列为空,直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//新增线程工作数量For + CAS
for (;;) {
//获取当前线程工作数量
int wc = workerCountOf(c);
//如果数量大于CAPACITY存储上限,或者运行数量是否超过设置最大值
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
//操作设置最大值直接返回
return false;
//CAS尝试修改工作数量,成功直接跳出最外层循环(AtomicInteger.compareAndSet)
if (compareAndIncrementWorkerCount(c))
break retry;
//重新获取值
c = ctl.get(); // Re-read ctl
//CAS修改失败后,如果当前线程池状态已经发生了变化,跳出循环重试
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//新增工作线程完毕后执行,(只是修改ctl的值)
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//Workder继承AQS,用于设置状态,通过线程池工厂创建线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//获取当前状态
int rs = runStateOf(ctl.get());
//当前状态小于0(运行状态)、或者当前状态为0且传入Runnable为null
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//重新检查线程是否在运行
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
//如果worker中数量大于largestPoolSize(默认0),设置largestPoolSize为worker中数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
//释放锁
mainLock.unlock();
}
//如果添加成功,启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果线程启动失败
if (! workerStarted)
//处理添加失败的值,比如修改ctl中原本新增的值,从workers中删除之前添加的worker
addWorkerFailed(w);
}
return workerStarted;
}

execute其实就是修改ctl值,通过设置的值处理应该新启动线程还是放入队列中。

在创建Worker后,线程工厂创建线程启动调用Worker.run,之后调用的是runWorker方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
final void runWorker(Worker w) {
//获取当前线程
Thread wt = Thread.currentThread();
//获取线程任务
Runnable task = w.firstTask;
w.firstTask = null;
//设置Worker状态为1
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果task为null直接从队列中获取task,这里就是之前addWorker(null, false)用处,启动一个线程从队列中获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//中断处理
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//子类实现,当前类为空代码块
beforeExecute(wt, task);
Throwable thrown = null;
try {
//运行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//子类实现,当前类为空代码块
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//处理任务退出
processWorkerExit(w, completedAbruptly);
}
}

总结线程池新增任务:
在线程池中新增任务时:当前工作线程数量小于corePoolSize,启动新线程;如果当前线程数量大于corePoolSize,把线程放入任务队列中,放入成功后,验证当前工作线程数是否为0,如果为0启动一个新线程处理任务;如果线程放入队列中失败,表示任务放入速度大于任务完成速度,这个时候启动新线程,但是总数不能大于maximumPoolSize。

对于submit,其实就是把Callback封装成FutureTask,然后调用execute,返回封装的FutureTask

线程池队列

ArrayBlockingQueue:数组阻塞队列,需要指定大小
LinkedBlockingQueue:链表队列,默认为Integer.MAX_VALUE
PriorityBlockingQueue:优先级阻塞队列
SynchronousQueue:同步阻塞队列

线程池拒绝策略RejectedExecutionHandler

AbortPolicy(默认):直接抛出异常
CallerRunsPolicy:通过调用者所在线程执行任务,其实就是传递过来的Runnable.run,直接调用run方法
DiscardOldestPolicy:它放弃最旧的未处理请求(下一个任务),然后重试 execute;如果执行程序已关闭,则会丢弃该任务。也就是直接获取线程池中下一个任务然后不处理,重新执行execute
DiscardPolicy:不处理,丢弃当前未处理任务

在使用工具类Executors,基本上也是直接使用线程池,需要注意使用的队列。

在使用线程池的时候需要注意如果在线程中放入了数据也就是使用了ThreadLocal,因为线程在执行完任务后并不会销毁,可能其他任务获取到了当前线程设置的值,建议在使用完毕后删除ThreadLocal中的数据。

参考:

  • http://blog.csdn.net/xiaoxufox/article/details/52278508
  • http://cmsblogs.com

公司准备使用sonar作为代码质量管理,在代码中string不允许直接定义和环境相关的String,只能从配置文件中读取key,需要自己写一个sonar规则。
因为检测的是java文件,直接用java开发个自定义组件。
查看各种规则可以使用的语言:Support of Custom Rules by Language

编写规则的时候官方有demo:Sonar Java Custom Rules

阅读全文 »

通常在项目日志查看过程中都是直接登录服务器,找到服务器通过一些命令查看日志。有时候在定位问题的时候比较麻烦,不能快速找到问题日志。

这次尝试在本地搭建简单的ELK查询日志相关信息。

Elasticsearch:用于日志搜索查询
Logstash:用于日志收集
Kibana:用于页面展示

阅读全文 »

在平常工作中有时候需要查看方法的调用时间,或者需要知道某个业务的调用逻辑,需要些大量侵入式代码来完成。现在可以使用javaagent在main方法前执行,然后加载的类,通过字节码技术,在类中加入需要的代码。

阅读全文 »

jdk1.7中ConcurrentHashMap与jdk1.8中实现不一致,1.7中采用的事分段锁,此处使用的是1.8,利用CAS+Synchronized来保证并发更新的安全,当然底层采用数组+链表+红黑树的存储结构。

前期概念

在ConcurrentHashMap中有几个重要内部类:

  • Node:普通的节点,key-value形式,用于链表节点存储。
  • TreeNode:红黑树节点,Node子类。
  • TreeBin:红黑树,通过传入TreeNode构造一颗红黑树。
  • ForwardingNode:Node子类,辅助节点,用于扩容操作。

在ConcurrentHashMap中,最开始是采用链表存储数据,当数据过长(TREEIFY_THRESHOLD默认长度8),就会转换为红黑树来处理,将原有Node节点包装成TreeNode放入TreeBin中,然后由TreeBin完成红黑树的转换。

阅读全文 »

在使用ReentrantReadWriteLock时,获取写锁时,不能存在任何其他锁。如果存在读超大与写,可能会出现获取写锁线程一直处在等待状态导致饥饿。
在JDK1.8中引入StampedLock,StampedLock控制锁有3种状态:写,读,乐观读。

所谓的乐观读模式,也就是若读的操作很多,写的操作很少的情况下,你可以乐观地认为,写入与读取同时发生几率很少,因此不悲观地使用完全的读取锁定,程序可以查看读取资料之后,是否遭到写入执行的变更,再采取后续的措施(重新读取变更信息,或者抛出异常) ,这一个小小改进,可大幅度提高程序的吞吐量!!

阅读全文 »

在多线程操作过程中,如果写少读多,采用ReentrantLock(排他锁),会比较浪费资源,这种情况下可以采用Java读写锁。

ReadWriteLock

ReadWriteLock接口,定义了readLock、writeLock

ReentrantReadWriteLock

重入读写锁,ReadWriteLock子类。对于ReadWriteLock,内部主要有:

  • ReentrantReadWriteLock.ReadLock readerLock:读锁,Lock接口实现
  • ReentrantReadWriteLock.WriteLock writerLock:写锁,Lock接口实现
  • Sync sync:同步方法块,抽象接口AbstractQueuedSynchronizer子类,同时有NonfairSync、FairSync两个子类。
阅读全文 »

在Java线程基础中使用过synchronized,在JDK5一起,同步都是基于synchronized,在场景非常复杂的地方,使用synchronized不方便,JDK5引入了Lock。在包java.util.concurrent.locks中就有锁相关的类。

AbstractOwnableSynchronizer

抽象独占同步锁。提供设置当前拥有独占访问的线程,获取设置的独占线程。
只能子类构造。

AbstractQueuedSynchronizer

抽象队列同步,继承AbstractOwnableSynchronizer。通过队列先进先出来实现等待队列的阻塞,内部维护一个线程链表Node。在获取锁失败后,会生成Node节点,并放入链表末尾,直到等待超时或者线程中断或被上一个节点唤醒。其节点等待状态waitStatus有:

  • 0:节点刚被初始化状态,或者可能已经完成状态
  • CANCELLED(1):取消状态,需要取消该节点时设置的状态
  • SIGNAL(-1):节点等待状态
  • CONDITION(-2):节点在等待队列中,在调用Condition.signal()之后会变为SIGNAL状态进入等待获取锁队列
  • PROPAGATE(-3):共享同步状态

AbstractQueuedSynchronizer内部通过Unsafe.compareAndSet(原子操作int)来操作内存,保证线程的同步。其他API可查看相关文档。

AbstractQueuedLongSynchronizer

同AbstractQueuedSynchronizer,只不过AbstractQueuedLongSynchronizer内部通过long字段来实现原子操作。当创建需要 64 位状态的多级别锁和屏障等同步器时使用。

阅读全文 »
0%