英文:
Test Kafka consumer with Testcontainers and Mockito couldn't invoke consumer in SpringBoot
问题
以下是您提供的代码的翻译部分:
我正在尝试设置Kafka消费者并使用Testcontainers和Mickito进行测试。生产者似乎正常工作,但消费者尚未触发,并始终给出以下错误信息:
期望但未调用:
helloKafkaService.handleMessage(
<捕获参数>
);
-> 在com.my.kafka.HelloKafkaListenerTestIT.helloKafkaListenerTest中
实际上,与此模拟对象的互动次数为零。
这是我的服务接口HelloKafkaService.java:
@Service
public interface HelloKafkaService {
public void handleMessage(ExampleEvent exampleEvent);
}
我的消费者:HelloKafkaListener.java
@Component
public class HelloKafkaListener {
private static final Logger log = LoggerFactory.getLogger(HelloKafkaListener.class);
private final HelloKafkaService helloKafkaService;
public HelloKafkaListener(HelloKafkaService helloKafkaService) {
this.helloKafkaService = helloKafkaService;
}
@KafkaListener(
topics = "my-topic",
groupId = "my-topic:HelloKafkaListener")
public void process(ExampleEvent event) {
this.helloKafkaService.handleMessage(event);
log.info("处理事件:" + event.getExampleField());
}
}
我的测试HelloKafkaListenerTestIT.java:
@SpringBootTest
@Testcontainers
public class HelloKafkaListenerTestIT {
@Container
public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.5"));
@Mock
private HelloKafkaService helloKafkaService;
@Autowired
private KafkaTemplate<String, ExampleEvent> kafkaTemplate;
@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer::getBootstrapServers);
registry add(ProducerConfig.CLIENT_ID_CONFIG, () -> "test-id");
registry add(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, () -> StringSerializer.class.getName());
registry add(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, () -> KafkaAvroSerializer.class.getName());
}
@BeforeEach
public void setUp() {
Serializer keySerializer;
Serializer valueSerializer;
var avroConfig = Map.of(
KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081",
KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, true,
KafkaAvroSerializerConfig.USE_LATEST_VERSION, true
);
keySerializer = new StringSerializer();
valueSerializer = new SpecificAvroSerializer<ExampleEvent>();
valueSerializer.configure(avroConfig, false);
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
config.put(ProducerConfig.CLIENT_ID_CONFIG, "test-id");
config.put("schema.registry.url", "http://localhost:8081");
DefaultKafkaProducerFactory<String, ExampleEvent> producerFactory =
new DefaultKafkaProducerFactory<>(config, keySerializer, valueSerializer);
this.kafkaTemplate = new KafkaTemplate<>(producerFactory);
}
static {
kafkaContainer.start();
}
@Test
public void helloKafkaListenerTest() {
ArgumentCaptor<ExampleEvent> captor = ArgumentCaptor.forClass(ExampleEvent.class);
ExampleEvent exampleEvent = new ExampleEvent("Hello World!");
// 确认this.kafkaTemplate.send正常工作
this.kafkaTemplate.send("my-topic", "key", exampleEvent);
// 在此处引发错误:
verify(helloKafkaService, timeout(5000)).handleMessage(captor.capture());
assertNotNull(captor.getValue());
assertEquals("Hello World!", captor.getValue().getExampleField());
}
@AfterAll
static void tearDown() {
kafkaContainer.stop();
}
event.avsc:
{
"namespace": "my.namespace",
"type": "record",
"name": "ExampleEvent",
"doc": "一个示例事件",
"fields": [
{
"name": "exampleField",
"type": "string"
}
]
}
请注意,我已将您提供的代码进行了翻译,如果您有任何其他问题或需要进一步的帮助,请随时提出。
英文:
I am trying to setup a kafka consumer and test it with Testcontainers and Mickito. It looks like the producer works fine but the consumer has not been triggered and always gives me:
Wanted but not invoked:
helloKafkaService.handleMessage(
<Capturing argument>
);
-> at com.my.kafka.HelloKafkaListenerTestIT.helloKafkaListenerTest
Actually, there were zero interactions with this mock.
Here is my service interface HelloKafkaService.java:
@Service
public interface HelloKafkaService {
public void handleMessage(ExampleEvent exampleEvent);
}
My consumer: HelloKafkaListener.java
@Component
public class HelloKafkaListener {
private static final Logger log = LoggerFactory.getLogger(HelloKafkaListener.class);
private final HelloKafkaService helloKafkaService;
public HelloKafkaListener(HelloKafkaService helloKafkaService) {
this.helloKafkaService = helloKafkaService;
}
@KafkaListener(
topics = "my-topic",
groupId = "my-topic:HelloKafkaListener")
public void process(ExampleEvent event) {
this.helloKafkaService.handleMessage(event);
log.info("Processing event: " + event.getExampleField());
}
}
My test HelloKafkaListenerTestIT.java:
@SpringBootTest
@Testcontainers
public class HelloKafkaListenerTestIT {
@Container
public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.5"));
@Mock
private HelloKafkaService helloKafkaService;
@Autowired
private KafkaTemplate<String, ExampleEvent> kafkaTemplate;
@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer::getBootstrapServers);
registry.add(ProducerConfig.CLIENT_ID_CONFIG, () -> "test-id");
registry.add(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, () -> StringSerializer.class.getName());
registry.add(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, () -> KafkaAvroSerializer.class.getName());
}
@BeforeEach
public void setUp() {
Serializer keySerializer;
Serializer valueSerializer;
var avroConfig = Map.of(
KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081",
KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, true,
KafkaAvroSerializerConfig.USE_LATEST_VERSION, true
);
keySerializer = new StringSerializer();
valueSerializer = new SpecificAvroSerializer<ExampleEvent>();
valueSerializer.configure(avroConfig, false);
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
config.put(ProducerConfig.CLIENT_ID_CONFIG, "test-id");
config.put("schema.registry.url", "http://localhost:8081");
DefaultKafkaProducerFactory<String, ExampleEvent> producerFactory =
new DefaultKafkaProducerFactory<>(config, keySerializer, valueSerializer);
this.kafkaTemplate = new KafkaTemplate<>(producerFactory);
}
static {
kafkaContainer.start();
}
@Test
public void helloKafkaListenerTest() {
ArgumentCaptor<ExampleEvent> captor = ArgumentCaptor.forClass(ExampleEvent.class);
ExampleEvent exampleEvent = new ExampleEvent("Hello World!");
// Confirmed this.kafkaTemplate.send works well
this.kafkaTemplate.send("my-topic", "key", exampleEvent);
// Throw the error here:
verify(helloKafkaService, timeout(5000)).handleMessage(captor.capture());
assertNotNull(captor.getValue());
assertEquals("Hello World!", captor.getValue().getExampleField());
}
@AfterAll
static void tearDown() {
kafkaContainer.stop();
}
event.avsc:
{
"namespace": "my.namespace",
"type": "record",
"name": "ExampleEvent",
"doc": "A sample event",
"fields": [
{
"name": "exampleField",
"type": "string"
}
]
}
Why can the consumer not be triggered, if using @Mock
and verify
? Is there a better idea to test this?
答案1
得分: 1
因为 @Mock
只是创建了一个模拟实例,但这个实例不会被 Spring 上下文管理。所以 HelloKafkaListener
内部的 HelloKafkaService
仍然是实际的实例,而不是模拟实例。
你需要使用 @MockBean
。不同之处在于它将创建一个模拟实例,并用它替换 Spring 上下文中的实际 bean 实例。
所以,改成以下方式应该可以解决问题:
@SpringBootTest
@Testcontainers
public class HelloKafkaListenerTestIT {
@MockBean
private HelloKafkaService helloKafkaService;
}
英文:
Because @Mock
just create a mocked instance but this instance will not be managed by the spring context. So HelloKafkaService
inside HelloKafkaListener
is still the actual instance but not the mocked one.
You have to use @MockBean
. The difference is that it will create a mocked instance and also replace the actual bean instance in the spring context with it.
So change to the following should fix the problem :
@SpringBootTest
@Testcontainers
public class HelloKafkaListenerTestIT {
@MockBean
private HelloKafkaService helloKafkaService;
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论