需求
有一部分数据设置了超时时间,当到达该超时时间改数据还未处理时,推送数据给监控人员。由监控处理相关事情。
技术方案
1、维护一个有序链表,在生成改数据后,把该数据加入该有序链表中,保证链表头为最小的超时时间。
2、通过线程自旋,获取头部数据是否超时,超时取出数据,查询DB该数据是否已经被处理,未处理超时推送给监控,已经处理丢掉改数据。
考虑问题
- 因为是部署集群,没有调度器,在迭代更新时,重启服务器会导致服务器中数据丢失。
解决方案:在启动服务器时使用redis中incs命令(过期时间30s),当返回值为1时,该服务器从数据库中获取所有需要监控的数据,加入链表。其他服务器因为值不为1,不做处理。
- 在服务器运行过程中,其中一台服务器宕机,重启服务器后,当前服务器数据数据丢失,其他服务器正常,重启从数据中获取的数据保护其他服务器中数据。
解决方案:在处理超时数据的时候,同样适用redis的incs命令(过期时间30s),返回值为1的服务器处理数据,另外非1的是丢弃数据。
代码实现
队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
| import java.io.Serializable; import java.util.concurrent.locks.ReentrantLock;
public class SynsLinkQueue<E extends Comparable<E>> implements Serializable { private transient volatile Node<E> head; private transient volatile Node<E> last; private transient volatile ReentrantLock lock = new ReentrantLock(); private transient int size = 0;
public SynsLinkQueue() { }
public int size() { return size; }
public boolean add(E e) { return offer(e); }
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); try { if (head == null) { last = head = new Node<>(null, e, null); size++; return true; } Node<E> node = last; Node<E> newNode = null; while (node != null) { if (e.compareTo(node.item) >= 0) { newNode = new Node<>(node, e, node.next); if (node.next != null){ node.next.prev = newNode; } else { last = newNode; } node.next = newNode; break; } else { node = node.prev; } } if (newNode == null) { newNode = new Node<>(null, e, head); head.prev = newNode; head = newNode; }
size++; } finally { lock.unlock(); } return true; }
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); E e; try { if (size != 0) { Node<E> node = head; e = node.item; head = head.next; if (head != null)head.prev = null; size--; } else { e = null; } } finally { lock.unlock(); } return e; }
public E peek() { return head == null ? null : head.item; }
private static class Node<E> { E item; Node<E> next; Node<E> prev;
Node(Node<E> prev, E element, Node<E> next) { this.item = element; this.next = next; this.prev = prev; } }
}
|
推送消息抽象类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| import com.fn.cache.client.CacheClient; import com.rt.picking.common.fcm.dto.PushMessageBody; import com.rt.picking.common.service.outapi.FCMCaller; import com.rt.picking.common.util.SynsLinkQueue; import com.rt.picking.soa.dto.PickTimeoutDto; import org.springframework.beans.factory.annotation.Autowired;
import java.util.Date;
public abstract class TimeoutMessageService implements Runnable { private static SynsLinkQueue<PickTimeoutDto> pickTimeoutLink = new SynsLinkQueue<>(); @Autowired private CacheClient cacheClient;
public void addLinkDate(){ pickTimeoutLink.add(calculatePickTimeout()); }
public abstract PickTimeoutDto calculatePickTimeout();
public abstract PushMessageBody pushPickAppMessage(String pickingId);
@Override public void run() { int i = 1; while (true){ try { PickTimeoutDto timeoutDto = pickTimeoutLink.peek(); if (timeoutDto.getTimeout().compareTo(new Date()) <= 0){ timeoutDto = pickTimeoutLink.poll(); PushMessageBody pushMessageInfo = pushPickAppMessage(timeoutDto.getPickingId()); if (pushMessageInfo != null && cacheClient.incr("temp_" + timeoutDto.getPickingId(), 60) == 2){ } } else { Thread.sleep(i * 1000); i = (i = i << 1) > 10 ? 1: i; } } catch (Exception e) { e.printStackTrace(); } } }
}
|
最后方案被否决,理由太复杂。最终解决方案,使用定时器,定时查询超时数据。