MQTT and SpringBoot Integration Connection Lost

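Question

I currently have an API in SpringBoot and would like to add an MQTT client that subscribes to one or more topics.
I tried several Paho and Hive clients without success. I am now using SpringBoot's default MQTT support, which uses Paho, but I can't get it to work even with the basic configuration.
I get a "Connection Lost" error as soon as I launch the application.
Can you suggest a fix, or another approach that works? Thanks!

[![Mosquitto is running on Windows for testing](https://i.stack.imgur.com/KZld6.png)](https://i.stack.imgur.com/KZld6.png)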

Maven dependency:
```xml
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
```

......

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

import lombok.extern.slf4j.Slf4j;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Slf4j
@SpringBootApplication
@EnableSwagger2
public class MainApiSpring {

    public static void main(String[] args) {
        SpringApplication.run(MainApiSpring.class, args);
        log.trace("L'application a correctement été démarrée.");
    }

    // Channel that carries inbound MQTT messages to the handler.
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    // Inbound adapter pointing at the local broker.
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
                "test/topic");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);

        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    // Prints the payload of every message that arrives on mqttInputChannel.
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }
        };
    }

}
```

Error at runtime:
```plaintext
2020-09-04 10:31:39.099 ERROR 4244 --- [           main] .m.i.MqttPahoMessageDrivenChannelAdapter : Exception while connecting and subscribing, retrying

org.eclipse.paho.client.mqttv3.MqttException: Connexion perdue
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:197) ~[org.eclipse.paho.client.mqttv3-1.2.4.jar:na]
    at java.base/java.lang.Thread.run(Thread.java:830) ~[na:na]
Caused by: java.io.EOFException: null
    at java.base/java.io.DataInputStream.readByte(DataInputStream.java:272) ~[na:na]
    at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92) ~[org.eclipse.paho.client.mqttv3-1.2.4.jar:na]
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:137) ~[org.eclipse.paho.client.mqttv3-1.2.4.jar:na]
    ... 1 common frames omitted
```

[![More log](https://i.stack.imgur.com/G8bTg.png)](https://i.stack.imgur.com/G8bTg.png)




# Answer 1
**Score**: 1

This works once `MqttConnectOptions` is defined:

```java
// Needed in addition to the imports shown in the question.
import java.util.UUID;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

@Bean
public MqttConnectOptions getReceiverMqttConnectOptions() {
    MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    mqttConnectOptions.setCleanSession(true);
    mqttConnectOptions.setConnectionTimeout(30);  // seconds
    mqttConnectOptions.setKeepAliveInterval(60);  // seconds
    mqttConnectOptions.setAutomaticReconnect(true);

    // The commented-out lines show the variant for a hosted broker with credentials.
//  mqttConnectOptions.setUserName("myemail");
    String password = "mypassword!";
//  String hostUrl = "tcp://maqiatto.com:1883";
    String hostUrl = "tcp://localhost:1883";
//  mqttConnectOptions.setPassword(password.toCharArray());
    mqttConnectOptions.setServerURIs(new String[] { hostUrl });
    return mqttConnectOptions;
}

@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setConnectionOptions(getReceiverMqttConnectOptions());
    return factory;
}

@Bean
public MessageProducer inbound() {
    // Random client ID, so this client does not reuse an ID already known to the broker.
    String clientId2 = "uuid-" + UUID.randomUUID().toString();
    // Arguments: client ID, client factory, then the topics to subscribe to.
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId2,
//          mqttClientFactory(), "myemail/test");
            mqttClientFactory(), "test", "test/paho");
    adapter.setCompletionTimeout(20000);  // milliseconds
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(2);
    adapter.setOutputChannel(mqttInputChannel());
    return adapter;
}
```
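
One possible reason the two-argument call in the question fails, offered as an assumption rather than a confirmed diagnosis: the adapter's string-only constructor has the shape `(String url, String clientId, String... topic)`. A call such as `new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "test/topic")` would therefore bind `"test/topic"` to the client ID and leave the topic list empty. The sketch below uses a hypothetical stand-in class, `AdapterArgs` (it is not part of Spring Integration), only to show how Java resolves such a call. It does not connect to a broker.

```java
import java.util.Arrays;

class VarargsBindingDemo {

    /** Hypothetical stand-in that mirrors the (url, clientId, topics...) parameter order. */
    static final class AdapterArgs {
        final String url;
        final String clientId;
        final String[] topics;

        AdapterArgs(String url, String clientId, String... topics) {
            this.url = url;
            this.clientId = clientId;
            this.topics = topics;
        }
    }

    public static void main(String[] args) {
        // Same argument shape as the question: two strings, no explicit client ID.
        AdapterArgs asInQuestion = new AdapterArgs("tcp://localhost:1883", "test/topic");
        System.out.println("clientId=" + asInQuestion.clientId
                + " topics=" + Arrays.toString(asInQuestion.topics));
        // prints: clientId=test/topic topics=[]

        // With an explicit (made-up) client ID, the topic lands in the varargs array.
        AdapterArgs withClientId =
                new AdapterArgs("tcp://localhost:1883", "main-api-client", "test/topic");
        System.out.println("clientId=" + withClientId.clientId
                + " topics=" + Arrays.toString(withClientId.topics));
        // prints: clientId=main-api-client topics=[test/topic]
    }
}
```

The MQTT 3.1.1 specification requires a SUBSCRIBE packet to carry at least one topic filter, so a broker may close the connection if it receives an empty subscription, which the client would see as an `EOFException`. If that is what happened here, the answer's version avoids it by passing a client ID, the client factory, and the topics explicitly.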

huangapple
  • Published on September 4, 2020 at 16:43:46
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63737765.html