在本地部署Spring Cloud Data Flow时,与RabbitMQ的连接失败。

huangapple go评论55阅读模式
英文:

Connection to rabbitmq failed when deploying spring cloud data flow in the local

问题

我参考了文档:https://dataflow.spring.io/docs/installation/local/manual/ 在我的本地 Mac 上运行 Spring Cloud Data Flow,并根据 https://dataflow.spring.io/docs/stream-developer-guides/getting-started/stream/ 部署了一个 http | log 流。

但是在“验证输出(本地)”步骤(https://dataflow.spring.io/docs/stream-developer-guides/getting-started/stream/#local)中,我失败了。

我可以在日志中看到很多连接错误:

在本地部署Spring Cloud Data Flow时,与RabbitMQ的连接失败。

我所采取的步骤总结如下:

  • 在终端1中,通过 docker run -d --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3.7 启动 RabbitMQ。
  • 然后在终端1中,通过 java -jar spring-cloud-skipper-server-2.9.2.jar 启动 Skipper 服务器。
  • 然后在终端2中,通过 java -jar spring-cloud-dataflow-server-2.10.2.jar 启动 Data Flow 服务器。
  • 然后在终端3中,通过 java -jar spring-cloud-dataflow-shell-2.10.2.jar 启动 Data Flow Shell。
  • 打开 http://localhost:9393/dashboard/index.html#/streams/list/http-ingest 仪表板来创建流并部署。
  • 然后在终端4中,发送 curl http://localhost:20100 -H "Content-type: text/plain" -d "Happy streaming"。请求成功发送,但没有响应体。

附加信息

我可以从 localhost:20100 获取成功的响应,如下所示:
在本地部署Spring Cloud Data Flow时,与RabbitMQ的连接失败。

Docker 容器如下所示:
在本地部署Spring Cloud Data Flow时,与RabbitMQ的连接失败。

预期结果(需要帮助!)

我可以按照文档中的建议验证输出。

在本地部署Spring Cloud Data Flow时,与RabbitMQ的连接失败。

实际结果:

但是我没有得到日志。相反,我得到了以下结果:

2023-08-09 03:25:47.057  INFO [http-source,2aa0ce95298cd6b1,db9808ed2bfa4443] 1 --- [oundedElastic-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2023-08-09 03:26:47.705 ERROR [http-source,2aa0ce95298cd6b1,f116c3c77334209a] 1 --- [oundedElastic-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@7a71c2e3]; nested exception is org.springframework.amqp.AmqpIOException: java.net.SocketTimeoutException: connect timed out, failedMessage=GenericMessage [payload=byte[15], headers={content-length=15, http_requestMethod=POST, b3=2aa0ce95298cd6b1-db9808ed2bfa4443-0, nativeHeaders={}, host=localhost:20100, http_requestUrl=http://localhost:20100/, id=1bd16f86-c76a-fb4c-4362-6df88325daf7, contentType=text/plain, user-agent=curl/7.82.0, accept=*/*, timestamp=1691551546926}]
	at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
	at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1074)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
	at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:213)
	at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:195)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
	at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:88)
	at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
	at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:296)
	at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:277)
	at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
	at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:200)
	at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:477)
	at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:268)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
	at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
	at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
	at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
	at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:82)
	at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:71)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
	at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$5(FluxMessageChannel.java:123)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
	at org.springframework.cloud.sleuth.instrument.reactor.ReactorSleuth.lambda$null$6(ReactorSleuth.java:324)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.springframework.amqp.AmqpIOException: java.net.SocketTimeoutException: connect timed out
	at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:70)
	at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:602)
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:725)
	at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:249)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2173)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2146)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:1072)
	at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.doRabbitSend(AmqpOutboundEndpoint.java:250)
	at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:231)
	at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:180)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
	... 43 more
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
	at java.base/java.net.AbstractPlainSocketImpl.connect(Unknown Source)
	at java.base/java.net.SocksSocketImpl.connect(Unknown Source)
	at java.base/java.net.Socket.connect(Unknown Source)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1223)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1173)
	at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connectAddresses(AbstractConnectionFactory.java:640)
	at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connect(AbstractConnectionFactory.java:615)
	at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:565)
	... 53 more
英文:

I referenced the documentation: https://dataflow.spring.io/docs/installation/local/manual/ to run the spring cloud data flow in my local mac, and then deployed an http | log stream according to https://dataflow.spring.io/docs/stream-developer-guides/getting-started/stream/.

But during the "verifying output (local)" step (https://dataflow.spring.io/docs/stream-developer-guides/getting-started/stream/#local), I failed.

I can see there are a lot of connection error in the log:

在本地部署Spring Cloud Data Flow时,与RabbitMQ的连接失败。

Summary, the steps I took:

  • In terminal 1, start rabbitmq by docker run -d --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3.7
  • Then in terminal 1, start skipper server by java -jar spring-cloud-skipper-server-2.9.2.jar在本地部署Spring Cloud Data Flow时,与RabbitMQ的连接失败。
  • Then in terminal 2, start dataflow server by java -jar spring-cloud-dataflow-server-2.10.2.jar在本地部署Spring Cloud Data Flow时,与RabbitMQ的连接失败。
  • Then in terminal 3, start dataflow shell by java -jar spring-cloud-dataflow-shell-2.10.2.jar在本地部署Spring Cloud Data Flow时,与RabbitMQ的连接失败。
  • Open http://localhost:9393/dashboard/index.html#/streams/list/http-ingest dashboard to create stream and deploy
  • Then in terminal 4, send curl http://localhost:20100 -H "Content-type: text/plain" -d "Happy streaming". The request was successfully sent, but no response body.

Additional information

I can get success response from localhost:20100 like:
在本地部署Spring Cloud Data Flow时,与RabbitMQ的连接失败。

The docker containers are as follows:
在本地部署Spring Cloud Data Flow时,与RabbitMQ的连接失败。

Expected (Help wanted!)

I can verify the output as suggested by the documentation.

在本地部署Spring Cloud Data Flow时,与RabbitMQ的连接失败。

Actual result:

But I didn't get the log. Instead, I get the following:

2023-08-09 03:25:47.057  INFO [http-source,2aa0ce95298cd6b1,db9808ed2bfa4443] 1 --- [oundedElastic-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2023-08-09 03:26:47.705 ERROR [http-source,2aa0ce95298cd6b1,f116c3c77334209a] 1 --- [oundedElastic-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@7a71c2e3]; nested exception is org.springframework.amqp.AmqpIOException: java.net.SocketTimeoutException: connect timed out, failedMessage=GenericMessage [payload=byte[15], headers={content-length=15, http_requestMethod=POST, b3=2aa0ce95298cd6b1-db9808ed2bfa4443-0, nativeHeaders={}, host=localhost:20100, http_requestUrl=http://localhost:20100/, id=1bd16f86-c76a-fb4c-4362-6df88325daf7, contentType=text/plain, user-agent=curl/7.82.0, accept=*/*, timestamp=1691551546926}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1074)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:213)
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:195)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:88)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:296)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:277)
at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:200)
at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:477)
at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:268)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:82)
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:71)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$5(FluxMessageChannel.java:123)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at org.springframework.cloud.sleuth.instrument.reactor.ReactorSleuth.lambda$null$6(ReactorSleuth.java:324)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.springframework.amqp.AmqpIOException: java.net.SocketTimeoutException: connect timed out
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:70)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:602)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:725)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:249)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2173)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2146)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:1072)
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.doRabbitSend(AmqpOutboundEndpoint.java:250)
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:231)
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:180)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
... 43 more
Caused by: java.net.SocketTimeoutException: connect timed out
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
at java.base/java.net.AbstractPlainSocketImpl.connect(Unknown Source)
at java.base/java.net.SocksSocketImpl.connect(Unknown Source)
at java.base/java.net.Socket.connect(Unknown Source)
at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1223)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1173)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connectAddresses(AbstractConnectionFactory.java:640)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connect(AbstractConnectionFactory.java:615)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:565)
... 53 more

答案1

得分: 0

通过使用主机IP连接到rabbitmq进行修复,因为它是由docker运行的。

但是我按照文档使用docker运行它,导致了连接问题。我认为官方文档应该进行更新。

要获取主机IP,在Mac OS X上,您可以使用以下命令:

ipconfig getifaddr en0
192.168.1.10

然后在部署流程中将其配置到应用程序中。

app.http.spring.rabbitmq.host=192.168.1.10
app.log.spring.rabbitmq.host=192.168.1.10
英文:

Fixed by connecting to the rabbitmq using host IP as it's run by docker.

But I followed the documentation to run it using docker, which lead to connection issues. I think the official document should be updated.
在本地部署Spring Cloud Data Flow时,与RabbitMQ的连接失败。

To get host IP, for mac osx you can use

ipconfig getifaddr en0
192.168.1.10

And then configure it into the apps during deploying stream step.

app.http.spring.rabbitmq.host=192.168.1.10
app.log.spring.rabbitmq.host=192.168.1.10

huangapple
  • 本文由 发表于 2023年8月9日 12:13:36
  • 转载请务必保留本文链接:https://go.coder-hub.com/76864528.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定