客户端在调用Azure Event Hub并遇到连接错误时出现挂起情况。

huangapple go评论145阅读模式
英文:

Client hangs when calling Azure Event Hub and facing connection error

问题

我想发送事件消息到Azure事件中心。我注意到,如果我配置错误,我的应用程序会挂起而不终止。

我编写了一个非常简单的Java类,试图发送事件消息到事件中心。如果我错误地输入事件中心的终结点,应用程序会挂起。这相当令人失望。

我可能误解了一些东西,但我想要做的是发送一条简单的消息,仅此而已。我该怎么做?

ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder();
connectionStringBuilder
        .setEndpoint(URI.create("https://XXXXXXXXX.servsssicebus.windows.net"))
        .setTransportType(TransportType.AMQP_WEB_SOCKETS)
        .setSasKeyName("XXX")
        .setSasKey("XXX");
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
final EventHubClient ehClient =
        EventHubClient.createFromConnectionStringSync(
                connectionStringBuilder.toString(),
                RetryPolicy.getNoRetry(),
                scheduledExecutorService
        );
ehClient.sendSync(EventData.create("Test Message".getBytes()));
ehClient.closeSync();
scheduledExecutorService.shutdown();

我使用以下依赖:

compile "com.microsoft.azure:azure-eventhubs:3.2.0"

我会感激任何帮助!谢谢!

英文:

I want to send event messages to Azure Event Hub. I noticed if I misconfigured something then my app hangs and not terminates.
I wrote a very simple Java class that tries to send event message to the Event Hub. If I mistype the endpoint of the Event Hub then the app hangs. Which is pretty disappointing.

There is a chance that I misunderstand something but what I want to do is to send a simple message and that's all. How can I do that?

    ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder();
    connectionStringBuilder
            .setEndpoint(URI.create("https://XXXXXXXXX.servsssicebus.windows.net"))
            .setTransportType(TransportType.AMQP_WEB_SOCKETS)
            .setSasKeyName("XXX")
            .setSasKey("XXX");
    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    final EventHubClient ehClient =
            EventHubClient.createFromConnectionStringSync(
                    connectionStringBuilder.toString(),
                    RetryPolicy.getNoRetry(),
                    scheduledExecutorService
            );
    ehClient.sendSync(EventData.create("Test Message".getBytes()));
    ehClient.closeSync();
    scheduledExecutorService.shutdown();

I use the following dependency:

    compile "com.microsoft.azure:azure-eventhubs:3.2.0"

I'd appreciate any help!
Thanks!

答案1

得分: 1

I believe the reason for your hang is the executor is not getting a chance to shutdown in case of an error. You should wrap the code within try finally like below:

ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder();
connectionStringBuilder
        .setEndpoint(URI.create("https://XXXXXXXXX.servsssicebus.windows.net"))
        .setTransportType(TransportType.AMQP_WEB_SOCKETS)
        .setSasKeyName("XXX")
        .setSasKey("XXX");
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
EventHubClient ehClient = null;
try {
    ehClient =
            EventHubClient.createFromConnectionStringSync(
                    connectionStringBuilder.toString(),
                    RetryPolicy.getNoRetry(),
                    scheduledExecutorService
            );
    ehClient.sendSync(EventData.create("Test Message".getBytes()));
}
finally {
    if (ehClient != null)
       ehClient.closeSync();
    scheduledExecutorService.shutdown();
}

N.B. Your code is using the old azure-eventhubs package (Event Hub v3) as mentioned in this tutorial. The latest package azure-messaging-eventhubs (Event Hub v5 using Producer/Consumer pattern) has slightly different APIs described in this tutorial. You should use the new SDK if it's fresh development.

import com.azure.messaging.eventhubs.*;

public class Sender {
    public static void main(String[] args) {
        final String connectionString = "EVENT HUBS NAMESPACE CONNECTION STRING";
        final String eventHubName = "EVENT HUB NAME";

        // create a producer using the namespace connection string and event hub name
        EventHubProducerClient producer = new EventHubClientBuilder()
            .connectionString(connectionString, eventHubName)
            .buildProducerClient();

        // prepare a batch of events to send to the event hub    
        EventDataBatch batch = producer.createBatch();
        batch.tryAdd(new EventData("First event"));
        batch.tryAdd(new EventData("Second event"));
        batch.tryAdd(new EventData("Third event"));
        batch.tryAdd(new EventData("Fourth event"));
        batch.tryAdd(new EventData("Fifth event"));

        // send the batch of events to the event hub
        producer.send(batch);

        // close the producer
        producer.close();
    }
}

On a further note, there is also a migration guide from v3 to v5 here. Even with the old package, I could not reproduce your hang issue using either Executors.newScheduledThreadPool(4) or Executors.newSingleThreadScheduledExecutor() when closing the Executor gracefully as mentioned in the beginning.

英文:

EDIT:

I believe the reason for your hang is the executor is not getting chance to shutdown in case of error. You should wrap the code within try finally like below:

ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder();
    connectionStringBuilder
            .setEndpoint(URI.create("https://XXXXXXXXX.servsssicebus.windows.net"))
            .setTransportType(TransportType.AMQP_WEB_SOCKETS)
            .setSasKeyName("XXX")
            .setSasKey("XXX");
    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
EventHubClient ehClient = null;
try {
    ehClient =
            EventHubClient.createFromConnectionStringSync(
                    connectionStringBuilder.toString(),
                    RetryPolicy.getNoRetry(),
                    scheduledExecutorService
            );
    ehClient.sendSync(EventData.create("Test Message".getBytes()));
}
finally {
    if (ehClient != null)
       ehClient.closeSync();
    scheduledExecutorService.shutdown();
}

N.B. Your code code is using the old azure-eventhubs package (Event Hub v3) as mentioned in this tutorial. The latest package azure-messaging-eventhubs (Event Hub v5 using Producer/Consumer pattern) has bit different APIs which is described in this tutorial. You should use the new SDK if it's fresh development.

import com.azure.messaging.eventhubs.*;

public class Sender {
    public static void main(String[] args) {
        final String connectionString = "EVENT HUBS NAMESPACE CONNECTION STRING";
        final String eventHubName = "EVENT HUB NAME";

        // create a producer using the namespace connection string and event hub name
        EventHubProducerClient producer = new EventHubClientBuilder()
            .connectionString(connectionString, eventHubName)
            .buildProducerClient();

        // prepare a batch of events to send to the event hub    
        EventDataBatch batch = producer.createBatch();
        batch.tryAdd(new EventData("First event"));
        batch.tryAdd(new EventData("Second event"));
        batch.tryAdd(new EventData("Third event"));
        batch.tryAdd(new EventData("Fourth event"));
        batch.tryAdd(new EventData("Fifth event"));

        // send the batch of events to the event hub
        producer.send(batch);

        // close the producer
        producer.close();
    }
}

On further note, there is also a migration guide from v3 to v5 here.

Even with old package, I could not reproduce your hang issue using either Executors.newScheduledThreadPool(4) or Executors.newSingleThreadScheduledExecutor() when closed Executor gracefully as mentioned in the beginning. If I give wrong connection string by mistake, it immediately throws exception: Exception in thread "main" com.microsoft.azure.eventhubs.CommunicationException: A communication error has occurred. This may be due to an incorrect host name in your connection string or a problem with your network connection.

答案2

得分: 0

I use Maven to create a java project, then add dependency in pom.xml:

我使用Maven创建一个Java项目,然后在pom.xml中添加依赖项:

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.6</version>
</dependency>

And This is the code to send the event message:

这是发送事件消息的代码:

package test;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class App
{
    public static void main( String[] args ) throws Exception
    {
        final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
                .setNamespaceName("testbowman")
                .setEventHubName("test")
                .setSasKeyName("testbowman")
                .setSasKey("xxxxxx");
        final Gson gson = new GsonBuilder().create();
        final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
        final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService);

        try {
            for (int i = 0; i < 10; i++) {
                String payload = "Message " + Integer.toString(i);
                byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.defaultCharset());
                EventData sendEvent = EventData.create(payloadBytes);
                ehClient.sendSync(sendEvent);
            }
            System.out.println(Instant.now() + ": Send Complete...");
            System.out.println("Press Enter to stop.");
            System.in.read();
        } finally {
            ehClient.closeSync();
            executorService.shutdown();
        }
        System.out.println( "Hello World!" );
        System.out.println( "!!!!!!!!!!!!!" );
    }
}

At last, I can see the messages come in on the metrics(Can not see immediately, need wait a few time.):

最后,我可以在度量标准上看到消息进来(不能立即看到,需要等待一段时间):

客户端在调用Azure Event Hub并遇到连接错误时出现挂起情况。

This is the official doc:

这是官方文档:

https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-java-get-started-send#write-code-to-send-events

英文:

I use Maven to create a java project, then add dependency in pom.xml:

&lt;dependency&gt;
&lt;groupId&gt;com.microsoft.azure&lt;/groupId&gt;
&lt;artifactId&gt;azure-eventhubs&lt;/artifactId&gt;
&lt;version&gt;2.2.0&lt;/version&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;com.google.code.gson&lt;/groupId&gt;
&lt;artifactId&gt;gson&lt;/artifactId&gt;
&lt;version&gt;2.8.6&lt;/version&gt;
&lt;/dependency&gt;

And This is the code to send the event message:

package test;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class App
{
public static void main( String[] args ) throws Exception
{
final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
.setNamespaceName(&quot;testbowman&quot;)
.setEventHubName(&quot;test&quot;)
.setSasKeyName(&quot;testbowman&quot;)
.setSasKey(&quot;xxxxxx&quot;);
final Gson gson = new GsonBuilder().create();
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService);
try {
for (int i = 0; i &lt; 10; i++) {
String payload = &quot;Message &quot; + Integer.toString(i);
byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.defaultCharset());
EventData sendEvent = EventData.create(payloadBytes);
ehClient.sendSync(sendEvent);
}
System.out.println(Instant.now() + &quot;: Send Complete...&quot;);
System.out.println(&quot;Press Enter to stop.&quot;);
System.in.read();
} finally {
ehClient.closeSync();
executorService.shutdown();
}
System.out.println( &quot;Hello World!&quot; );
System.out.println( &quot;!!!!!!!!!!!!!&quot; );
}
}

(I hide the sas key, I think you know where to get the sas key.:))

At last, I can see the messages come in on the metrics(Can not see immediately, need wait a few time.):

客户端在调用Azure Event Hub并遇到连接错误时出现挂起情况。

This is the offcial doc:

https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-java-get-started-send#write-code-to-send-events

huangapple
  • 本文由 发表于 2020年7月23日 00:33:51
  • 转载请务必保留本文链接:https://go.coder-hub.com/63039023.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定