Test Kafka consumer with Testcontainers and Mockito couldn’t invoke consumer in SpringBoot

huangapple go评论69阅读模式
英文:

Test Kafka consumer with Testcontainers and Mockito couldn't invoke consumer in SpringBoot

问题

以下是您提供的代码的翻译部分:

我正在尝试设置Kafka消费者并使用Testcontainers和Mickito进行测试生产者似乎正常工作但消费者尚未触发并始终给出以下错误信息

期望但未调用
helloKafkaService.handleMessage(
    <捕获参数>
);
-> 在com.my.kafka.HelloKafkaListenerTestIT.helloKafkaListenerTest中
实际上与此模拟对象的互动次数为零

这是我的服务接口HelloKafkaService.java

@Service
public interface HelloKafkaService {
  public void handleMessage(ExampleEvent exampleEvent);
}

我的消费者HelloKafkaListener.java

@Component
public class HelloKafkaListener {
  private static final Logger log = LoggerFactory.getLogger(HelloKafkaListener.class);
  
  private final HelloKafkaService helloKafkaService;
  
  public HelloKafkaListener(HelloKafkaService helloKafkaService) {
    this.helloKafkaService = helloKafkaService;
  }
  
  @KafkaListener(
      topics = "my-topic",
      groupId = "my-topic:HelloKafkaListener")
  public void process(ExampleEvent event) {
    this.helloKafkaService.handleMessage(event);
    log.info("处理事件:" + event.getExampleField());
  }
}

我的测试HelloKafkaListenerTestIT.java

@SpringBootTest
@Testcontainers
public class HelloKafkaListenerTestIT {
  
  @Container
  public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.5"));
  
  @Mock
  private HelloKafkaService helloKafkaService;
  
  @Autowired
  private KafkaTemplate<String, ExampleEvent> kafkaTemplate;
  
  @DynamicPropertySource
  static void kafkaProperties(DynamicPropertyRegistry registry) {
    registry.add(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer::getBootstrapServers);
    registry add(ProducerConfig.CLIENT_ID_CONFIG, () -> "test-id");
    registry add(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, () -> StringSerializer.class.getName());
    registry add(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, () -> KafkaAvroSerializer.class.getName());
  }
  
  @BeforeEach
  public void setUp() {
    Serializer keySerializer;
    Serializer valueSerializer;
  
    var avroConfig = Map.of(
        KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081",
        KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, true,
        KafkaAvroSerializerConfig.USE_LATEST_VERSION, true
    );
  
    keySerializer = new StringSerializer();
  
    valueSerializer = new SpecificAvroSerializer<ExampleEvent>();
    valueSerializer.configure(avroConfig, false);
  
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    config.put(ProducerConfig.CLIENT_ID_CONFIG, "test-id");
    config.put("schema.registry.url", "http://localhost:8081");

  
    DefaultKafkaProducerFactory<String, ExampleEvent> producerFactory =
        new DefaultKafkaProducerFactory<>(config, keySerializer, valueSerializer);
    this.kafkaTemplate = new KafkaTemplate<>(producerFactory);
 
  }
  
  static {
    kafkaContainer.start();
  }
  
  
  @Test
  public void helloKafkaListenerTest() {
    ArgumentCaptor<ExampleEvent> captor = ArgumentCaptor.forClass(ExampleEvent.class);
    ExampleEvent exampleEvent = new ExampleEvent("Hello World!");
    // 确认this.kafkaTemplate.send正常工作
    this.kafkaTemplate.send("my-topic", "key", exampleEvent);
    // 在此处引发错误:
    verify(helloKafkaService, timeout(5000)).handleMessage(captor.capture());
    assertNotNull(captor.getValue());
    assertEquals("Hello World!", captor.getValue().getExampleField());
  }
  
  @AfterAll
  static void tearDown() {
    kafkaContainer.stop();
  }

event.avsc

{
  "namespace": "my.namespace",
  "type": "record",
  "name": "ExampleEvent",
  "doc": "一个示例事件",
  "fields": [
    {
      "name": "exampleField",
      "type": "string"
    }
  ]
}

请注意,我已将您提供的代码进行了翻译,如果您有任何其他问题或需要进一步的帮助,请随时提出。

英文:

I am trying to setup a kafka consumer and test it with Testcontainers and Mickito. It looks like the producer works fine but the consumer has not been triggered and always gives me:

Wanted but not invoked:
helloKafkaService.handleMessage(
&lt;Capturing argument&gt;
);
-&gt; at com.my.kafka.HelloKafkaListenerTestIT.helloKafkaListenerTest
Actually, there were zero interactions with this mock.

Here is my service interface HelloKafkaService.java:

@Service
public interface HelloKafkaService {
public void handleMessage(ExampleEvent exampleEvent);
}

My consumer: HelloKafkaListener.java

@Component
public class HelloKafkaListener {
private static final Logger log = LoggerFactory.getLogger(HelloKafkaListener.class);
private final HelloKafkaService helloKafkaService;
public HelloKafkaListener(HelloKafkaService helloKafkaService) {
this.helloKafkaService = helloKafkaService;
}
@KafkaListener(
topics = &quot;my-topic&quot;,
groupId = &quot;my-topic:HelloKafkaListener&quot;)
public void process(ExampleEvent event) {
this.helloKafkaService.handleMessage(event);
log.info(&quot;Processing event: &quot; + event.getExampleField());
}
}

My test HelloKafkaListenerTestIT.java:

@SpringBootTest
@Testcontainers
public class HelloKafkaListenerTestIT {
@Container
public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse(&quot;confluentinc/cp-kafka:7.0.5&quot;));
@Mock
private HelloKafkaService helloKafkaService;
@Autowired
private KafkaTemplate&lt;String, ExampleEvent&gt; kafkaTemplate;
@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer::getBootstrapServers);
registry.add(ProducerConfig.CLIENT_ID_CONFIG, () -&gt; &quot;test-id&quot;);
registry.add(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, () -&gt; StringSerializer.class.getName());
registry.add(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, () -&gt; KafkaAvroSerializer.class.getName());
}
@BeforeEach
public void setUp() {
Serializer keySerializer;
Serializer valueSerializer;
var avroConfig = Map.of(
KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, &quot;http://localhost:8081&quot;,
KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, true,
KafkaAvroSerializerConfig.USE_LATEST_VERSION, true
);
keySerializer = new StringSerializer();
valueSerializer = new SpecificAvroSerializer&lt;ExampleEvent&gt;();
valueSerializer.configure(avroConfig, false);
Map&lt;String, Object&gt; config = new HashMap&lt;&gt;();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
config.put(ProducerConfig.CLIENT_ID_CONFIG, &quot;test-id&quot;);
config.put(&quot;schema.registry.url&quot;, &quot;http://localhost:8081&quot;);
DefaultKafkaProducerFactory&lt;String, ExampleEvent&gt; producerFactory =
new DefaultKafkaProducerFactory&lt;&gt;(config, keySerializer, valueSerializer);
this.kafkaTemplate = new KafkaTemplate&lt;&gt;(producerFactory);
}
static {
kafkaContainer.start();
}
@Test
public void helloKafkaListenerTest() {
ArgumentCaptor&lt;ExampleEvent&gt; captor = ArgumentCaptor.forClass(ExampleEvent.class);
ExampleEvent exampleEvent = new ExampleEvent(&quot;Hello World!&quot;);
// Confirmed this.kafkaTemplate.send works well
this.kafkaTemplate.send(&quot;my-topic&quot;, &quot;key&quot;, exampleEvent);
// Throw the error here:
verify(helloKafkaService, timeout(5000)).handleMessage(captor.capture());
assertNotNull(captor.getValue());
assertEquals(&quot;Hello World!&quot;, captor.getValue().getExampleField());
}
@AfterAll
static void tearDown() {
kafkaContainer.stop();
}

event.avsc:

{
&quot;namespace&quot;: &quot;my.namespace&quot;,
&quot;type&quot;: &quot;record&quot;,
&quot;name&quot;: &quot;ExampleEvent&quot;,
&quot;doc&quot;: &quot;A sample event&quot;,
&quot;fields&quot;: [
{
&quot;name&quot;: &quot;exampleField&quot;,
&quot;type&quot;: &quot;string&quot;
}
]
}

Why can the consumer not be triggered, if using @Mock and verify? Is there a better idea to test this?

答案1

得分: 1

因为 @Mock 只是创建了一个模拟实例,但这个实例不会被 Spring 上下文管理。所以 HelloKafkaListener 内部的 HelloKafkaService 仍然是实际的实例,而不是模拟实例。

你需要使用 @MockBean。不同之处在于它将创建一个模拟实例,并用它替换 Spring 上下文中的实际 bean 实例。

所以,改成以下方式应该可以解决问题:

@SpringBootTest
@Testcontainers
public class HelloKafkaListenerTestIT {
  
  @MockBean
  private HelloKafkaService helloKafkaService;

}
英文:

Because @Mock just create a mocked instance but this instance will not be managed by the spring context. So HelloKafkaService inside HelloKafkaListener is still the actual instance but not the mocked one.

You have to use @MockBean. The difference is that it will create a mocked instance and also replace the actual bean instance in the spring context with it.

So change to the following should fix the problem :

@SpringBootTest
@Testcontainers
public class HelloKafkaListenerTestIT {
  
  @MockBean
  private HelloKafkaService helloKafkaService;

}

huangapple
  • 本文由 发表于 2023年2月27日 11:20:47
  • 转载请务必保留本文链接:https://go.coder-hub.com/75576510.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定