MQTT协议
- MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。
作为一种低开销、低带宽占用的即时通讯协议,MQTT 在物联网、小型设备、移动应用等方面有较广泛的应用。
MQTT协议特点
- MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。
- MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛,在很多场景下都可以使用,包括受限的环境,如:机器与机器(M2M)通信和物联网(IoT)。
MQTT Client
- publisher 和 subscriber 都属于 MQTT Client,之所以有发布者和订阅者这个概念,其实是一种相对的概念,就是指当前客户端是在发布消息还是在接收消息,发布和订阅的功能也可以由同一个 MQTT Client 实现。MQTT 客户端是运行 MQTT 库并通过网络连接到 MQTT 代理的任何设备(从微控制器到成熟的服务器)。例如,MQTT 客户端可以是一个非常小的、资源受限的设备,它通过无线网络进行连接并具有一个最低限度的库。基本上,任何通过 TCP/IP 协议使用 MQTT 的设备都可以称之为 MQTT Client。MQTT 协议的客户端实现非常简单直接,易于实施是 MQTT 非常适合小型设备的原因之一。MQTT 客户端库可用于多种编程语言。例如,Android、Arduino、C、C++、C#、Go、iOS、Java、JavaScript 和 .NET。
MQTT Broker
- 与 MQTT Client 对应的就是 MQTT Broker,Broker 是任何发布/订阅协议的核心,根据实现的不同,代理可以处理多达数百万连接的 MQTT Client。 Broker 负责接收所有消息,过滤消息,确定是哪个Client 订阅了每条消息,并将消息发送给对应的 Client,Broker 还负责保存会话数据,这些数据包括订阅的和错过的消息。Broker 还负责客户端的身份验证和授权。
MQTT Connection
- MQTT 协议基于 TCP/IP。客户端和代理都需要有一个 TCP/IP 协议支持。 MQTT 连接始终位于一个客户端和代理之间,客户端之间从不直接相互连接。
准备工作 (下载Broker服务端,相关客户端工具)
- 服务端工具:
- mosquitto https://mosquitto.org/download/
- 客户端工具:
- MQTTX https://mqttx.app/zh#download
创建SpringBoot的工程 (这里自己搭建就行了)
<!-- The two Spring Integration artifacts (stream and MQTT) to add to the project's pom.xml.
     No version is given here; presumably it is supplied by the Spring Boot parent POM
     (confirm against the full pom.xml of the project). -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- Maven build descriptor for the mqtt-demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Inherit from the Spring Boot parent POM; the empty relativePath makes Maven resolve
     the parent from the repository instead of a local directory. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.17.RELEASE</version>
<relativePath/>
</parent>
<!-- Coordinates of this project. -->
<groupId>com.huawen</groupId>
<artifactId>mqtt-demo</artifactId>
<version>0.0.1</version>
<name>mqtt-demo</name>
<description>Boot with MQTT Demo</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- Spring Boot starters: core, web and Spring Integration. Versions are omitted here;
     presumably they come from the parent POM. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- Spring Integration modules used by the MQTT configuration classes. -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- Lombok supplies the @Data and @Slf4j annotations used in the Java sources.
     Unlike the dependencies above, its version is pinned explicitly. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin, declared without explicit configuration. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
# Application configuration for the MQTT demo.
# Nesting matters: the keys under "mqtt" are bound by prefix to the MqttConfiguration class,
# so they must be children of "mqtt" rather than top-level keys.
spring:
  application:
    name: MQTT-DEMO
server:
  port: 8989
mqtt:
  # Broker URIs handed to the Paho connect options.
  uris:
    - tcp://127.0.0.1:1883
  # Base client id; the Java configuration appends "_customer" (inbound) and "_producer" (outbound).
  clientId: mqtt_test1
  # Topics the inbound adapter subscribes to.
  topics:
    - demo
    - test
  # Demo credentials; replace them with the values of your own broker.
  username: admin
  password: 123456
  # Passed to MqttConnectOptions#setConnectionTimeout.
  timeout: 30
  # Passed to MqttConnectOptions#setKeepAliveInterval.
  keepalive: 60
  # QoS used for the inbound subscription and as the default QoS for outbound messages.
  qos: 1
- 增加config配置读取yml文件 (使用了Lombok 需要自行添加pom依赖)
package com.huawen.mqtt.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Holder for the settings configured under the {@code mqtt} prefix.
 *
 * <p>Registered as a Spring component so that other configuration classes can inject it;
 * the accessors are generated by Lombok's {@code @Data}.
 */
@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttConfiguration {

    /** Broker URIs handed to the Paho connect options. */
    private String[] uris;

    /** Base client identifier; the inbound and outbound configurations each append a suffix. */
    private String clientId;

    /** Topics the inbound adapter subscribes to. */
    private String[] topics;

    /** User name used when connecting to the broker. */
    private String username;

    /** Password used when connecting to the broker. */
    private String password;

    /** Value passed to {@code MqttConnectOptions#setConnectionTimeout}. */
    private Integer timeout;

    /** Value passed to {@code MqttConnectOptions#setKeepAliveInterval}. */
    private Integer keepalive;

    /** Quality-of-service level applied to subscriptions and used as the outbound default. */
    private Integer qos;
}
package com.huawen.mqtt.config;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import javax.annotation.Resource;
/**
 * Inbound (subscribing) side of the MQTT integration.
 *
 * <p>Declares the channel named {@code input}, a Paho client factory built from
 * {@link MqttConfiguration}, a message-driven adapter that subscribes to the configured topics
 * and forwards what it receives to that channel, and a handler that logs every message.
 */
@Configuration
@Slf4j
public class MqttInBoundConfiguration {

    @Resource
    private MqttConfiguration mqttProperties;

    /**
     * Channel that carries messages from the inbound adapter to {@link #handler()}.
     *
     * @return a new {@link DirectChannel}, registered under the bean name {@code input}
     */
    @Bean("input")
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Builds the Paho client factory used by the inbound adapter.
     *
     * <p>Optional settings (password, timeout, keep-alive) are applied only when they are
     * configured. Previously a missing value caused a {@code NullPointerException} during
     * bean creation, for example when connecting to a broker that allows anonymous access.
     *
     * @return the client factory carrying the connect options
     */
    @Bean
    public MqttPahoClientFactory inClientFactory() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getUris());
        options.setUserName(mqttProperties.getUsername());

        String password = mqttProperties.getPassword();
        if (password != null) {
            options.setPassword(password.toCharArray());
        }

        // Integer properties are unboxed by the setters; skip unset ones so the
        // defaults of MqttConnectOptions stay in effect.
        Integer timeout = mqttProperties.getTimeout();
        if (timeout != null) {
            options.setConnectionTimeout(timeout);
        }
        Integer keepalive = mqttProperties.getKeepalive();
        if (keepalive != null) {
            options.setKeepAliveInterval(keepalive);
        }

        options.setCleanSession(false);
        options.setAutomaticReconnect(true);

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }

    /**
     * Creates the message-driven adapter that subscribes to the configured topics and sends
     * received messages to the {@code input} channel.
     *
     * <p>The client id is the configured id with the suffix {@code _customer}.
     *
     * @return the inbound adapter
     */
    @Bean
    public MessageProducer producer() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                mqttProperties.getClientId() + "_customer",
                inClientFactory(),
                mqttProperties.getTopics());
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());

        // Leave the adapter's own QoS untouched when none is configured.
        Integer qos = mqttProperties.getQos();
        if (qos != null) {
            adapter.setQos(qos);
        }

        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * Handler attached to the {@code input} channel that logs the whole message, its payload,
     * id, received QoS and received topic.
     *
     * @return the logging message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "input")
    public MessageHandler handler() {
        return message -> {
            log.info("收到的完整消息为--->{}", message);
            log.info("----------------------");
            log.info("message:{}", message.getPayload());
            log.info("Id:{}", message.getHeaders().getId());
            log.info("receivedQos:{}", message.getHeaders().get(MqttHeaders.RECEIVED_QOS));
            log.info("topic:{}", message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
            log.info("----------------------");
        };
    }
}
package com.huawen.mqtt.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import javax.annotation.Resource;
/**
 * Outbound (publishing) side of the MQTT integration.
 *
 * <p>Declares the channel named {@code out}, a Paho client factory built from
 * {@link MqttConfiguration}, and the handler that publishes messages arriving on that channel.
 */
@Configuration
public class MqttOutBoundConfiguration {

    @Resource
    private MqttConfiguration mqttProperties;

    /**
     * Channel on which messages to be published are sent.
     *
     * @return a new {@link DirectChannel}, registered under the bean name {@code out}
     */
    @Bean("out")
    public MessageChannel mqttOutBoundChannel() {
        return new DirectChannel();
    }

    /**
     * Builds the Paho client factory used by the outbound handler.
     *
     * <p>Optional settings (password, timeout, keep-alive) are applied only when they are
     * configured. Previously a missing value caused a {@code NullPointerException} during
     * bean creation, for example when connecting to a broker that allows anonymous access.
     *
     * @return the client factory carrying the connect options
     */
    @Bean
    public MqttPahoClientFactory outClientFactory() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getUris());
        options.setUserName(mqttProperties.getUsername());

        String password = mqttProperties.getPassword();
        if (password != null) {
            options.setPassword(password.toCharArray());
        }

        // Integer properties are unboxed by the setters; skip unset ones so the
        // defaults of MqttConnectOptions stay in effect.
        Integer timeout = mqttProperties.getTimeout();
        if (timeout != null) {
            options.setConnectionTimeout(timeout);
        }
        Integer keepalive = mqttProperties.getKeepalive();
        if (keepalive != null) {
            options.setKeepAliveInterval(keepalive);
        }

        options.setCleanSession(false);
        options.setAutomaticReconnect(true);

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }

    /**
     * Creates the handler that publishes messages from the {@code out} channel to the broker.
     *
     * <p>The client id is the configured id with the suffix {@code _producer}. The first
     * configured topic, if any, is registered as the default topic so that messages sent
     * without a topic header (such as through a gateway method that takes only a payload)
     * can still be published instead of failing because no topic can be determined.
     *
     * @return the outbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "out")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                mqttProperties.getClientId() + "_producer", outClientFactory());
        messageHandler.setAsync(true);

        // Leave the handler's own default QoS untouched when none is configured.
        Integer qos = mqttProperties.getQos();
        if (qos != null) {
            messageHandler.setDefaultQos(qos);
        }

        String[] topics = mqttProperties.getTopics();
        if (topics != null && topics.length > 0) {
            messageHandler.setDefaultTopic(topics[0]);
        }

        messageHandler.setConverter(new DefaultPahoMessageConverter());
        return messageHandler;
    }
}
package com.huawen.mqtt.inter;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel = "out")
public interface MqttGateway {
void sendToMqtt(String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);