Kafka issue: MessageConversionException: Cannot convert from [java.lang.String] to [my_custom_model] for GenericMessage […]

huangapple go评论78阅读模式
英文:

Kafka issue: MessageConversionException: Cannot convert from [java.lang.String] to [my_custom_model] for GenericMessage [...]

问题

我有Kafka生产者和消费者服务器,当我尝试发送消息时,我收到以下异常:

org.springframework.kafka.listener.ListenerExecutionFailedException: 无法使用传入的消息调用监听器方法
端点处理程序详细信息
方法 [public void com.mail.sender.service.senders.GmailConfirmationSenderService.confirmationMessageListener(com.mail.sender.dto.request.AccountRequest)]
Bean [com.mail.sender.service.senders.GmailConfirmationSenderService@24787445]嵌套异常为 org.springframework.messaging.converter.MessageConversionException: 无法处理消息嵌套异常为 org.springframework.messaging.converter.MessageConversionException: 无法将 [java.lang.String] 转换为 [com.mail.sender.dto.request.AccountRequest]对于 GenericMessage [payload={"email":"ckopo.6ygy@gmail.com","username":"asdasd-mjeesh","confirmationTokenDetails":{"token":"3fd3c1ee-20ec-420b-8ee9-11d22cd7598e","createdAt":[2016,1,25,21,34,55],"expiredAt":[2023,1,8,18,19,6,661473300]}}, headers={kafka_offset=12, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4008ea0f, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=mail_confirmation_message, kafka_receivedTimestamp=1673193848466, __TypeId__=[B@68a9f1ab, kafka_groupId=account_confirmation_group_id}]

我的生产者服务器配置

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServersUrl;

    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(JsonSerializer.TYPE_MAPPINGS, "accountRequest:com.confirmation_token.model.dto.request.outgoing.AccountRequest");
        return props;
    }

    @Bean
    public ProducerFactory<String, AccountRequest> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, AccountRequest> kafkaTemplate(ProducerFactory<String, AccountRequest> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}

我发送消息的方式

@Service
@Slf4j
@RequiredArgsConstructor
public class EmailConfirmationSenderServiceCommunication implements ConfirmationSender {
    private final KafkaTemplate<String, AccountRequest> kafkaTemplate;

    @Override
    public void sendConfirmationToken(AccountRequest accountRequest) {
        kafkaTemplate.send("mail_confirmation_message", accountRequest);
        log.info("Confirmation token={} has been send", accountRequest);
    }
}

生产者端的模型

@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AccountRequest {
    private String email;
    private String username;
    private ConfirmationTokenDetailsRequest confirmationTokenDetails;
}

@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ConfirmationTokenDetailsRequest {
    private String token;
    private LocalDateTime createdAt;
    private LocalDateTime expiredAt;
}

我的消费者服务器配置

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServerUrl;

    public Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TYPE_MAPPINGS, "accountRequest:com.mail.sender.dto.request.AccountRequest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, AccountRequest> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, AccountRequest>> listenerContainerFactory(
            ConsumerFactory<String, AccountRequest> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, AccountRequest> listenerContainerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        listenerContainerFactory.setConsumerFactory(consumerFactory);
        return listenerContainerFactory;
    }
}

主题的配置

@Configuration
public class KafkaTopicConfig {

    @Value("${kafka.topic.names.account.confirmation}")
    private String accountMailConfirmationTopicName;

    @Bean
    public NewTopic accountConfirmationTopic() {
        return TopicBuilder
                .name(accountMailConfirmationTopicName)
                .build();
    }
}

我尝试监听消息的方式

@Service
@Slf4j
@RequiredArgsConstructor
public class GmailConfirmationSenderService implements EmailSender<AccountRequest> {
    private final String accountConfirmationTemplate;
    private final ApplicationModelValidator applicationModelValidator;
    private final JavaMailSender mailSender;

    @Value("${confirmation.link.template}")
    private String confirmationLink;

    @KafkaListener(topics = "mail_confirmation_message", groupId = "account_confirmation_group_id")
    public void confirmationMessageListener(AccountRequest accountRequest) {
        String validationViolations = applicationModelValidator.validate(accountRequest);
        if (!validationViolations.isBlank()) {
            log.error(validationViolations);
            throw new ModelValidationException(validationViolations);
        }

        String userConfirmationLink = String.format(
                accountConfirmationTemplate,
                accountRequest.getUsername(),
                confirmationLink + accountRequest.getConfirmationTokenDetails().getToken());
        this.send(accountRequest, userConfirmationLink);
    }
    
    ...
}

消费者端的模型

@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AccountRequest {
    @Email
    private String email;

    @Length(min = 8, max = 40)
    private String username;

    @NotNull
    @Valid
    private ConfirmationTokenDetailsRequest confirmationTokenDetails;
}

@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ConfirmationTokenDetailsRequest {
    @NotBlank
    private String token;

    @PastOrPresent
    private LocalDateTime createdAt;

    @Future
    private LocalDateTime expiredAt;
}

*在消费者模型中有用于验证的注解,但它们不会影响流程。

英文:

I have Kafka producer and consumer servers, when I try to send a message I get following exception:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mail.sender.service.senders.GmailConfirmationSenderService.confirmationMessageListener(com.mail.sender.dto.request.AccountRequest)]
Bean [com.mail.sender.service.senders.GmailConfirmationSenderService@24787445]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.mail.sender.dto.request.AccountRequest] for GenericMessage [payload={&quot;email&quot;:&quot;ckopo.6ygy@gmail.com&quot;,&quot;username&quot;:&quot;asdasd-mjeesh&quot;,&quot;confirmationTokenDetails&quot;:{&quot;token&quot;:&quot;3fd3c1ee-20ec-420b-8ee9-11d22cd7598e&quot;,&quot;createdAt&quot;:[2016,1,25,21,34,55],&quot;expiredAt&quot;:[2023,1,8,18,19,6,661473300]}}, headers={kafka_offset=12, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4008ea0f, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=mail_confirmation_message, kafka_receivedTimestamp=1673193848466, __TypeId__=[B@68a9f1ab, kafka_groupId=account_confirmation_group_id}]

Config for my producer server

@Configuration
public class KafkaProducerConfig {

    @Value(&quot;${spring.kafka.bootstrap-servers}&quot;)
    private String bootstrapServersUrl;

    public Map&lt;String, Object&gt; producerConfig() {
        Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(JsonSerializer.TYPE_MAPPINGS, &quot;accountRequest:com.confirmation_token.model.dto.request.outgoing.AccountRequest&quot;);
        return props;
    }

    @Bean
    public ProducerFactory&lt;String, AccountRequest&gt; producerFactory() {
        return new DefaultKafkaProducerFactory&lt;&gt;(producerConfig());
    }

    @Bean
    public KafkaTemplate&lt;String, AccountRequest&gt; kafkaTemplate(ProducerFactory&lt;String, AccountRequest&gt; producerFactory) {
        return new KafkaTemplate&lt;&gt;(producerFactory);
    }
}

How I send the messages

@Service
@Slf4j
@RequiredArgsConstructor
public class EmailConfirmationSenderServiceCommunication implements ConfirmationSender {
    private final KafkaTemplate&lt;String, AccountRequest&gt; kafkaTemplate;

    @Override
    public void sendConfirmationToken(AccountRequest accountRequest) {
        kafkaTemplate.send(&quot;mail_confirmation_message&quot;, accountRequest);
        log.info(&quot;Confirmation token={} has been send&quot;, accountRequest);
    }
}

Models on producer side

@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AccountRequest {
    private String email;
    private String username;
    private ConfirmationTokenDetailsRequest confirmationTokenDetails;
}

@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ConfirmationTokenDetailsRequest {
    private String token;
    private LocalDateTime createdAt;
    private LocalDateTime expiredAt;
}

Config for my consumer server

@Configuration
public class KafkaConsumerConfig {

    @Value(&quot;${spring.kafka.bootstrap-servers}&quot;)
    private String bootstrapServerUrl;

    public Map&lt;String, Object&gt; consumerConfig() {
        Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TYPE_MAPPINGS, &quot;accountRequest:com.mail.sender.dto.request.AccountRequest&quot;);
        return props;
    }

    @Bean
    public ConsumerFactory&lt;String, AccountRequest&gt; consumerFactory() {
        return new DefaultKafkaConsumerFactory&lt;&gt;(consumerConfig());
    }

    @Bean
    public KafkaListenerContainerFactory&lt;ConcurrentMessageListenerContainer&lt;String, AccountRequest&gt;&gt; listenerContainerFactory(
            ConsumerFactory&lt;String, AccountRequest&gt; consumerFactory) {
        ConcurrentKafkaListenerContainerFactory&lt;String, AccountRequest&gt; listenerContainerFactory =
                new ConcurrentKafkaListenerContainerFactory&lt;&gt;();
        listenerContainerFactory.setConsumerFactory(consumerFactory);
        return listenerContainerFactory;
    }
}

Config for topic

@Configuration
public class KafkaTopicConfig {

    @Value(&quot;${kafka.topic.names.account.confirmation}&quot;)
    private String accountMailConfirmationTopicName;

    @Bean
    public NewTopic accountConfirmationTopic() {
        return TopicBuilder
                .name(accountMailConfirmationTopicName)
                .build();
    }
}

How I try to listen the messages

@Service
@Slf4j
@RequiredArgsConstructor
public class GmailConfirmationSenderService implements EmailSender&lt;AccountRequest&gt; {
    private final String accountConfirmationTemplate;
    private final ApplicationModelValidator applicationModelValidator;
    private final JavaMailSender mailSender;

    @Value(&quot;${confirmation.link.template}&quot;)
    private String confirmationLink;

    @KafkaListener(topics = &quot;mail_confirmation_message&quot;, groupId = &quot;account_confirmation_group_id&quot;)
    public void confirmationMessageListener(AccountRequest accountRequest) {
        String validationViolations = applicationModelValidator.validate(accountRequest);
        if (!validationViolations.isBlank()) {
            log.error(validationViolations);
            throw new ModelValidationException(validationViolations);
        }

        String userConfirmationLink = String.format(
                accountConfirmationTemplate,
                accountRequest.getUsername(),
                confirmationLink + accountRequest.getConfirmationTokenDetails().getToken());
        this.send(accountRequest, userConfirmationLink);
    }
    
    ...
}

Models on consumer side

@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AccountRequest {
    @Email
    private String email;

    @Length(min = 8, max = 40)
    private String username;

    @NotNull
    @Valid
    private ConfirmationTokenDetailsRequest confirmationTokenDetails;
}

@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ConfirmationTokenDetailsRequest {
    @NotBlank
    private String token;

    @PastOrPresent
    private LocalDateTime createdAt;

    @Future
    private LocalDateTime expiredAt;
}

*In the consumer's models are annotations for validation, but they don't affect the process

答案1

得分: 3

listenerContainerFactory - 由于您正在使用非标准名称的工厂,您需要在 @KafkaListener 上指定它(或将工厂bean名称更改为 kafkaListenerContainerFactory)。

英文:

Your listener is using the default (String) deserializer (probably from Boot's auto configured factory).

listenerContainerFactory - since you are using a non-standard name for the factory, you need to specify it on the @KafkaListener (or change the factory bean name to kafkaListenerContainerFactory).

huangapple
  • 本文由 发表于 2023年1月9日 01:09:15
  • 转载请务必保留本文链接:https://go.coder-hub.com/75049783.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定