Kafka Streams - Unit test for ProductionExceptionHandler implementation
Question
I am developing a Kafka Streams application that uses Spring Cloud Stream for configuration.
To handle RecordTooLargeException, I have implemented a custom ProductionExceptionHandler as below.
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class CustomProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        // Skip records that fail with RecordTooLargeException instead of
        // shutting down the stream thread.
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
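As a sanity check, the handler's decision logic on its own can be verified without any producer, by calling handle() directly with a fabricated exception. A minimal sketch, assuming JUnit 5 (the topic name and payloads are placeholders):

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
import org.junit.jupiter.api.Test;

class CustomProductionExceptionHandlerTest {

    @Test
    void continuesOnRecordTooLargeAndFailsOtherwise() {
        CustomProductionExceptionHandler handler = new CustomProductionExceptionHandler();
        ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>("some-topic", "key".getBytes(), "value".getBytes());

        // Oversized records are skipped...
        assertEquals(ProductionExceptionHandlerResponse.CONTINUE,
                handler.handle(record, new RecordTooLargeException("record too large")));
        // ...while any other exception still fails the task.
        assertEquals(ProductionExceptionHandlerResponse.FAIL,
                handler.handle(record, new IllegalStateException("anything else")));
    }
}

This only proves the handler's contract, not that Kafka Streams actually invokes it, which is what the TopologyTestDriver setup below is trying to verify.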
I have added the following property to my application.yml:
spring.kafka:
  streams:
    properties:
      default.production.exception.handler: "com.fd.acquisition.product.availability.product.exception.StreamsRecordProducerErrorHandler"
The code works fine: the default fail-on-exception behavior is overridden and processing continues.
I am now trying to write unit tests that simulate this behavior.
I am using TopologyTestDriver with the following configuration:
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "TopologyTestDriver");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
streamsConfiguration.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "200");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, (10 * 1024));
streamsConfiguration.put(StreamsConfig.SEND_BUFFER_CONFIG, 10);
streamsConfiguration.put(StreamsConfig.RECEIVE_BUFFER_CONFIG, 10);
final String tempDirectory = Files.createTempDirectory("kafka-streams").toAbsolutePath().toString();
streamsConfiguration.setProperty(StreamsConfig.STATE_DIR_CONFIG, tempDirectory);
streamsConfiguration.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomProductionExceptionHandler.class);
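For completeness, the driver is then created from this configuration roughly as follows. This is a sketch: the pass-through topology and the topic names are placeholders standing in for the real application topology.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

// Trivial pass-through topology standing in for the real one.
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
       .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), streamsConfiguration)) {
    TestInputTopic<String, String> input =
            driver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
    // Far beyond max.request.size=200, yet no RecordTooLargeException is raised,
    // because MockProducer performs no size validation.
    input.pipeInput("key", "x".repeat(10_000));
}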
Ideally, pushing a large record should trigger a RecordTooLargeException. But TopologyTestDriver uses MockProducer instead of KafkaProducer, so the ensureValidRecordSize() method is never called.
Is there any way to override this behavior, or any other way to simulate the RecordTooLargeException?
Answer 1
Score: 2
I think this is basically a feature missing from MockProducer. So the first option would be to contribute a PR back to Kafka that adds the missing check to MockProducer; it looks like it should be fairly easy to add. If you are not a developer, you can alternatively create an issue and hope that somebody from the community picks it up.

Either way, you will have to wait for the next Kafka release to get the feature, so you have to decide whether you can live without the unit test until then. There could be ways to hack around this (use reflection to make TopologyTestDriver.producer accessible and non-final, then replace it with an "improved" version of MockProducer), but that is probably going to be harder than just contributing to Kafka.
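For concreteness, here is a rough sketch of that reflection hack. Note the assumptions: the size check only approximates what KafkaProducer.ensureValidRecordSize() does, and the private field name "producer" in TopologyTestDriver is a guess that varies between Kafka versions, so this may not work as-is.

import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.TopologyTestDriver;

// A MockProducer that rejects oversized records, roughly mimicking
// KafkaProducer.ensureValidRecordSize(). The limit would be set to the
// max.request.size used in the test configuration (200 bytes above).
public class SizeLimitedMockProducer extends MockProducer<byte[], byte[]> {

    private final int maxRequestSize;

    public SizeLimitedMockProducer(int maxRequestSize) {
        super(true, new ByteArraySerializer(), new ByteArraySerializer());
        this.maxRequestSize = maxRequestSize;
    }

    @Override
    public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) {
        int size = (record.key() == null ? 0 : record.key().length)
                 + (record.value() == null ? 0 : record.value().length);
        if (size > maxRequestSize) {
            RecordTooLargeException e = new RecordTooLargeException(
                    "Record is " + size + " bytes, limit is " + maxRequestSize);
            // KafkaProducer reports this failure through the callback, which is
            // where Kafka Streams invokes the ProductionExceptionHandler.
            if (callback != null) {
                callback.onCompletion(null, e);
            }
            CompletableFuture<RecordMetadata> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
        return super.send(record, callback);
    }
}

// Swapping the driver's internal producer via reflection; "producer" is an
// assumed field name, and writing a final field may need extra work
// depending on the JDK version.
TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfiguration);
Field producerField = TopologyTestDriver.class.getDeclaredField("producer");
producerField.setAccessible(true);
producerField.set(driver, new SizeLimitedMockProducer(200));

Even if the field swap succeeds, internal components of the driver may already hold a reference to the original producer by the time it runs, which is part of why contributing the check upstream is the cleaner path.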