工作问题-并发事务

现在开发过程中有个需求,商品没货了需要补货,简略版如下:
1、发现商品没货,生成补货单
2、如果现在有未处理补货单,先删除当前补货单,之后生成新的补货单,如果当前的补货单已经开始处理,那么忽略
3、提供API接口供修改状态

现在数据库设计,状态分为:0:待处理,1:已经认领,2:处理完毕,4:取消
简略版数据库如下:

1
2
3
4
5
6
7
8
9
10
11
create table test.item
(
id int auto_increment
primary key,
sku_code varchar(20) not null,
num int not null,
status int not null,
constraint item_id_uindex
unique (id)
)
;

项目验证

创建Spring项目,配置SpringMVC,Spring、Mybatis

数据库操作如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<mapper namespace="com.whh.mapper.ItemMapper">
<!--查询列表-->
<!--通过ID查询数据-->
<select id="query" resultType="com.whh.vo.SearchItemVo">
select id, sku_code as skuCode,num, status from item
<trim prefix="where" suffixOverrides="and">
<if test="id != null">
id = #{id} and
</if>
<if test="skuCode != null and skuCode != ''">
sku_code = #{skuCode} and
</if>
<if test="num != null">
num = #{num} and
</if>
<if test="status != null">
status = #{status} and
</if>
<if test="notStatus != null">
status != #{notStatus} and
</if>
</trim>
</select>
<!--新增数据,插入对象ID会自动设置为自增的ID-->
<insert id="insert" parameterType="com.whh.pojo.Item" useGeneratedKeys="true" keyProperty="id">
INSERT INTO item (sku_code,num, status) VALUES (#{skuCode},#{num}, #{status})
</insert>
<!--关系数据-->
<update id="update" parameterType="com.whh.pojo.Item">
update item set
<trim suffixOverrides=",">
<if test="skuCode != null">
sku_code = #{skuCode},
</if>
<if test="num != null">
num = #{num},
</if>
<if test="status != null">
status = #{status}
</if>
</trim>
where id = #{id}
</update>
</mapper>

先展示错误的示范:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Transactional(rollbackFor = Exception.class)
public int createTask(String skuCode, Integer num) {
int result = 0;
//查询数据库是否存在
SearchItemVo params = new SearchItemVo();
params.setSkuCode(skuCode);
params.setNotStatus(4);
List<Item> itemList = itemMapper.query(params);
//数据库不存在时insert
if (itemList == null || itemList.isEmpty()) {
Item item = new Item(skuCode, num, 0);
result = itemMapper.insert(item);
} else {
Item item = itemList.get(0);
//数据库存在,且状态为0,设置状态为4,同时生成新的数据
item.setStatus(4);
result = itemMapper.update(item);
if (result == 1) {
item.setStatus(0);
item.setNum(num);
result = itemMapper.insert(item);
}
}
return result;
}

编写Test

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath*:applicationContext.xml")
public class ItemServiceTest {
@Autowired
private ItemService itemService;
@Test
public void createTask() throws Exception {
Random random = new Random();
for (int i = 0; i < 10; i++) {
int num = random.nextInt(100);
itemService.createTask("K000001", num);

}
}
}

启动Test测试,查询数据库数据。

数据库中同一个商品非4状态的只有一条。
这样存在一个问题,如果出现并发访问,那么数据库中数据就会查询问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Test
public void concurrentCreateTask() throws Exception {
final String skuCode = "K000001";
int threadNum = 10;
Thread[] threads = new Thread[10];
CountDownLatch downLatch = new CountDownLatch(threadNum);
//设置多线程创建
IntStream.range(0, threadNum).forEach((i) -> threads[i] = new Thread(() -> {
Random random = new Random();
IntStream.range(0, 100).forEach((j) -> itemService.createTask(skuCode, random.nextInt(10)));
downLatch.countDown();
}));
//启动线程
Arrays.stream(threads).forEach((Thread::start));
downLatch.await();

//查询验证最终生成的非4数量
SearchItemVo searchItemVo = new SearchItemVo();
searchItemVo.setSkuCode(skuCode);
searchItemVo.setNotStatus(4);
List<Item> itemList = itemMapper.query(searchItemVo);
assert itemList != null && itemList.size() == 0;
}

启动Test开始测试,结论是肯定的,测试不通过。

分析原因

因为在代码中,我们是先查询的数据,之后在做的判断,在并发访问时,可能出现多线程同时查询,都为发现数据库无数据,之后执行后续的插入逻辑。后面判断状态也是同理。

解决办法:
INSERT ... ON DUPLICATE KEY UPDATE,表示如果存在就更新,如果不存在就插入,刚刚好这需求之前的要求。
但是此处并不适合这个,因为ON DUPLICATE KEY UPDATE判断重复是依据唯一索引,但是在数据库中,sku_codestatus无法建立唯一索引,因为可能会出现一个sku_code对应多个重复的status

那先解决如果存在在插入数据库。
改进sql,在mapper中新增方法

1
2
3
4
5
6
7
8
9
10
<insert id="insertNotExist" parameterType="com.whh.pojo.Item" useGeneratedKeys="true" keyProperty="id">
INSERT INTO item (sku_code, num, status) SELECT
#{skuCode},
#{num},
#{status}
FROM DUAL
WHERE NOT exists(SELECT id
FROM item
WHERE sku_code = #{skuCode} AND status != 4)
</insert>
  • DUAL为临时表。

通过查询skuCodestatus != 4,使用not exists,获取返回的第一条数据,然后进行插入。

在Mysql中not exists只会返回true或者false,如果返回的是true表示不存在该sku_codestatus不为4的数据;反之则返回false,那么将无法插入数据。
测试该方法是生效

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
@Transactional
@Rollback
public void insertNotExist() throws Exception {
String skuCode = "K000002";
Item item = new Item(skuCode, 10, 0);
//判断第一次插入是否成功
assert itemMapper.insertNotExist(item) != 0;
//判断第二次插入是否从
assert itemMapper.insertNotExist(item) == 0;
SearchItemVo searchItemVo = new SearchItemVo();
searchItemVo.setNotStatus(4);
searchItemVo.setSkuCode(skuCode);
//判断数据中非4的数据是否只有1条
assert itemMapper.query(searchItemVo).size() == 1;
}

运行单元测试,验证该方法是生效的。

解决了数据只有不存在时才插入。
下一步就是如果存在数据状态为0,那么需要修改为4
Mapper新增update操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<update id="updateInfo">
UPDATE item set
<trim suffixOverrides=",">
<if test="newInfo.skuCode != null">
sku_code = #{newInfo.skuCode},
</if>
<if test="newInfo.num != null">
num = #{newInfo.num},
</if>
<if test="newInfo.status != null">
status = #{newInfo.status}
</if>
</trim>
<trim prefix="where" suffixOverrides="and">
<if test="oldInfo.id != null">
id = #{oldInfo.id} and
</if>
<if test="oldInfo.skuCode != null">
sku_code = #{oldInfo.skuCode} and
</if>
<if test="oldInfo.num != null">
num = #{oldInfo.num} and
</if>
<if test="oldInfo.status != null">
status = #{oldInfo.status} and
</if>
<if test="oldInfo.id == null and oldInfo.skuCode == null and oldInfo.num == null and oldInfo.status">
false
</if>
</trim>
</update>
  • 注:最后设置如果传入的查询条件为null,那么就不做更新,避免因为没有查询条件导致更新整张表。

在一般情况下,更新时只是通过主键直接更新值,这样在并发情况下会出现多个线程更新,且都更新成功。
如:现在一个线程更新状态为4,生成新数据,另一个线程更新状态为1,设置为认领,这样就会导致上一个线程更新后,下一个线程又直接修改了值,导致数据出现错误。
所以在处理这种情况时需要加入状态进行判断(CAS)。

改进代码

修改之前逻辑为新代码,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Transactional(rollbackFor = Exception.class)
public int createTaskImprove(String skuCode, Integer num) {
int result = 0;
Item item = new Item(skuCode, num, 0);
result = itemMapper.insertNotExist(item);
if (result == 0) {
//构造旧数据用于查询,
Item oldItem = new Item(skuCode, null, 0);
Item newItem = new Item(skuCode, null, 4);
result = itemMapper.updateInfo(oldItem, newItem);
if (result != 0){
result = itemMapper.insertNotExist(item);
}
}
return result;
}

开启并发测试,理论上是应该OK了,但是启动单元测试后,出现了错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
Exception in thread "Thread-5" Exception in thread "Thread-9" Exception in thread "Thread-11" org.springframework.dao.DeadlockLoserDataAccessException: 
### Error updating database. Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
### The error may involve com.whh.mapper.ItemMapper.insertNotExist-Inline
### The error occurred while setting parameters
### SQL: INSERT INTO item (sku_code, num, status) SELECT ?, ?, ? FROM DUAL WHERE NOT exists(SELECT id FROM item WHERE sku_code = ? AND status != 4)
### Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
; SQL []; Deadlock found when trying to get lock; try restarting transaction; nested exception is com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
at org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator.doTranslate(SQLErrorCodeSQLExceptionTranslator.java:263)
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:73)
at org.mybatis.spring.MyBatisExceptionTranslator.translateExceptionIfPossible(MyBatisExceptionTranslator.java:74)
at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:421)
at com.sun.proxy.$Proxy20.insert(Unknown Source)
at org.mybatis.spring.SqlSessionTemplate.insert(SqlSessionTemplate.java:254)
at org.apache.ibatis.binding.MapperMethod.execute(MapperMethod.java:52)
at org.apache.ibatis.binding.MapperProxy.invoke(MapperProxy.java:53)
at com.sun.proxy.$Proxy24.insertNotExist(Unknown Source)
at com.whh.service.ItemService.createTaskImprove(ItemService.java:66)
at com.whh.service.ItemService$$FastClassBySpringCGLIB$$a83be025.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:651)
at com.whh.service.ItemService$$EnhancerBySpringCGLIB$$bc7432b6.createTaskImprove(<generated>)
at com.whh.service.ItemServiceTest.lambda$null$3(ItemServiceTest.java:80)
at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:557)
at com.whh.service.ItemServiceTest.lambda$null$4(ItemServiceTest.java:80)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:409)
at com.mysql.jdbc.Util.getInstance(Util.java:384)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1064)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4232)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4164)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2615)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2776)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2838)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2082)
at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1307)
at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_execute(FilterChainImpl.java:2931)
at com.alibaba.druid.filter.FilterEventAdapter.preparedStatement_execute(FilterEventAdapter.java:440)
at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_execute(FilterChainImpl.java:2929)
at com.alibaba.druid.filter.FilterEventAdapter.preparedStatement_execute(FilterEventAdapter.java:440)
at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_execute(FilterChainImpl.java:2929)
at com.alibaba.druid.proxy.jdbc.PreparedStatementProxyImpl.execute(PreparedStatementProxyImpl.java:131)
at com.alibaba.druid.pool.DruidPooledPreparedStatement.execute(DruidPooledPreparedStatement.java:493)
at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.ibatis.logging.jdbc.PreparedStatementLogger.invoke(PreparedStatementLogger.java:59)
at com.sun.proxy.$Proxy27.execute(Unknown Source)
at org.apache.ibatis.executor.statement.PreparedStatementHandler.update(PreparedStatementHandler.java:45)
at org.apache.ibatis.executor.statement.RoutingStatementHandler.update(RoutingStatementHandler.java:73)
at org.apache.ibatis.executor.SimpleExecutor.doUpdate(SimpleExecutor.java:49)
at org.apache.ibatis.executor.BaseExecutor.update(BaseExecutor.java:115)
at org.apache.ibatis.executor.CachingExecutor.update(CachingExecutor.java:75)
at org.apache.ibatis.session.defaults.DefaultSqlSession.update(DefaultSqlSession.java:170)
at org.apache.ibatis.session.defaults.DefaultSqlSession.insert(DefaultSqlSession.java:157)
at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:408)
... 15 more

从错误上看,出现了死锁,通过命令show engine innodb status;查询数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159

=====================================
2018-04-01 19:17:11 700003efe000 INNODB MONITOR OUTPUT
=====================================
Per second averages calculated from the last 32 seconds
-----------------
BACKGROUND THREAD
-----------------
srv_master_thread loops: 84 srv_active, 0 srv_shutdown, 24429 srv_idle
srv_master_thread log flush and writes: 24513
----------
SEMAPHORES
----------
OS WAIT ARRAY INFO: reservation count 474
OS WAIT ARRAY INFO: signal count 548
Mutex spin waits 10228, rounds 38291, OS waits 49
RW-shared spins 1411, rounds 29858, OS waits 247
RW-excl spins 201, rounds 6233, OS waits 70
Spin rounds per wait: 3.74 mutex, 21.16 RW-shared, 31.01 RW-excl
------------------------
LATEST DETECTED DEADLOCK
------------------------
2018-04-01 19:17:02 700003fca000
*** (1) TRANSACTION:
TRANSACTION 170407, ACTIVE 0 sec setting auto-inc lock
mysql tables in use 2, locked 2
LOCK WAIT 4 lock struct(s), heap size 1184, 300 row lock(s)
MySQL thread id 2659, OS thread handle 0x700003cde000, query id 49518 localhost 127.0.0.1 root executing
INSERT INTO item (sku_code, num, status) SELECT
'K000001',
2,
0
FROM DUAL
WHERE NOT exists(SELECT id
FROM item
WHERE sku_code = 'K000001' AND status != 4)
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
TABLE LOCK table `test`.`item` trx id 170407 lock mode AUTO-INC waiting
*** (2) TRANSACTION:
TRANSACTION 170402, ACTIVE 0 sec inserting
mysql tables in use 2, locked 2
7 lock struct(s), heap size 1184, 303 row lock(s)
MySQL thread id 2660, OS thread handle 0x700003fca000, query id 49503 localhost 127.0.0.1 root executing
INSERT INTO item (sku_code, num, status) SELECT
'K000001',
4,
0
FROM DUAL
WHERE NOT exists(SELECT id
FROM item
WHERE sku_code = 'K000001' AND status != 4)
*** (2) HOLDS THE LOCK(S):
TABLE LOCK table `test`.`item` trx id 170402 lock mode AUTO-INC
*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 96 page no 3 n bits 368 index `PRIMARY` of table `test`.`item` trx id 170402 lock_mode X insert intention waiting
Record lock, heap no 1 PHYSICAL RECORD: n_fields 1; compact format; info bits 0
0: len 8; hex 73757072656d756d; asc supremum;;

*** WE ROLL BACK TRANSACTION (1)
------------
TRANSACTIONS
------------
Trx id counter 170788
Purge done for trx's n:o < 170787 undo n:o < 0 state: running but idle
History list length 1521
LIST OF TRANSACTIONS FOR EACH SESSION:
---TRANSACTION 170407, not started
MySQL thread id 2659, OS thread handle 0x700003cde000, query id 49518 localhost 127.0.0.1 root cleaning up
---TRANSACTION 170404, not started
MySQL thread id 2661, OS thread handle 0x700003dee000, query id 49513 localhost 127.0.0.1 root cleaning up
---TRANSACTION 170408, not started
MySQL thread id 2660, OS thread handle 0x700003fca000, query id 49546 localhost 127.0.0.1 root cleaning up
---TRANSACTION 170296, not started
MySQL thread id 2666, OS thread handle 0x700003e76000, query id 49252 localhost 127.0.0.1 root cleaning up
---TRANSACTION 170401, not started
MySQL thread id 2658, OS thread handle 0x700003d66000, query id 49500 localhost 127.0.0.1 root cleaning up
---TRANSACTION 170311, not started
MySQL thread id 2667, OS thread handle 0x700003f42000, query id 49284 localhost 127.0.0.1 root cleaning up
---TRANSACTION 170787, not started
MySQL thread id 2663, OS thread handle 0x700003d22000, query id 50365 localhost 127.0.0.1 root cleaning up
---TRANSACTION 170403, not started
MySQL thread id 2662, OS thread handle 0x700003f86000, query id 49512 localhost 127.0.0.1 root cleaning up
---TRANSACTION 170320, not started
MySQL thread id 2665, OS thread handle 0x700003eba000, query id 49308 localhost 127.0.0.1 root cleaning up
---TRANSACTION 170405, not started
MySQL thread id 2664, OS thread handle 0x700003daa000, query id 49514 localhost 127.0.0.1 root cleaning up
---TRANSACTION 168759, not started
MySQL thread id 2507, OS thread handle 0x700003efe000, query id 50372 localhost 127.0.0.1 root init
show engine innodb status
---TRANSACTION 169405, not started
MySQL thread id 2498, OS thread handle 0x700003e32000, query id 46830 localhost 127.0.0.1 root cleaning up
--------
FILE I/O
--------
I/O thread 0 state: waiting for i/o request (insert buffer thread)
I/O thread 1 state: waiting for i/o request (log thread)
I/O thread 2 state: waiting for i/o request (read thread)
I/O thread 3 state: waiting for i/o request (read thread)
I/O thread 4 state: waiting for i/o request (read thread)
I/O thread 5 state: waiting for i/o request (read thread)
I/O thread 6 state: waiting for i/o request (write thread)
I/O thread 7 state: waiting for i/o request (write thread)
I/O thread 8 state: waiting for i/o request (write thread)
I/O thread 9 state: waiting for i/o request (write thread)
Pending normal aio reads: 0 [0, 0, 0, 0] , aio writes: 0 [0, 0, 0, 0] ,
ibuf aio reads: 0, log i/o's: 0, sync i/o's: 0
Pending flushes (fsync) log: 0; buffer pool: 0
631 OS file reads, 5249 OS file writes, 4212 OS fsyncs
0.00 reads/s, 0 avg bytes/read, 11.03 writes/s, 7.66 fsyncs/s
-------------------------------------
INSERT BUFFER AND ADAPTIVE HASH INDEX
-------------------------------------
Ibuf: size 1, free list len 0, seg size 2, 0 merges
merged operations:
insert 0, delete mark 0, delete 0
discarded operations:
insert 0, delete mark 0, delete 0
Hash table size 276671, node heap has 1 buffer(s)
7.12 hash searches/s, 1.25 non-hash searches/s
---
LOG
---
Log sequence number 5988246706
Log flushed up to 5988246706
Pages flushed up to 5988246706
Last checkpoint at 5988246706
0 pending log writes, 0 pending chkp writes
3847 log i/o's done, 7.38 log i/o's/second
----------------------
BUFFER POOL AND MEMORY
----------------------
Total memory allocated 137363456; in additional pool allocated 0
Dictionary memory allocated 174719
Buffer pool size 8191
Free buffers 7672
Database pages 518
Old database pages 209
Modified db pages 0
Pending reads 0
Pending writes: LRU 0, flush list 0, single page 0
Pages made young 0, not young 0
0.00 youngs/s, 0.00 non-youngs/s
Pages read 446, created 72, written 2372
0.00 reads/s, 0.03 creates/s, 9.19 writes/s
Buffer pool hit rate 1000 / 1000, young-making rate 0 / 1000 not 0 / 1000
Pages read ahead 0.00/s, evicted without access 0.00/s, Random read ahead 0.00/s
LRU len: 518, unzip_LRU len: 0
I/O sum[0]:cur[0], unzip sum[0]:cur[0]
--------------
ROW OPERATIONS
--------------
0 queries inside InnoDB, 0 queries in queue
0 read views open inside InnoDB
Main thread id 123145362894848, state: sleeping
Number of rows inserted 2531, updated 2545, deleted 2095, read 1304134
3.62 inserts/s, 3.62 updates/s, 0.00 deletes/s, 4112.25 reads/s
----------------------------
END OF INNODB MONITOR OUTPUT
============================

重现deadlock。

1
2
3
4
5
6
7
8
INSERT INTO item (sku_code, num, status) SELECT
'K000001',
8,
0
FROM DUAL
WHERE NOT exists(SELECT id
FROM item
WHERE sku_code = 'K000001' AND status != 4);

现在数据库中没有改条数据,启动3个session,设置事务不自动提交。步骤:
1、session1,执行sql,输入插入一条数据,但是事务未提交
2、seesion2,执行sql,
3、session3,执行sql,
4、session1回滚,这时就会发现session3中出现了deadlock。

参考:两个INSERT发生死锁原因剖析

解决办法,修改SQL如下:

1
2
3
4
5
6
7
8
9
10
<insert id="insertNotExist" parameterType="com.whh.pojo.Item" useGeneratedKeys="true" keyProperty="id">
INSERT DELAYED INTO item (sku_code, num, status) SELECT
#{skuCode},
#{num},
#{status}
FROM DUAL
WHERE NOT exists(SELECT id
FROM item
WHERE sku_code = #{skuCode} AND status != 4 for update)
</insert>

在select的后面添加for update,用于锁住数据,避免发生竞争导致死锁。这里通过上述启动3个事务方便执行,然后回滚的例子,并不会出现deadlock。

  • 此处需要添加索引,因为for update默认通过索引来锁数据。

重新运行单元测试,启动正常。

此处记录一个未解问题:
之前想优化下此处逻辑,先直接更新状态0–>4,然后在插入。如下:

1
2
3
4
5
6
7
8
9
10
11
@Transactional()
public int createTaskImprove1(String skuCode, Integer num) {
int result = 0;
Item oldItem = new Item(skuCode, null, 0);
Item newItem = new Item(skuCode, null, 4);
result = itemMapper.updateInfo(oldItem, newItem);

Item item = new Item(skuCode, num, 0);
result = itemMapper.insertNotExist(item);
return result;
}

此处在并发的时候是有问题的(业务也有问题,会出现数据丢失的情况),还是会出现死锁,难道是因为使用update的时候数据库的锁和insert ... select设置的锁,导致互锁?暂时还未弄明白。

解决上述问题后,又出现一个问题,当并发执行insert ... select时,其他线程会等待该线程事务提交后才会执行,这样就会导致效率很低。
通过sql

1
2
3
4
5
6
7
8
9
10
select r.trx_isolation_level, r.trx_id waiting_trx_id,r.trx_mysql_thread_id waiting_trx_thread,
r.trx_state waiting_trx_state,lr.lock_mode waiting_trx_lock_mode,lr.lock_type waiting_trx_lock_type,
lr.lock_table waiting_trx_lock_table,lr.lock_index waiting_trx_lock_index,r.trx_query waiting_trx_query,
b.trx_id blocking_trx_id,b.trx_mysql_thread_id blocking_trx_thread,b.trx_state blocking_trx_state,
lb.lock_mode blocking_trx_lock_mode,lb.lock_type blocking_trx_lock_type,lb.lock_table blocking_trx_lock_table,
lb.lock_index blocking_trx_lock_index,b.trx_query blocking_query
from information_schema.innodb_lock_waits w inner join information_schema.innodb_trx b on b.trx_id=w.blocking_trx_id
inner join information_schema.innodb_trx r on r.trx_id=w.requesting_trx_id
inner join information_schema.innodb_locks lb on lb.lock_trx_id=w.blocking_trx_id
inner join information_schema.innodb_locks lr on lr.lock_trx_id=w.requesting_trx_id

这样可以查询出等待的锁,可以得出锁住的是之前创建的索引。

在测试的时候,因为每次都是直接删除掉数据库数据后进行操作的,这样就会出现上述的等待锁的情况。后来发现当我数据库中存在K000006数据时,两个事物分别插入K000001K000007并不会出现锁等待,提交事务后数据都插入了数据库,后来实验分别插入K000006两边的数据,结果都未发现锁等待的情况。这里猜测应该是数据库中索引使用的是B+Tree结构。这时删掉之前建立的索引,都会导致锁等待。

分布式流水号

在应用生成分布式流水号,有两种方式,一种是通过数据表生成,一种是通过缓存生成。

Mysql生成流水号

在原先生成分布式流水号是通过先select ... for update来锁定数据库中某条值,之后对该值进行更新操作,这种就会出现一个问题,如果刚刚开始数据库就没有值,并发访问时,会出现同时对数据进行插入操作。如果建立了唯一索引数据并不会出现问题。

设计数据库表如下:

1
2
3
4
5
6
7
8
9
10
11
create table test.serial_number
(
group_code varchar(10) null comment '组,用于区分业务',
group_key varchar(20) not null comment '依据key生成流水号',
scope varchar(20) not null comment '范围,如20180808表示天,2018080808表示小时',
sign varchar(5) not null comment '重新计算标示,Y年,M月,D天,H小时,G全局',
num int default '0' not null comment '流水号',
constraint serial_number_group_code_group_key_scope_step_uindex
unique (group_code, group_key, scope, sign)
)
;

创建表,同时对group_code、group_key、scope、sign、num建立唯一索引。
mapper创建db操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<insert id="insertWithUpdateIncr">
INSERT INTO serial_number (group_code, group_key, scope, sign, num)
VALUES (#{serialNumber.groupCode}, #{serialNumber.groupKey}, #{serialNumber.scope}, #{serialNumber.sign}, #{inrc})
ON DUPLICATE KEY UPDATE num = num + #{inrc}
</insert>

<select id="selectNum" parameterType="com.whh.pojo.SerialNumber" resultType="java.lang.Integer">
SELECT num
FROM serial_number
WHERE
group_code = #{groupCode}
AND group_key = #{groupKey}
AND scope = #{scope}
AND sign = #{sign}
</select>

insertWithUpdateIncr用来对数据进行更新和插入操作,如果存在指定数据,那么就对数据进行更新,更新的值为传入的值为原始值+传入值,如果不存在该数据,那么对该数据进行插入操作。(数据库默认num为0,为了避免生成默认值,需要在插入时指定传入的值,可以避免数据获取失败)
selectNum用于插入更新后,查询数据库最新值。

创建映射对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class SerialNumber {
private String groupCode;
private String groupKey;
private String scope;
private SerialSignEnum sign;
private Integer num;
}
public enum SerialSignEnum {
G(null) {
@Override
public String scope() {
return "";
}
},
Y(366 * 24 * 60 * 60) {
@Override
public String scope() {
return new SimpleDateFormat("yyyy").format(new Date());
}
},
M(32 * 24 * 60 * 6) {
@Override
public String scope() {
return new SimpleDateFormat("yyyyMM").format(new Date());
}

},
D(24 * 60 * 60) {
@Override
public String scope() {
return new SimpleDateFormat("yyyyMMdd").format(new Date());
}
},
H(60 * 60) {
@Override
public String scope() {
return new SimpleDateFormat("yyyyMMdd").format(new Date());
}
};

private Integer expire;

SerialSignEnum(Integer expire) {
this.expire = expire;
}

public Integer getExpire() {
return expire;
}

public void setExpire(Integer expire) {
this.expire = expire;
}

public abstract String scope();
}

创建service进行操作

1
2
3
4
5
6
7
8
9
10
11
@Service
public class SerialNumberService {
@Autowired
private SerialNumberMapper serialNumberMapper;

@Transactional(propagation = Propagation.REQUIRES_NEW,rollbackFor = Exception.class)
public Integer mysqlSerialNumber(SerialNumber serialNumber, int inrc) {
serialNumberMapper.insertWithUpdateIncr(serialNumber, inrc);
return serialNumberMapper.selectNum(serialNumber);
}
}

如上,直接对数据库数据进行插入更新操作,之后对数据进行查询,获取最新值。
这种在并发的时候也保证数据正常是因为之前设置了唯一索引,在对数据进行插入更新操作时会对当前数据进行加锁,此时如果存在其他事务中对该数据进行操作就会进入锁等待,直到上个事务释放锁,所以之后的查询操作保证了数据为当前更新后的最新值。如果是对不同数据进行操作,不会发生锁竞争。
注:此处事务传播级别采用的是Propagation.REQUIRES_NEW,创建一个于原先事务无关的事务,这样避免了因调用端事务执行时间过长导致其他线程一直等待。这种缺点在于如果在调用生成流水号后调用方出现异常,那么会导致序列化不连续。

编写单元测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath*:applicationContext.xml")
public class SerialNumberServiceTest {
@Autowired
private SerialNumberService serialNumberService;
@Autowired
private SerialNumberMapper serialNumberMapper;

@Test
public void mysqlSerialNumber() throws Exception {
final SerialNumber serialNumber = new SerialNumber("ITEM", "1001_K000011", SerialSignEnum.D);
final int incr = 1;
Integer oldNum = serialNumberMapper.selectNum(serialNumber);
if (oldNum == null)oldNum =0;
serialNumberService.mysqlSerialNumber(serialNumber, incr);
assert serialNumberMapper.selectNum(serialNumber) == oldNum + incr;

}

@Test
public void mysqlSerialNumberSingleKey() throws Exception {
long startTime = System.currentTimeMillis();
final SerialNumber serialNumber = new SerialNumber("ITEM", "1001_K00005", SerialSignEnum.D);
Integer oldNum = serialNumberMapper.selectNum(serialNumber);
final int incr = 10;
final int threadNum = 10;
final int threadTime = 100;
Thread[] threads = new Thread[threadNum];
CountDownLatch downLatch = new CountDownLatch(threadNum);

Integer[] serialNums = new Integer[threadNum * threadTime];
IntStream.range(0, threadNum).forEach((i) -> threads[i] = new Thread(() -> {
//线程并发执行,返回值存入集合,空间换时间
IntStream.range(0, threadTime).forEach((j) -> serialNums[i * threadTime + j] = serialNumberService.mysqlSerialNumber(serialNumber, incr));
downLatch.countDown();
}));
Arrays.stream(threads).forEach(Thread::start);
downLatch.await();
System.out.println("耗时:" + (System.currentTimeMillis() - startTime));
//验证最终值是否和理想一致
assert serialNumberMapper.selectNum(serialNumber) == oldNum + threadNum * threadTime * incr;

//验证是否重复生成
Set<Integer> set = new HashSet<>(Arrays.asList(serialNums));
assert set.size() == serialNums.length;
}

@Test
public void mysqlSerialNumberRandomKey() throws Exception {
long startTime = System.currentTimeMillis();
final int incr = 1;
final int threadNum = 10;
final int threadTime = 1000;
final int itemNum = 20;

final SerialNumber[] serialNumbers = new SerialNumber[itemNum];
CountDownLatch downLatch = new CountDownLatch(threadNum);
//随机设置值
IntStream.range(0, itemNum).forEach((i) ->{
Random random = new Random();
DecimalFormat format = new DecimalFormat("00000");
serialNumbers[i] = new SerialNumber("ITEM", "1001_K" + format.format(random.nextInt(itemNum) + 1), SerialSignEnum.D);
});
//记录线程返回值
final Integer[] signNums = new Integer[threadNum * threadTime];
//记录线程请求
final SerialNumber[] signSerialNumbers = new SerialNumber[threadNum * threadTime];
final Thread[] threads = new Thread[threadNum];
IntStream.range(0, threadNum).forEach((i) -> threads[i] = new Thread(() -> {
IntStream.range(0, threadTime).forEach((j) -> {
//线程随机选择需要执行条件
Random random = new Random();
SerialNumber serialNumber = serialNumbers[random.nextInt(20)];
//记录执行的请求、返回值
signNums[i * threadTime + j] = serialNumberService.mysqlSerialNumber(serialNumber, incr);
signSerialNumbers[i * threadTime + j] = serialNumber;
});
downLatch.countDown();
}));
Arrays.stream(threads).forEach(Thread::start);

downLatch.await();
System.out.println("耗时:" + (System.currentTimeMillis() - startTime));

//汇总返回数据
Map<String, List<Integer>> serialNumMap = new HashMap<>();
for (int i = 0; i < signNums.length; i++) {
List<Integer> numList = serialNumMap.computeIfAbsent(signSerialNumbers[i].getGroupKey(), k -> new ArrayList<>());
numList.add(signNums[i]);
}
//验证展示是否符合需求
serialNumMap.forEach((key, value) ->{
Set<Integer> set = new HashSet<>(value);
assert set.size() == value.size();
System.out.printf("key:" + key);
value.sort(Integer::compareTo);
System.out.println(" num: " + value);

});
}
}

创建3个测试用例:
1、普通调用
2、多线程并发调同一条数据
3、多线程并发调随机调用不同数据

redis生成流水号

添加redis、spring-data-redis依赖,service新增方法。
spring配置redis如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"/>

<bean id="jedisConnectionFactory"
class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<property name="hostName" value="localhost"/>
<property name="port" value="6379"/>
<property name="timeout" value="1800"/>
<property name="poolConfig" ref="jedisPoolConfig"/>
<property name="usePool" value="true"/>
</bean>

<bean id="redisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">
<property name="connectionFactory" ref="jedisConnectionFactory"/>
</bean>
1
2
3
4
5
6
7
8
9
10
11
12
13
@Autowired
private RedisTemplate redisTemplate;

public Long redisSerialNumber(SerialNumber serialNumber, final int inrc){
return (Long)redisTemplate.execute((RedisCallback<Long>) connection -> {
byte[] keyBytes = (serialNumber.getGroupCode() + "_" + serialNumber.getGroupKey() + "_" + serialNumber.getScope()).getBytes();
Long incr = connection.incrBy(keyBytes, inrc);
if (serialNumber.getSign().getExpire() != null && incr == inrc){
connection.expire(keyBytes, serialNumber.getSign().getExpire());
}
return incr;
});
}

这里通过redis中incrBy方法对值进行递增,因为redis是单线程,所以保证线程的并发。
在构造key时,因为使用了scope确定了时间,当时间到下一个重新计算时,key会发生改变,计数又会从0开始。同时对

添加测试用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@Test
public void redisSerialNumber() throws Exception {
final SerialNumber serialNumber = new SerialNumber("ITEM", "1001_K000011", SerialSignEnum.D);
Long num = serialNumberService.redisSerialNumber(serialNumber, 10);
System.out.println(num);
}

@Test
public void redisSerialNumberSingleKey() throws Exception {
long startTime = System.currentTimeMillis();
final SerialNumber serialNumber = new SerialNumber("ITEM", "1001_K00005", SerialSignEnum.D);
final int incr = 1;
final int threadNum = 10;
final int threadTime = 100;
Thread[] threads = new Thread[threadNum];
CountDownLatch downLatch = new CountDownLatch(threadNum);

Long[] serialNums = new Long[threadNum * threadTime];
IntStream.range(0, threadNum).forEach((i) -> threads[i] = new Thread(() -> {
//线程并发执行,返回值存入集合,空间换时间
IntStream.range(0, threadTime).forEach((j) -> serialNums[i * threadTime + j] = serialNumberService.redisSerialNumber(serialNumber, incr));
downLatch.countDown();
}));
Arrays.stream(threads).forEach(Thread::start);
downLatch.await();
System.out.println("耗时:" + (System.currentTimeMillis() - startTime));

//验证是否重复生成
Set<Long> set = new HashSet<>(Arrays.asList(serialNums));
assert set.size() == serialNums.length;
Arrays.sort(serialNums);
System.out.println(Arrays.toString(serialNums));
}

@Test
public void redisSerialNumberRandomKey() throws Exception {
long startTime = System.currentTimeMillis();
final int incr = 1;
final int threadNum = 10;
final int threadTime = 10000;
final int itemNum = 20;

final SerialNumber[] serialNumbers = new SerialNumber[itemNum];
CountDownLatch downLatch = new CountDownLatch(threadNum);
//随机设置值
IntStream.range(0, itemNum).forEach((i) -> {
Random random = new Random();
DecimalFormat format = new DecimalFormat("00000");
serialNumbers[i] = new SerialNumber("ITEM", "1001_K" + format.format(random.nextInt(itemNum) + 1), SerialSignEnum.D);
});
//记录线程返回值
final Long[] signNums = new Long[threadNum * threadTime];
//记录线程请求
final SerialNumber[] signSerialNumbers = new SerialNumber[threadNum * threadTime];
final Thread[] threads = new Thread[threadNum];
IntStream.range(0, threadNum).forEach((i) -> threads[i] = new Thread(() -> {
IntStream.range(0, threadTime).forEach((j) -> {
//线程随机选择需要执行条件
Random random = new Random();
SerialNumber serialNumber = serialNumbers[random.nextInt(20)];
//记录执行的请求、返回值
signNums[i * threadTime + j] = serialNumberService.redisSerialNumber(serialNumber, incr);
signSerialNumbers[i * threadTime + j] = serialNumber;
});
downLatch.countDown();
}));
Arrays.stream(threads).forEach(Thread::start);

downLatch.await();
System.out.println("耗时:" + (System.currentTimeMillis() - startTime));

//汇总返回数据
Map<String, List<Long>> serialNumMap = new HashMap<>();
for (int i = 0; i < signNums.length; i++) {
List<Long> numList = serialNumMap.computeIfAbsent(signSerialNumbers[i].getGroupKey(), k -> new ArrayList<>());
numList.add(signNums[i]);
}
//验证展示是否符合需求
serialNumMap.forEach((key, value) -> {
Set<Long> set = new HashSet<>(value);
assert set.size() == value.size();
System.out.printf("key:" + key);
value.sort(Long::compareTo);
System.out.println(" num: " + value);

});
}

测试用例和之前一样,只是调用的方法改了。

两种流水号生成优缺点

mysql:使用方便,不需要而外维护其他应用,因为需要维护redis服务器比较麻烦。缺点是生成速度较慢,适合压力不大的地方。
redis:使用相对麻烦,需要专门维护redis服务器,优点是速度极快。

合并两个功能

需求是需要生成流水号,然后和之前业务合并插入数据库。
在原先的item表中添加serial_num字段,mappper新增插入sql insertNotExist2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Autowired
private SerialNumberService serialNumberService;
@Transactional
public int createItem(String skuCode, Integer num){
//构造流水号
SerialNumber serialNumber = new SerialNumber("ITEM", "1001", SerialSignEnum.D);
DecimalFormat format = new DecimalFormat("0000000");
String serialNum = serialNumber.getScope() + format.format(serialNumberService.mysqlSerialNumber(serialNumber, 1));

//生成插入数据
Item item = new Item(skuCode, num, 0);
item.setSerialNum(serialNum);
int result = itemMapper.insertNotExist2(item);
if (result == 0){
Item newItem = new Item(skuCode, null, 4);
Item oldItem = new Item(skuCode, null, 0);
result = itemMapper.updateInfo(oldItem, newItem);
if (result != 0){
result = itemMapper.insertNotExist2(item);
}
}

return result;
}

如上,先生成流水号,然后对业务数据进行插入。
创建单元测试如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void createItem() throws Exception {
int threadNum = 20;
final String skuCode = "K000001";
Thread[]threads = new Thread[threadNum];
CountDownLatch downLatch = new CountDownLatch(threadNum);
IntStream.range(0, threadNum).forEach((i) -> threads[i] = new Thread(() ->{
itemService.createItem(skuCode, 3);
downLatch.countDown();
}));
Arrays.stream(threads).forEach(Thread::start);
downLatch.await();

}

启动20个线程,对数据库数据进行操作。

此处结果是一直等待,直到sql等待超时。

分析原因如下:
在之前使用数据库理解池使用的是最大20个数据库连接,因为启动了20个线程,消耗了掉了连接池中的连接,当执行到生成流水号时,因为之前流水号设置的事务传播级别是Propagation.REQUIRES_NEW,所以会向连接池申请新的连接,但是因为连接池中连接已经消耗完毕,只能等待连接释放,但此时连接池又全部被占用,这样互相等待导致超时发生。

Spring事务传播级别:
方法a有事务,方法b有事务

  • REQUIRED(默认):支持当前已经存在的事务,如果还没有事务,就创建一个新事务。
  • SUPPORTS:支持当前事务,如果没有事务那么就不在事务中运行。
  • MANDATORY:支持当前已经存在的事务,如果还没有事务,就抛出一个异常。
  • REQUIRES_NEW:挂起当前事务,创建一个新事务,如果还没有事务,就简单地创建一个新事务,和之前创建的事务无关。
  • NOT_SUPPORTED:强制不在事务中运行,如果当前存在一个事务,则挂起该事务。
  • NEVER:强制要求不在事务中运行,如果当前存在一个事务,则抛出异常。
  • NESTED:在当前事务中创建一个嵌套事务,如果还没有事务,那么就简单地创建一个新事务。和REQUIRES_NEW区别是在于,该处创建的事务和之前事务是嵌套关系,在嵌套事务提交后,如果调用方抛出异常,子事务还是会回滚。

参考:Spring事务管理中@Transactional的propagation参数

解决办法

修改事务传播级别为默认,这样导致的缺陷是流水号生成的事务较长,这样流水号生成速度会比较慢。采用redis生成流水号。