服务端启动
使用docker启动服务端测试
1 2
| docker pull emqx:5.8.6 docker run --rm -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx:5.8.6
|
登录后台 http://localhost:18083 账号:admin 密码:public
客户端使用
添加相关依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency>
|
订阅者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| static class Subscribe implements Runnable, MqttCallback { private Logger log; private String topic = "testtopic/#";
private MqttClient client;
public Subscribe(String name, String broker) throws MqttException { log = LoggerFactory.getLogger(name); client = new MqttClient(broker, name, new MemoryPersistence()); }
@Override public void run() { try { client.setCallback(this); client.connect(); client.subscribe(topic, 0); } catch (MqttException e) { throw new RuntimeException(e); } }
@Override public void connectionLost(Throwable throwable) { log.error("connection lost"); }
@Override public void messageArrived(String s, MqttMessage mqttMessage) { log.info("msg:{} {}", s, new String(mqttMessage.getPayload())); }
@Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { log.info("deliveryComplete:{}", iMqttDeliveryToken.isComplete()); } }
|
发布者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| static class Publish extends Thread { private Logger log; private String topic; private MqttClient client; private String name;
public Publish(String name, String broker, String topic) throws MqttException { this.topic = topic; this.name = name; log = LoggerFactory.getLogger(name); client = new MqttClient(broker, name, new MemoryPersistence()); }
@Override public void run() { try { client.connect(); while (true) { String content = "i'am " + name + new Random().nextInt(10000); log.info("send smg:{}", content); MqttMessage message = new MqttMessage(content.getBytes()); client.publish(topic, message); TimeUnit.MILLISECONDS.sleep(3000 + new Random().nextInt(1000)); } } catch (MqttException | InterruptedException e) { throw new RuntimeException(e); } } }
|
项目启动
1 2 3 4 5 6 7 8 9 10 11 12 13
| String broker = "tcp://192.168.88.1:1883";
Subscribe sub1 = new Subscribe("sub-1", broker); sub1.run(); Subscribe sub2 = new Subscribe("sub-2", broker); sub2.run();
Publish pub1 = new Publish("pub-1", broker, "testtopic/1"); Publish pub2 = new Publish("pub-2", broker, "testtopic/2");
pub1.start(); pub2.start();
|