定时消息推送

需求

有一部分数据设置了超时时间,当到达该超时时间改数据还未处理时,推送数据给监控人员。由监控处理相关事情。

技术方案

1、维护一个有序链表,在生成改数据后,把该数据加入该有序链表中,保证链表头为最小的超时时间。
2、通过线程自旋,获取头部数据是否超时,超时取出数据,查询DB该数据是否已经被处理,未处理超时推送给监控,已经处理丢掉改数据。

考虑问题

  • 因为是部署集群,没有调度器,在迭代更新时,重启服务器会导致服务器中数据丢失。
    解决方案:在启动服务器时使用redis中incs命令(过期时间30s),当返回值为1时,该服务器从数据库中获取所有需要监控的数据,加入链表。其他服务器因为值不为1,不做处理。
  • 在服务器运行过程中,其中一台服务器宕机,重启服务器后,当前服务器数据数据丢失,其他服务器正常,重启从数据中获取的数据保护其他服务器中数据。
    解决方案:在处理超时数据的时候,同样适用redis的incs命令(过期时间30s),返回值为1的服务器处理数据,另外非1的是丢弃数据。

代码实现

队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129

import java.io.Serializable;
import java.util.concurrent.locks.ReentrantLock;

/**
* 同步链式队列
* Created by whhxz on 2016/12/30.
*/
public class SynsLinkQueue<E extends Comparable<E>> implements Serializable {
private transient volatile Node<E> head; //链表头部
private transient volatile Node<E> last; //链表尾部
private transient volatile ReentrantLock lock = new ReentrantLock();
private transient int size = 0;

public SynsLinkQueue() {
}

/**
* 返回列表数量
* @return
*/
public int size() {
return size;
}

/**
* 添加链表
* @param e
* @return
*/
public boolean add(E e) {
return offer(e);
}

/**
* 插入链表
* @param e
* @return
*/
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (head == null) {
last = head = new Node<>(null, e, null);
size++;
return true;
}
Node<E> node = last;
Node<E> newNode = null;
while (node != null) {
if (e.compareTo(node.item) >= 0) {
newNode = new Node<>(node, e, node.next);
if (node.next != null){
node.next.prev = newNode;
} else {
last = newNode;
}
node.next = newNode;
break;
} else {
node = node.prev;
}
}
if (newNode == null) {
newNode = new Node<>(null, e, head);
head.prev = newNode;
head = newNode;
}

size++;
} finally {
lock.unlock();
}
return true;
}

/**
* 取出优先级链表头部
* @return
*/
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
E e;
try {
if (size != 0) {
Node<E> node = head;
e = node.item;
head = head.next;
if (head != null)head.prev = null;
size--;
} else {
e = null;
}
} finally {
lock.unlock();
}
return e;
}

/**
* 读取链表头部
* @return
*/
public E peek() {
return head == null ? null : head.item;
}

/**
* 队列节点
* @param <E>
*/
private static class Node<E> {
E item;
Node<E> next;
Node<E> prev;

Node(Node<E> prev, E element, Node<E> next) {
this.item = element;
this.next = next;
this.prev = prev;
}
}

}

推送消息抽象类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

import com.fn.cache.client.CacheClient;
import com.rt.picking.common.fcm.dto.PushMessageBody;
import com.rt.picking.common.service.outapi.FCMCaller;
import com.rt.picking.common.util.SynsLinkQueue;
import com.rt.picking.soa.dto.PickTimeoutDto;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Date;

/**
* 推送消息
* Created by whhxz on 2017/1/4.
*/
public abstract class TimeoutMessageService implements Runnable {
private static SynsLinkQueue<PickTimeoutDto> pickTimeoutLink = new SynsLinkQueue<>();
@Autowired
private CacheClient cacheClient;
/**
* 拣货单
*/
public void addLinkDate(){
pickTimeoutLink.add(calculatePickTimeout());
}

public abstract PickTimeoutDto calculatePickTimeout();

public abstract PushMessageBody pushPickAppMessage(String pickingId);


@Override
public void run() {
int i = 1;
while (true){
try {
PickTimeoutDto timeoutDto = pickTimeoutLink.peek();
if (timeoutDto.getTimeout().compareTo(new Date()) <= 0){
timeoutDto = pickTimeoutLink.poll();
PushMessageBody pushMessageInfo = pushPickAppMessage(timeoutDto.getPickingId());
if (pushMessageInfo != null && cacheClient.incr("temp_" + timeoutDto.getPickingId(), 60) == 2){
//推送消息
}
} else {
//阶梯休眠
Thread.sleep(i * 1000);
i = (i = i << 1) > 10 ? 1: i;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

}

最后方案被否决,理由太复杂。最终解决方案,使用定时器,定时查询超时数据。