Testing Spring Cloud Stream app with kafka-streams binder

Question

After trying a number of different setups, and reading many different guides, I'm still at a loss for how to write simple unit tests for Spring Cloud Stream processors that use the kafka-streams binder.

Starting with a completely new repo and following the setup outlined here, I am able to write the basic uppercase demo function (in my code named 'uppercaseSimple') like so:

    @Bean
    public Function<String, String> uppercaseSimple() {
        return value -> value.toUpperCase();
    }

with a functioning test that passes:

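    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.cloud.stream.binder.test.InputDestination;
    import org.springframework.cloud.stream.binder.test.OutputDestination;
    import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
    import org.springframework.context.annotation.Import;
    import org.springframework.messaging.support.GenericMessage;
    import static org.assertj.core.api.Assertions.assertThat;

    @SpringBootTest(classes = SpringCloudStreamDemoApplication.class)
    @Import({TestChannelBinderConfiguration.class})
    class SpringCloudStreamDemoApplicationTests {
        @Autowired
        private InputDestination input;
        @Autowired
        private OutputDestination output;
        @Test
        void contextLoads() {
            // Send through the test binder and check the uppercased payload.
            input.send(new GenericMessage<byte[]>("hello".getBytes()));
            assertThat(output.receive().getPayload()).isEqualTo("HELLO".getBytes());
        }
    }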

However, trying to change the function signature to a processor that makes full use of the kafka-streams binder like so:

    @Bean
    public Function<KStream<Bytes, String>, KStream<Bytes, String>> uppercaseStream() {
        return input -> input
                .map((key, value) -> new KeyValue<>(null, value.toUpperCase()));
    }

means the same test now fails with the following error:

    Index: 0, Size: 0
    java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
        at java.util.ArrayList.rangeCheck(ArrayList.java:657)
        at java.util.ArrayList.get(ArrayList.java:433)
        at org.springframework.cloud.stream.binder.test.AbstractDestination.getChannel(AbstractDestination.java:34)
        at org.springframework.cloud.stream.binder.test.InputDestination.send(InputDestination.java:37)
        at com.example.springcloudstreamdemo.SpringCloudStreamDemoApplicationTests.contextLoads(SpringCloudStreamDemoApplicationTests.java:26)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:686)
        at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
        at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
        at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
        at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
        at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
        at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
        at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
        at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:212)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:208)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:71)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
        at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
        at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:248)
        at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$5(DefaultLauncher.java:211)
        at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:226)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:199)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:132)
        at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99)
        at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79)
        at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75)
        at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
        at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
        at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
        at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
        at com.sun.proxy.$Proxy2.stop(Unknown Source)
        at org.gradle.api.internal.tasks.testing.worker.TestWorker.stop(TestWorker.java:133)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
        at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
        at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
        at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
        at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:414)
        at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
        at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
        at java.lang.Thread.run(Thread.java:748)

I have pushed this demo to a GitHub repo, and have been switching between the two functions by commenting lines 19 and 24 in and out, so that only one version of the function is registered at a time. Where am I going wrong here? Is this kind of testing not applicable to functions that use KStream / KTable? The docs on testing make no distinction about which underlying binders are compatible, but it seems that kafka-streams isn't?

Answer 1

Score: 3

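The Kafka Streams binder in Spring Cloud Stream is not compatible with the testing strategy you describe above. The stack trace is consistent with this: InputDestination.send fails inside AbstractDestination.getChannel on an empty list, which suggests the test binder has no input channel registered for the KStream-based function. This sample application demonstrates some potential ways to unit test a Kafka Streams application in Spring Cloud Stream. Take a look at the tests here.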
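
One common way to unit test a KStream function without any binder is Kafka's own TopologyTestDriver from the kafka-streams-test-utils artifact. The sketch below is only an illustration under assumptions, not a reproduction of the linked sample tests: the class name, the topic names "uppercase-in" and "uppercase-out", and the helper runThroughTopology are hypothetical, and it assumes kafka-streams and kafka-streams-test-utils (2.4 or later) are on the test classpath. It wires the uppercaseStream function from the question into a topology by hand, so it checks the function's logic but not the Spring Cloud Stream binding configuration.

```java
import java.util.Properties;
import java.util.function.Function;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class UppercaseStreamTopologySketch {

    // Hypothetical topic names, used only inside this test topology.
    static final String INPUT_TOPIC = "uppercase-in";
    static final String OUTPUT_TOPIC = "uppercase-out";

    // Same logic as the uppercaseStream bean in the question.
    static Function<KStream<Bytes, String>, KStream<Bytes, String>> uppercaseStream() {
        return input -> input
                .map((key, value) -> new KeyValue<>(null, value.toUpperCase()));
    }

    // Pipes one value through the function and returns what reaches the output topic.
    static String runThroughTopology(String value) {
        Serde<Bytes> keySerde = Serdes.Bytes();
        Serde<String> valueSerde = Serdes.String();

        // Build the topology manually: source topic -> function -> sink topic.
        StreamsBuilder builder = new StreamsBuilder();
        KStream<Bytes, String> source =
                builder.stream(INPUT_TOPIC, Consumed.with(keySerde, valueSerde));
        uppercaseStream().apply(source)
                .to(OUTPUT_TOPIC, Produced.with(keySerde, valueSerde));

        // The driver never connects to a broker; these values are placeholders.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-stream-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<Bytes, String> in = driver.createInputTopic(
                    INPUT_TOPIC, keySerde.serializer(), valueSerde.serializer());
            TestOutputTopic<Bytes, String> out = driver.createOutputTopic(
                    OUTPUT_TOPIC, keySerde.deserializer(), valueSerde.deserializer());

            in.pipeInput(value);    // record with a null key
            return out.readValue(); // first record on the output topic
        }
    }

    public static void main(String[] args) {
        String result = runThroughTopology("hello");
        if (!"HELLO".equals(result)) {
            throw new AssertionError("unexpected output: " + result);
        }
        System.out.println(result);
    }
}
```

In a JUnit test, the body of main would become a @Test method asserting on the return value of runThroughTopology. An end-to-end test of the bindings themselves would need a broker, for example an embedded one, which this sketch does not cover.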

huangapple
  • Published on September 28, 2020, 14:12:28
  • When reposting, please keep this article's link: https://go.coder-hub.com/64096840.html