1.前言 这篇文章其实会很乱,因为我不是从基础开始弄的这个mqtt协议,只能是从别人的代码的基础上进行修改的来的。
对于发布者:
消息通过消息网关发送出去,由 MessageChannel 的实例 DirectChannel 处理发送的细节。
DirectChannel 收到消息后,内部通过 MessageHandler 的实例 MqttPahoMessageHandler 发送到指定的 Topic。
对于订阅者:
通过注入 MessageProducerSupport 的实例 MqttPahoMessageDrivenChannelAdapter,实现订阅 Topic 和绑定消息消费的 MessageChannel。
同样由 MessageChannel 的实例 DirectChannel 处理消费细节。Channel 消息后会发送给我们自定义的 MqttInboundMessageHandler 实例进行消费。
参考文章: 1.SpringBoot集成MQTT 这篇文章关于mqtt的内容,可以好好的参考示例做一下。这里有一个 mqtt事件监听类 的东西,@EventListener,但是其他的地方好像没有。MqttPahoClientFactory 配置连接;MessageChannel 接收和发送消息通道;MessageProducer 接收消息;@ServiceActivator 接收和发送消息处理; 【2】.Spring Integration 中文手册(完整版) 这里讲了整个的spring integration的东西,包括很多内容,大部分通过xml配置进行解决。 【3】.springboot整合mqtt的详细图文教程 【4】. Spring Integration简介
2.消息接收 参考文章: 1.一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布 这里使用了EMQX作为服务器,讲解了各个部分不同的功能,没有用注解,就是建立连接、发送、配置回调 2.MQTT 消息通信工具使用 这里用了python和springboot进行了分别的示例说明。 3.java实现mqtt发送消息和接收消息 4.MQTT接收消息回调
3.消息通道 参考文章: 1.9.2.1 消息通道
4.引入步骤 这里的步骤是一个通道作为所有消息订阅的通道,一个通道作为所有发送消息的通道。
(1)创建Spring Boot Maven工程,引入如下依赖
1 2 3 4 5 <dependency > <groupId > org.springframework.integration</groupId > <artifactId > spring-integration-mqtt</artifactId > <version > 5.1.3.RELEASE</version > </dependency >
(2)配置MQTT消费端,添加SpringConfig.java类,添加消息消费Bean。@ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息的channel。当接收到消息时,可以先拿到topic,然后根据不同的topic分别对消息进行处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 @Configuration @IntegrationComponentScan public class MqttConfiguration { private static final Logger log = LoggerFactory.getLogger(MqttConfiguration.class); @Value("${cloud-sdk.mqtt.inbound-topic: }") private String inboundTopic; @Resource private MqttPahoClientFactory mqttClientFactory; @Resource(name = ChannelName.INBOUND) private MessageChannel inboundChannel; @Bean public MqttPahoMessageDrivenChannelAdapter mqttInbound () { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter ( UUID.randomUUID().toString(), mqttClientFactory, inboundTopic.split("," )); DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter (); converter.setPayloadAsBytes(true ); adapter.setConverter(converter); adapter.setQos(1 ); adapter.setOutputChannel(inboundChannel); return adapter; } @Bean @ServiceActivator(inputChannel = ChannelName.OUTBOUND) public MessageHandler mqttOutbound () { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler ( UUID.randomUUID().toString(), mqttClientFactory); DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter (); converter.setPayloadAsBytes(true ); messageHandler.setAsync(true ); messageHandler.setDefaultQos(0 ); messageHandler.setConverter(converter); return messageHandler; } @Bean @ServiceActivator(inputChannel = ChannelName.DEFAULT) public MessageHandler defaultInboundHandler () { return message -> { log.info("The default channel does not handle messages." + "\nTopic: " + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC) + "\nPayload: " + message.getPayload() + "\n" ); }; } }
(3)配置MQTT消息发送端,@MessagingGateway是一个用于提供消息网关代理整合的注解,参数defaultRequestChannel指定发送消息绑定的channel。 这里我其实没有做具体的区分,只是借用了大疆里面的一些代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Component @MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND) public interface IMqttMessageGateway { void publish (@Header(MqttHeaders.TOPIC) String topic, byte [] payload) ; void publish (@Header(MqttHeaders.TOPIC) String topic, byte [] payload, @Header(MqttHeaders.QOS) int qos) ; }
(4)创建Rest Controller,通过http请求发送MQTT消息。
参考文章: 1.Spring 整合MQTT @ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息的channel。当接收到消息时,可以先拿到topic,然后根据不同的topic分别对消息进行处理。
问题 (1) 速度特别的慢
参考文章: 【1】.MQTT 为什么有时候很慢? 1.网络问题;2.QoS级别设置过高;3.大量连接;4.执行复杂的订阅操作;5.低性能设备; 【2】. 在Linux中,如何进行网络资源的优先级管理? 【3】.Linux 如何优化网络带宽?