Kafka Streams - Unit test for ProductionExceptionHandler implementation

Question

I am developing a Kafka Streams application that uses Spring Cloud Stream for configuration.
To handle RecordTooLargeException, I have implemented a custom ProductionExceptionHandler as shown below.

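import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class CustomProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        // Skip records that are too large and keep processing.
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        // Any other production error stops the application.
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed.
    }

}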
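One way to cover the handler's decision logic without going through a producer at all is to call handle() directly with a hand-built exception. The following is only a minimal sketch of that idea, not the original test. To keep it runnable without Kafka on the classpath, Response and RecordTooLargeException below are local stand-ins (assumptions) for Kafka's ProductionExceptionHandlerResponse and RecordTooLargeException, and the key/value byte arrays stand in for the ProducerRecord.

```java
public class HandlerDecisionSketch {

    // Local stand-in for Kafka's ProductionExceptionHandlerResponse (assumption).
    public enum Response { CONTINUE, FAIL }

    // Local stand-in for Kafka's RecordTooLargeException (assumption).
    public static class RecordTooLargeException extends RuntimeException {
        public RecordTooLargeException(String message) {
            super(message);
        }
    }

    // Same decision logic as CustomProductionExceptionHandler.handle(...).
    // The key and value are unused, just as the record is unused in the handler.
    public static Response handle(byte[] key, byte[] value, Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return Response.CONTINUE;
        }
        return Response.FAIL;
    }

    public static void main(String[] args) {
        // Simulate the two cases by constructing the exceptions by hand.
        Response tooLarge = handle(null, new byte[0], new RecordTooLargeException("simulated"));
        Response other = handle(null, new byte[0], new IllegalStateException("simulated"));
        System.out.println(tooLarge + " " + other); // prints "CONTINUE FAIL"
    }
}
```

In a real test, the same two calls would go to an instance of CustomProductionExceptionHandler with a real ProducerRecord and the real Kafka exception type. This checks the handler in isolation; it does not verify that the handler is wired into the topology.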

I have added the following property in my application.yml

spring.kafka:
  streams:
    properties:
      default.production.exception.handler: "com.fd.acquisition.product.availability.product.exception.StreamsRecordProducerErrorHandler"

The code works fine: the default failing behavior is overridden and processing continues.

I am trying to write unit tests that simulate this behavior.
I am using TopologyTestDriver with the following configuration.

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "TopologyTestDriver");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
streamsConfiguration.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "200");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, (10 * 1024));
streamsConfiguration.put(StreamsConfig.SEND_BUFFER_CONFIG, 10);
streamsConfiguration.put(StreamsConfig.RECEIVE_BUFFER_CONFIG, 10);

final String tempDirectory = Files.createTempDirectory("kafka-streams").toAbsolutePath().toString();
streamsConfiguration.setProperty(StreamsConfig.STATE_DIR_CONFIG, tempDirectory);
streamsConfiguration.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomProductionExceptionHandler.class);

Ideally, pushing a large record should trigger a RecordTooLargeException. However, MockProducer is used instead of KafkaProducer, so the ensureValidRecordSize() method is never called.

Is there any way to override this behavior or any other way to simulate the RecordTooLargeException?

Answer 1

Score: 2

I think this is basically a feature missing from MockProducer. So the first option would be to contribute a PR back to Kafka that adds the missing check to MockProducer. It looks like it should be fairly easy to add. If you are not a developer, you can alternatively create an issue and hope somebody from the community picks it up.

Either way, you will have to wait for the next Kafka release to get the feature, and you have to decide whether you can live without the unit test until then. There could be ways to hack around this (use reflection to make TopologyTestDriver.producer visible / non-final and replace it with an "improved" version of MockProducer), but that is probably going to be harder than just contributing to Kafka.
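For a sense of how small the missing check is, here is a rough, self-contained sketch of the kind of size validation an "improved" mock would need. It is not Kafka's actual code: the class name, the estimateSize helper and the local RecordTooLargeException are hypothetical stand-ins, and the estimate counts only key and value bytes, whereas a real producer also accounts for record overhead and headers. The method name ensureValidRecordSize is borrowed from the question.

```java
public class RecordSizeCheckSketch {

    // Local stand-in for Kafka's RecordTooLargeException (assumption).
    public static class RecordTooLargeException extends RuntimeException {
        public RecordTooLargeException(String message) {
            super(message);
        }
    }

    // Simplified size estimate: serialized key bytes plus value bytes only.
    public static int estimateSize(byte[] key, byte[] value) {
        int keySize = (key == null) ? 0 : key.length;
        int valueSize = (value == null) ? 0 : value.length;
        return keySize + valueSize;
    }

    // Rejects a record whose estimated size exceeds the configured limit
    // (the role max.request.size plays in the question's configuration).
    public static void ensureValidRecordSize(int size, int maxRequestSize) {
        if (size > maxRequestSize) {
            throw new RecordTooLargeException("The message is " + size
                    + " bytes, which is larger than the limit of " + maxRequestSize + " bytes.");
        }
    }

    public static void main(String[] args) {
        int maxRequestSize = 200; // same limit as in the question's test config
        byte[] key = new byte[10];
        byte[] largeValue = new byte[500];
        try {
            ensureValidRecordSize(estimateSize(key, largeValue), maxRequestSize);
            System.out.println("accepted");
        } catch (RecordTooLargeException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A mock producer that ran a check like this before accepting a record, and reported the exception through the send callback, would give the exception handler something to react to in a TopologyTestDriver test.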

Posted by huangapple on February 10, 2023 at 13:06:56
When reposting, please keep the link to this article: https://go.coder-hub.com/75407171.html