工作问题-并发事务
现在开发过程中有个需求,商品没货了需要补货,简略版如下:
1、发现商品没货,生成补货单
2、如果现在有未处理补货单,先删除当前补货单,之后生成新的补货单,如果当前的补货单已经开始处理,那么忽略
3、提供API接口供修改状态
现在数据库设计,状态分为:0:待处理,1:已经认领,2:处理完毕,4:取消
简略版数据库如下:
1 | create table test.item |
项目验证
创建Spring项目,配置SpringMVC,Spring、Mybatis
数据库操作如下:
1 | <mapper namespace="com.whh.mapper.ItemMapper"> |
先展示错误的示范:
1 |
|
编写Test
1 |
|
启动Test测试,查询数据库数据。
数据库中同一个商品非4状态的只有一条。
这样存在一个问题,如果出现并发访问,那么数据库中数据就会查询问题
1 |
|
启动Test开始测试,结论是肯定的,测试不通过。
分析原因
因为在代码中,我们是先查询的数据,之后在做的判断,在并发访问时,可能出现多线程同时查询,都为发现数据库无数据,之后执行后续的插入逻辑。后面判断状态也是同理。
解决办法:INSERT ... ON DUPLICATE KEY UPDATE
,表示如果存在就更新,如果不存在就插入,刚刚好这需求之前的要求。
但是此处并不适合这个,因为ON DUPLICATE KEY UPDATE
判断重复是依据唯一索引,但是在数据库中,sku_code
和status
无法建立唯一索引,因为可能会出现一个sku_code
对应多个重复的status
。
那先解决如果存在在插入数据库。
改进sql,在mapper中新增方法
1 | <insert id="insertNotExist" parameterType="com.whh.pojo.Item" useGeneratedKeys="true" keyProperty="id"> |
DUAL
为临时表。
通过查询skuCode
和status != 4
,使用not exists
,获取返回的第一条数据,然后进行插入。
在Mysql中not exists
只会返回true
或者false
,如果返回的是true
表示不存在该sku_code
且status
不为4的数据;反之则返回false
,那么将无法插入数据。
测试该方法是生效
1 |
|
运行单元测试,验证该方法是生效的。
解决了数据只有不存在时才插入。
下一步就是如果存在数据状态为0,那么需要修改为4
Mapper新增update操作:
1 | <update id="updateInfo"> |
- 注:最后设置如果传入的查询条件为null,那么就不做更新,避免因为没有查询条件导致更新整张表。
在一般情况下,更新时只是通过主键直接更新值,这样在并发情况下会出现多个线程更新,且都更新成功。
如:现在一个线程更新状态为4,生成新数据,另一个线程更新状态为1,设置为认领,这样就会导致上一个线程更新后,下一个线程又直接修改了值,导致数据出现错误。
所以在处理这种情况时需要加入状态进行判断(CAS)。
改进代码
修改之前逻辑为新代码,如下:
1 | Transactional(rollbackFor = Exception.class) |
开启并发测试,理论上是应该OK了,但是启动单元测试后,出现了错误。
1 | Exception in thread "Thread-5" Exception in thread "Thread-9" Exception in thread "Thread-11" org.springframework.dao.DeadlockLoserDataAccessException: |
从错误上看,出现了死锁,通过命令show engine innodb status;
查询数据库
1 |
|
重现deadlock。
1 | INSERT INTO item (sku_code, num, status) SELECT |
现在数据库中没有改条数据,启动3个session,设置事务不自动提交。步骤:
1、session1,执行sql,输入插入一条数据,但是事务未提交
2、seesion2,执行sql,
3、session3,执行sql,
4、session1回滚,这时就会发现session3中出现了deadlock。
参考:两个INSERT发生死锁原因剖析
解决办法,修改SQL如下:
1 | <insert id="insertNotExist" parameterType="com.whh.pojo.Item" useGeneratedKeys="true" keyProperty="id"> |
在select的后面添加for update
,用于锁住数据,避免发生竞争导致死锁。这里通过上述启动3个事务方便执行,然后回滚的例子,并不会出现deadlock。
- 此处需要添加索引,因为
for update
默认通过索引来锁数据。
重新运行单元测试,启动正常。
此处记录一个未解问题:
之前想优化下此处逻辑,先直接更新状态0–>4,然后在插入。如下:
1 |
|
此处在并发的时候是有问题的(业务也有问题,会出现数据丢失的情况),还是会出现死锁,难道是因为使用update
的时候数据库的锁和insert ... select
设置的锁,导致互锁?暂时还未弄明白。
解决上述问题后,又出现一个问题,当并发执行insert ... select
时,其他线程会等待该线程事务提交后才会执行,这样就会导致效率很低。
通过sql
1 | select r.trx_isolation_level, r.trx_id waiting_trx_id,r.trx_mysql_thread_id waiting_trx_thread, |
这样可以查询出等待的锁,可以得出锁住的是之前创建的索引。
在测试的时候,因为每次都是直接删除掉数据库数据后进行操作的,这样就会出现上述的等待锁的情况。后来发现当我数据库中存在K000006
数据时,两个事物分别插入K000001
和K000007
并不会出现锁等待,提交事务后数据都插入了数据库,后来实验分别插入K000006
两边的数据,结果都未发现锁等待的情况。这里猜测应该是数据库中索引使用的是B+Tree
结构。这时删掉之前建立的索引,都会导致锁等待。
分布式流水号
在应用生成分布式流水号,有两种方式,一种是通过数据表生成,一种是通过缓存生成。
Mysql生成流水号
在原先生成分布式流水号是通过先select ... for update
来锁定数据库中某条值,之后对该值进行更新操作,这种就会出现一个问题,如果刚刚开始数据库就没有值,并发访问时,会出现同时对数据进行插入操作。如果建立了唯一索引数据并不会出现问题。
设计数据库表如下:
1 | create table test.serial_number |
创建表,同时对group_code、group_key、scope、sign、num
建立唯一索引。
mapper创建db操作:
1 | <insert id="insertWithUpdateIncr"> |
insertWithUpdateIncr
用来对数据进行更新和插入操作,如果存在指定数据,那么就对数据进行更新,更新的值为传入的值为原始值+传入值,如果不存在该数据,那么对该数据进行插入操作。(数据库默认num为0,为了避免生成默认值,需要在插入时指定传入的值,可以避免数据获取失败)selectNum
用于插入更新后,查询数据库最新值。
创建映射对象:
1 | public class SerialNumber { |
创建service进行操作
1 |
|
如上,直接对数据库数据进行插入更新操作,之后对数据进行查询,获取最新值。
这种在并发的时候也保证数据正常是因为之前设置了唯一索引,在对数据进行插入更新操作时会对当前数据进行加锁,此时如果存在其他事务中对该数据进行操作就会进入锁等待,直到上个事务释放锁,所以之后的查询操作保证了数据为当前更新后的最新值。如果是对不同数据进行操作,不会发生锁竞争。
注:此处事务传播级别采用的是Propagation.REQUIRES_NEW
,创建一个于原先事务无关的事务,这样避免了因调用端事务执行时间过长导致其他线程一直等待。这种缺点在于如果在调用生成流水号后调用方出现异常,那么会导致序列化不连续。
编写单元测试:
1 |
|
创建3个测试用例:
1、普通调用
2、多线程并发调同一条数据
3、多线程并发调随机调用不同数据
redis生成流水号
添加redis、spring-data-redis依赖,service新增方法。
spring配置redis如下
1 | <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"/> |
1 |
|
这里通过redis中incrBy
方法对值进行递增,因为redis是单线程,所以保证线程的并发。
在构造key时,因为使用了scope确定了时间,当时间到下一个重新计算时,key会发生改变,计数又会从0开始。同时对
添加测试用例:
1 |
|
测试用例和之前一样,只是调用的方法改了。
两种流水号生成优缺点
mysql:使用方便,不需要而外维护其他应用,因为需要维护redis服务器比较麻烦。缺点是生成速度较慢,适合压力不大的地方。
redis:使用相对麻烦,需要专门维护redis服务器,优点是速度极快。
合并两个功能
需求是需要生成流水号,然后和之前业务合并插入数据库。
在原先的item表中添加serial_num
字段,mappper新增插入sql insertNotExist2
。
1 |
|
如上,先生成流水号,然后对业务数据进行插入。
创建单元测试如下:
1 |
|
启动20个线程,对数据库数据进行操作。
此处结果是一直等待,直到sql等待超时。
分析原因如下:
在之前使用数据库理解池使用的是最大20个数据库连接,因为启动了20个线程,消耗了掉了连接池中的连接,当执行到生成流水号时,因为之前流水号设置的事务传播级别是Propagation.REQUIRES_NEW
,所以会向连接池申请新的连接,但是因为连接池中连接已经消耗完毕,只能等待连接释放,但此时连接池又全部被占用,这样互相等待导致超时发生。
Spring事务传播级别:
方法a有事务,方法b有事务
- REQUIRED(默认):支持当前已经存在的事务,如果还没有事务,就创建一个新事务。
- SUPPORTS:支持当前事务,如果没有事务那么就不在事务中运行。
- MANDATORY:支持当前已经存在的事务,如果还没有事务,就抛出一个异常。
- REQUIRES_NEW:挂起当前事务,创建一个新事务,如果还没有事务,就简单地创建一个新事务,和之前创建的事务无关。
- NOT_SUPPORTED:强制不在事务中运行,如果当前存在一个事务,则挂起该事务。
- NEVER:强制要求不在事务中运行,如果当前存在一个事务,则抛出异常。
- NESTED:在当前事务中创建一个嵌套事务,如果还没有事务,那么就简单地创建一个新事务。和REQUIRES_NEW区别是在于,该处创建的事务和之前事务是嵌套关系,在嵌套事务提交后,如果调用方抛出异常,子事务还是会回滚。
参考:Spring事务管理中@Transactional的propagation参数
解决办法
修改事务传播级别为默认,这样导致的缺陷是流水号生成的事务较长,这样流水号生成速度会比较慢。采用redis生成流水号。