无法为需要会话的实体创建非会话消息接收器以用于 Azure IMessageReceiver。

huangapple go评论91阅读模式
英文:

It is not possible for an entity that requires sessions to create a non-sessionful message receiver for azure IMessageReceiver

问题

我正在尝试通过Java应用程序从Azure Service Bus获取消息。我已经创建了必要的客户端配置,例如通过ManagementClient成功建立了连接。

@Bean
public ClientSettings getMessageReceiver() throws ServiceBusException, InterruptedException {

    AzureTokenCredentials azureTokenCredentials = new ApplicationTokenCredentials(
            "clientID",
            "domain",
            "secret",
            AzureEnvironment.AZURE
    );

    TokenProvider tokenProvider = TokenProvider.createAzureActiveDirectoryTokenProvider(
            new AzureAuthentication(azureTokenCredentials),
            AzureEnvironment.AZURE.activeDirectoryEndpoint(),
            null
    );

    ClientSettings clientSettings = new ClientSettings(tokenProvider,
            RetryPolicy.getDefault(),
            Duration.ofSeconds(30),
            TransportType.AMQP);

    return clientSettings;
}

ManagementClient managementClient =
            new ManagementClient(Util.convertNamespaceToEndPointURI("namespace"),
                    clientSettings);
managementClient.getTopics();

但是当我尝试从特定主题获取消息时:

SubscriptionClient subscriptionClient = new SubscriptionClient("namespace", "events/subscriptions/subscription", clientSettings, ReceiveMode.PEEKLOCK);

我收到了一个错误消息:

无法为需要会话的实体创建非会话消息接收器。

需要提供哪些额外的步骤?

英文:

I'm trying to get messages from Azure Service Bus via java application. I created necessary client config and for example there was successful connection through ManagementClient

@Bean
public ClientSettings getMessageReceiver() throws ServiceBusException, InterruptedException {

    AzureTokenCredentials azureTokenCredentials = new ApplicationTokenCredentials(
            "clientID,
            "domain",
            "secret",
            AzureEnvironment.AZURE
    );

    TokenProvider tokenProvider = TokenProvider.createAzureActiveDirectoryTokenProvider(
            new AzureAuthentication(azureTokenCredentials),
            AzureEnvironment.AZURE.activeDirectoryEndpoint(),
            null
    );

    ClientSettings clientSettings = new ClientSettings(tokenProvider,
            RetryPolicy.getDefault(),
            Duration.ofSeconds(30),
            TransportType.AMQP);

    return clientSettings;
}

ManagementClient managementClient =
            new ManagementClient(Util.convertNamespaceToEndPointURI("namespace"),
                    clientSettings);
    managementClient.getTopics();

But when I try to get messages from particular topic:

        SubscriptionClient subscriptionClient = new SubscriptionClient("namespace", "events/subscriptions/subscription", clientSettings, ReceiveMode.PEEKLOCK);

And got an error message:

> It is not possible for an entity that requires sessions to create a non-sessionful message receiver.

What additional steps should be provided?

答案1

得分: 2

你在创建主题订阅时启用了会话(默认情况下禁用)。如果您不需要消息会话,请重新创建订阅并禁用 'requires session' 属性(注意:一旦创建订阅,就无法更改该属性)。

或者,如果您确实需要消息会话,请像下面这样更新您的代码,首先接收会话,然后从接收到的会话中接收消息。所有代码示例可以在此处找到,特定的会话示例可以在此处找到。

        // 连接字符串的值可以通过以下方式获得:
        // 1. 在 Azure 门户中转到您的 Service Bus 命名空间。
        // 2. 转到“共享访问策略”。
        // 3. 复制“RootManageSharedAccessKey”策略的连接字符串。
        String connectionString = "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};"
            + "SharedAccessKey={key}";

        // 创建接收器。
        // "<<topic-name>>" 将是您在 Service Bus 命名空间中创建的 Service Bus 主题的名称。
        // "<<subscription-name>>" 将是启用会话的订阅的名称。
        ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .sessionReceiver()
            .receiveMode(ReceiveMode.PEEK_LOCK)
            .topicName("<<topic-name>>")
            .subscriptionName("<<subscription-name>>")
            .buildAsyncClient();

        Disposable subscription = receiver.receiveMessages()
            .flatMap(context -> {
                if (context.hasError()) {
                    System.out.printf("An error occurred in session %s. Error: %s%n",
                        context.getSessionId(), context.getThrowable());
                    return Mono.empty();
                }

                System.out.println("Processing message from session: " + context.getSessionId());

                // 处理消息
                return receiver.complete(context.getMessage());
            }).subscribe(aVoid -> {
            }, error -> System.err.println("Error occurred: " + error));

        // 订阅不是阻塞调用,因此我们在这里休眠,以防止程序结束。
        TimeUnit.SECONDS.sleep(60);

        // 释放订阅将取消 receive() 操作。
        subscription.dispose();

        // 关闭接收器。
        receiver.close();
英文:

You have enabled Session (by default disabled) while creating in your topic subscription. If you do not need message session, recreate the subscription with 'requires session' disabled (NOTE: you cannot change that property once a subscription is created).

Or, if you really need message session, update your code like below to receive session first, and from received session, receive messages. All the code samples can be found here and the session sample specifically here.

        // The connection string value can be obtained by:
        // 1. Going to your Service Bus namespace in Azure Portal.
        // 2. Go to &quot;Shared access policies&quot;
        // 3. Copy the connection string for the &quot;RootManageSharedAccessKey&quot; policy.
        String connectionString = &quot;Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};&quot;
            + &quot;SharedAccessKey={key}&quot;;

        // Create a receiver.
        // &quot;&lt;&lt;topic-name&gt;&gt;&quot; will be the name of the Service Bus topic you created inside the Service Bus namespace.
        // &quot;&lt;&lt;subscription-name&gt;&gt;&quot; will be the name of the session-enabled subscription.
        ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .sessionReceiver()
            .receiveMode(ReceiveMode.PEEK_LOCK)
            .topicName(&quot;&lt;&lt;topic-name&gt;&gt;&quot;)
            .subscriptionName(&quot;&lt;&lt;subscription-name&gt;&gt;&quot;)
            .buildAsyncClient();

        Disposable subscription = receiver.receiveMessages()
            .flatMap(context -&gt; {
                if (context.hasError()) {
                    System.out.printf(&quot;An error occurred in session %s. Error: %s%n&quot;,
                        context.getSessionId(), context.getThrowable());
                    return Mono.empty();
                }

                System.out.println(&quot;Processing message from session: &quot; + context.getSessionId());

                // Process message
                return receiver.complete(context.getMessage());
            }).subscribe(aVoid -&gt; {
            }, error -&gt; System.err.println(&quot;Error occurred: &quot; + error));

        // Subscribe is not a blocking call so we sleep here so the program does not end.
        TimeUnit.SECONDS.sleep(60);

        // Disposing of the subscription will cancel the receive() operation.
        subscription.dispose();

        // Close the receiver.
        receiver.close();

huangapple
  • 本文由 发表于 2020年10月16日 18:01:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/64387019.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定