目录
- 前言
- Paho Java 库实现
- spring boot集成mqtt
- 核心代码
- 总结
前言
在开发MQTT时有两种方式一种是使用Paho Java 原生库来完成,一种是使用spring boot 来完成。
Paho Java 库实现
Eclipse Paho Java Client (opens new window)是用 Java 编写的 MQTT 客户端库(MQTT Java Client),可用于 JVM 或其他 Java 兼容平台(例如Android)。
Eclipse Paho Java Client 提供了MqttAsyncClient 和 MqttClient 异步和同步 API
- 通过 Maven 安装 Paho Java
<dependency> | |
<groupId>org.eclipse.paho</groupId> | |
<artifactId>org.eclipse.paho.client.mqttv3</artifactId> | |
<version>1.2.2</version> | |
</dependency> |
- Paho Java 使用示例
Java 体系中 Paho Java 是比较稳定、广泛应用的 MQTT 客户端库,本示例包含 Java 语言的 Paho Java 连接 EMQX Broker,并进行消息收发完整代码:
package io.emqx; | |
import org.eclipse.paho.client.mqttv3.MqttClient; | |
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; | |
import org.eclipse.paho.client.mqttv3.MqttException; | |
import org.eclipse.paho.client.mqttv3.MqttMessage; | |
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; | |
public class App { | |
public static void main(String[] args) { | |
String subTopic = "testtopic/#"; | |
String pubTopic = "testtopic/1"; | |
String content = "Hello World"; | |
int qos = 2; | |
String broker = "tcp://broker.emqx.io:1883"; | |
String clientId = "emqx_test"; | |
MemoryPersistence persistence = new MemoryPersistence(); | |
try { | |
MqttClient client = new MqttClient(broker, clientId, persistence); | |
// MQTT 连接选项 | |
MqttConnectOptions connOpts = new MqttConnectOptions(); | |
connOpts.setUserName("emqx_test"); | |
connOpts.setPassword("emqx_test_password".toCharArray()); | |
// 保留会话 | |
connOpts.setCleanSession(true); | |
// 设置回调 | |
client.setCallback(new PushCallback()); | |
// 建立连接 | |
System.out.println("Connecting to broker: " + broker); | |
client.connect(connOpts); | |
System.out.println("Connected"); | |
System.out.println("Publishing message: " + content); | |
// 订阅 | |
client.subscribe(subTopic); | |
// 消息发布所需参数 | |
MqttMessage message = new MqttMessage(content.getBytes()); | |
message.setQos(qos); | |
client.publish(pubTopic, message); | |
System.out.println("Message published"); | |
client.disconnect(); | |
System.out.println("Disconnected"); | |
client.close(); | |
System.exit(0); | |
} catch (MqttException me) { | |
System.out.println("reason " + me.getReasonCode()); | |
System.out.println("msg " + me.getMessage()); | |
System.out.println("loc " + me.getLocalizedMessage()); | |
System.out.println("cause " + me.getCause()); | |
System.out.println("excep " + me); | |
me.printStackTrace(); | |
} | |
} | |
} |
回调消息处理类 OnMessageCallback.java
package io.emqx; | |
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; | |
import org.eclipse.paho.client.mqttv3.MqttCallback; | |
import org.eclipse.paho.client.mqttv3.MqttMessage; | |
public class OnMessageCallback implements MqttCallback { | |
public void connectionLost(Throwable cause) { | |
// 连接丢失后,一般在这里面进行重连 | |
System.out.println("连接断开,可以做重连"); | |
} | |
public void messageArrived(String topic, MqttMessage message) throws Exception { | |
// subscribe后得到的消息会执行到这里面 | |
System.out.println("接收消息主题:" + topic); | |
System.out.println("接收消息Qos:" + message.getQos()); | |
System.out.println("接收消息内容:" + new String(message.getPayload())); | |
} | |
public void deliveryComplete(IMqttDeliveryToken token) { | |
System.out.println("deliveryComplete---------" + token.isComplete()); | |
} | |
} |
好的上述就实现了简单的 MQTT的连接和消息收发。
spring boot集成mqtt
spring boot 环境
spring-boot 版本 2.2.2 | |
spring-integration的版本为:5.4.3 | |
Spring Integration提供了入站适配器和出站适配器以支持MQTT协议。 |
Maven 依赖:
<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt --> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-integration</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.integration</groupId> | |
<artifactId>spring-integration-mqtt</artifactId> | |
<version>5.4.3</version> | |
</dependency> |
配置文件 application.yml:
spring: | |
mqtt: | |
username: | |
password: | |
url: tcp://ip:port | |
clientId: clientId | |
topic: default | |
completionTimeout: 2000 |
核心代码
配置类
"spring.mqtt") | (prefix =|
public class MqttConfiguration { | |
private String username; | |
private String password; | |
private String url; | |
private String clientId; | |
private String topic = "TOPIC_DEFAULT"; | |
private Integer completionTimeout = 2000; | |
/** | |
* 注册MQTT客户端工厂 | |
* @return | |
*/ | |
public MqttPahoClientFactory mqttClientFactory(){ | |
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); | |
MqttConnectOptions options = new MqttConnectOptions(); | |
//如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持: | |
// 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。 | |
// 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。 | |
options.setCleanSession(true); | |
//该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。 | |
// 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。 | |
options.setConnectionTimeout(0); | |
//此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0 | |
options.setKeepAliveInterval(90); | |
//自动重新连接 | |
options.setAutomaticReconnect(true); | |
options.setUserName(this.getUsername()); | |
options.setPassword(this.getPassword().toCharArray()); | |
options.setServerURIs(new String[]{this.getUrl()}); | |
factory.setConnectionOptions(options); | |
return factory; | |
} | |
} | |
public class MqttInboundConfiguration { | |
private MqttConfiguration mqttConfig; | |
private MqttPahoClientFactory factory; | |
private MqttMessageReceiver mqttMessageReceiver; | |
/** | |
* 此处可以使用其他消息通道 | |
* Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。 | |
* | |
* @return | |
*/ | |
public MessageChannel mqttInBoundChannel() { | |
return new DirectChannel(); | |
} | |
/** | |
* 适配器, 两个topic共用一个adapter | |
* 客户端作为消费者,订阅主题,消费消息 | |
* | |
* @param | |
* @param | |
* @return | |
*/ | |
public MessageProducerSupport mqttInbound() { | |
MqttPahoMessageDrivenChannelAdapter adapter = | |
new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId()+"-"+System.currentTimeMillis(), factory, mqttConfig.getTopic()); | |
adapter.setCompletionTimeout(60000); | |
adapter.setConverter(new DefaultPahoMessageConverter()); | |
adapter.setRecoveryInterval(10000); | |
adapter.setQos(0); | |
adapter.setOutputChannel(mqttInBoundChannel()); | |
return adapter; | |
} | |
/** | |
* mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。 | |
* | |
* @return | |
*/ | |
"mqttInBoundChannel") | (inputChannel =|
public MessageHandler mqttMessageHandler() { | |
return this.mqttMessageReceiver; | |
} | |
} |
数据接收
public class MqttMessageReceiver implements MessageHandler { | |
public void handleMessage(Message<?> message) throws MessagingException { | |
try { | |
MessageHeaders headers = message.getHeaders(); | |
//获取消息Topic | |
String receivedTopic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC); | |
log.info("[获取到的消息的topic :]{} ", receivedTopic); | |
//获取消息体 | |
String payload = (String) message.getPayload(); | |
log.info("[获取到的消息的payload :]{} ", payload); | |
//todo .... | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
public class MqttOutboundConfiguration { | |
private MqttConfiguration mqttConfig; | |
private MqttPahoClientFactory factory; | |
public MessageChannel mqttOutboundChannel() { | |
return new DirectChannel(); | |
} | |
public MessageHandler mqttOutbound() { | |
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( | |
mqttConfig.getClientId()+"-"+System.currentTimeMillis() + System.currentTimeMillis(), factory); | |
messageHandler.setDefaultQos(0); | |
//开启异步 | |
messageHandler.setAsync(true); | |
messageHandler.setDefaultTopic(mqttConfig.getTopic()); | |
return messageHandler; | |
} | |
} |
发送者
@Component | |
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") | |
public interface MqttGateway { | |
/** | |
* 发送mqtt消息 | |
* @param topic 主题 | |
* @param payload 内容 | |
* @return void | |
*/ | |
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); | |
/** | |
* 发送包含qos的消息 | |
* @param topic 主题 | |
* @param qos 对消息处理的几种机制。 | |
* * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br> | |
* * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br> | |
* * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 | |
* @param payload 消息体 | |
* @return void | |
*/ | |
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); | |
/** | |
* 发送包含qos的消息 | |
* @param topic 主题 | |
* @param qos 对消息处理的几种机制。 | |
* * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br> | |
* * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br> | |
* * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 | |
* @param payload 消息体 | |
* @return void | |
*/ | |
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); | |
} | |
@Component | |
@AllArgsConstructor | |
public class MqttMessageSender { | |
private MqttGateway mqttGateway; | |
/** | |
* 发送mqtt消息 | |
* @param topic 主题 | |
* @param message 内容 | |
* @return void | |
*/ | |
public void send(String topic, String message) { | |
mqttGateway.sendToMqtt(topic, message); | |
} | |
/** | |
* 发送包含qos的消息 | |
* @param topic 主题 | |
* @param qos 质量 | |
* @param messageBody 消息体 | |
* @return void | |
*/ | |
public void send(String topic, int qos, JSONObject messageBody){ | |
mqttGateway.sendToMqtt(topic, qos, messageBody.toString()); | |
} | |
/** | |
* 发送包含qos的消息 | |
* @param topic 主题 | |
* @param qos 质量 | |
* @param message 消息体 | |
* @return void | |
*/ | |
public void send(String topic, int qos, byte[] message){ | |
mqttGateway.sendToMqtt(topic, qos, message); | |
} | |
} |
总结
综上所述上面就是我们经常用的到两种方式,希望对你有所帮助