如何在Apache Pulsar中使用spring-pulsar-spring-boot-starter消费JSON对象?

huangapple go评论65阅读模式
英文:

How to consume JSON object in Apache Pulsar using spring-pulsar-spring-boot-starter?

问题

我正在尝试使用Spring Boot和Apache Pulsar实现一个简单的消息队列。当使用String数据类型时,系统运行正常,但对于自定义对象则不然。以下是我使用的对象以及生产者和消费者的代码:

自定义对象:

package com.example.springpulsar;
public class User {

    private String email;
    private String firstName;

    // 构造函数、getter和setter
}

生产者:

package com.example.springpulsar;

// 导入语句

@Component
public class PulsarProducer {

    @Autowired
    private PulsarTemplate<User> template;

    private static final String USER_TOPIC = "user-topic";

    public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
        template.send(USER_TOPIC, user);
    }
}

消费者:

package com.example.springpulsar;

// 导入语句

@Service
public class PulsarConsumer {

    private static final String USER_TOPIC = "user-topic";

    private final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumer.class);

    @PulsarListener(
            subscriptionName = "user-topic-subscription",
            topics = USER_TOPIC,
            subscriptionType = SubscriptionType.Shared,
            schemaType = SchemaType.JSON
    )
    public void userTopicListener(User user) {
        LOGGER.info("Received user object with email: {}", user.getEmail());
    }
}

Gradle依赖: org.springframework.pulsar:spring-pulsar-spring-boot-starter:0.2.0

应用程序属性:

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.example.springpulsar.User
          schema-info:
            schema-type: JSON

我曾经在某处看到,添加Listener注解中的schemaType应该起作用,但似乎并没有。调试日志:

2023-06-20T14:41:24.050+05:30 DEBUG 32777 --- [nio-8085-exec-5] o.a.p.c.impl.BatchMessageContainerImpl   : [user-topic] [null] add message to batch, num messages in batch so far 0
2023-06-20T14:41:24.053+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.pulsar.client.impl.ProducerImpl      : [user-topic] [standalone-1-21] Sending message cnx org.apache.pulsar.client.impl.ClientCnx@6f18e7a7, sequenceId 1
2023-06-20T14:41:24.056+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.p.common.protocol.PulsarDecoder      : [localhost/127.0.0.1:6650] Received cmd SEND_RECEIPT
2023-06-20T14:41:24.056+05:30 DEBUG 32777 --- [r-client-io-1-1] o.apache.pulsar.client.impl.ClientCnx    : [id: 0xbf5c5f2c, L:/127.0.0.1:48676 - R:localhost/127.0.0.1:6650] Got receipt for producer: 0 -- msg: 1 -- id: 22:42
2023-06-20T14:41:24.056+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.pulsar.client.impl.ProducerImpl      : [user-topic] [standalone-1-21] Received ack for msg 1 
2023-06-20T14:41:24.057+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.p.common.protocol.PulsarDecoder      : [localhost/127.0.0.1:6650] Received cmd MESSAGE
2023-06-20T14:41:24.057+05:30 DEBUG 32777 --- [r-client-io-1-1] o.apache.pulsar.client.impl.ClientCnx    : [id: 0xbf5c5f2c, L:/127.0.0.1:48676 - R:localhost/127.0.0.1:6650] Received a message from the server: org.apache.pulsar.common.api.proto.CommandMessage@697f429b
2023-06-20T14:41:24.057+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.pulsar.client.impl.ConsumerImpl      : [user-topic][user-topic-subscription] Received message: 22/42

我看到ConsumerImpl收到了消息,但监听器方法从未被调用。希望能有所帮助。谢谢!

英文:

I am trying to implement a simple messaging queue using Spring Boot and Apache Pulsar. The system works fine when working with String datatype but not for a Custom Object. Below are the object I'm using as well as the producer and consumer:

Custom Object:

package com.example.springpulsar;
public class User {

   private String email;
   private String firstName;

// constructors, getters and setters
}

Producer:

package com.example.springpulsar;

// imports

@Component
public class PulsarProducer {

    @Autowired
    private PulsarTemplate&lt;User&gt; template;

    private static final String USER_TOPIC = &quot;user-topic&quot;;

    public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
        template.send(USER_TOPIC, user);
    }
}

Consumer:

package com.example.springpulsar;

// imports

@Service
public class PulsarConsumer {

    private static final String USER_TOPIC = &quot;user-topic&quot;;
   
    private final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumer.class);

    @PulsarListener(
            subscriptionName = &quot;user-topic-subscription&quot;,
            topics = USER_TOPIC,
            subscriptionType = SubscriptionType.Shared,
            schemaType = SchemaType.JSON
    )
    public void userTopicListener(User user) {
        LOGGER.info(&quot;Received user object with email: {}&quot;, user.getEmail());
    }
}

Gradle dependency: org.springframework.pulsar:spring-pulsar-spring-boot-starter:0.2.0

Application properties:

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.example.springpulsar.User
          schema-info:
            schema-type: JSON

I read somewhere that adding the schemaType in the Listener annotation should work, but it hasn't. Debug logs:

2023-06-20T14:41:24.050+05:30 DEBUG 32777 --- [nio-8085-exec-5] o.a.p.c.impl.BatchMessageContainerImpl   : [user-topic] [null] add message to batch, num messages in batch so far 0
2023-06-20T14:41:24.053+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.pulsar.client.impl.ProducerImpl      : [user-topic] [standalone-1-21] Sending message cnx org.apache.pulsar.client.impl.ClientCnx@6f18e7a7, sequenceId 1
2023-06-20T14:41:24.056+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.p.common.protocol.PulsarDecoder      : [localhost/127.0.0.1:6650] Received cmd SEND_RECEIPT
2023-06-20T14:41:24.056+05:30 DEBUG 32777 --- [r-client-io-1-1] o.apache.pulsar.client.impl.ClientCnx    : [id: 0xbf5c5f2c, L:/127.0.0.1:48676 - R:localhost/127.0.0.1:6650] Got receipt for producer: 0 -- msg: 1 -- id: 22:42
2023-06-20T14:41:24.056+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.pulsar.client.impl.ProducerImpl      : [user-topic] [standalone-1-21] Received ack for msg 1 
2023-06-20T14:41:24.057+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.p.common.protocol.PulsarDecoder      : [localhost/127.0.0.1:6650] Received cmd MESSAGE
2023-06-20T14:41:24.057+05:30 DEBUG 32777 --- [r-client-io-1-1] o.apache.pulsar.client.impl.ClientCnx    : [id: 0xbf5c5f2c, L:/127.0.0.1:48676 - R:localhost/127.0.0.1:6650] Received a message from the server: org.apache.pulsar.common.api.proto.CommandMessage@697f429b
2023-06-20T14:41:24.057+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.pulsar.client.impl.ConsumerImpl      : [user-topic][user-topic-subscription] Received message: 22/42

I do see that the ConsumerImpl receives the message, but the listener method is never called.

Any help is appreciated. Thank you!

答案1

得分: 1

问题是因为您的领域对象缺少默认构造函数,因此Jackson引发了内部异常。请参考Jackson文档以获取有关此主题的更多详细信息。您可以为您的User领域类添加一个默认构造函数。或者,您也可以将您的领域类切换为Java记录,如下所示:

public record User(String email, String firstName){}

发生的情况是,在内部,框架捕获了异常,并将其解释为否定消息(nack)的信号。每个nack的消息默认情况下将在60秒内重新传递给Pulsar消费者一次。这就是为什么您在控制台上没有看到任何异常堆栈跟踪的原因。我们可以添加调试日志以使这些错误更早地显现出来。

您还可以为消费者添加死信策略,以使失败的消息不会无限制地每分钟重新传递。有关更多详细信息,请参阅此文档部分:https://docs.spring.io/spring-pulsar/docs/current-SNAPSHOT/reference/html/#_using_dead_letter_topic_from_apache_pulsar_for_message_redelivery_and_error_handling

然而,在您的情况下,除非在此处建议的修复措施中,否则所有消息都将失败,因为基础领域对象类存在反序列化问题。

英文:

The problem is because of your domain object missing a default constructor and thus Jackson throws an internal exception. Please refer to Jackson docs for more details on this topic. You can add a default constructor to your User domain class. Alternatively, you can also switch your domain class to a Java record as below:

public record User(String email, String firstName){}

What was happening was that internally, the exception was caught by the framework and interpret that as a signal for negatively acknowledging the message (nack). Each nack'ed message will be redelivered to the Pulsar consumer every 60 seconds by default. This is why you were not seeing any exception stack trace on the console. We can add a debug log to make these errors come to surface sooner.

You can also add dead letter policy for the consumer so that the failed message doesn't get redelivered every minute infinitely. See this docs section for more details on that: https://docs.spring.io/spring-pulsar/docs/current-SNAPSHOT/reference/html/#_using_dead_letter_topic_from_apache_pulsar_for_message_redelivery_and_error_handling
However, in your case, all the messages would fail without the suggested fixes here, since the underlying domain object class has deserialization issues.

huangapple
  • 本文由 发表于 2023年6月22日 19:06:39
  • 转载请务必保留本文链接:https://go.coder-hub.com/76531256.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定