英文:
Error trying to send Kogito end & start messages using Smallrye Kafka in Quarkus
问题
以下是您提供的代码部分的翻译:
kafka.bootstrap.servers=0.0.0.0\:9092
mp.messaging.incoming.endSales.auto.offset.reset=earliest
mp.messaging.incoming.endSales.connector=smallrye-kafka
mp.messaging.incoming.endSales.topic=endSales
mp.messaging.incoming.endSales.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.outgoing.kogito-processinstances-events.connector=smallrye-kafka
mp.messaging.outgoing.kogito-processinstances-events.topic=kogito-processinstances-events
mp.messaging.outgoing.kogito-processinstances-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kogito-usertaskinstances-events.connector=smallrye-kafka
mp.messaging.outgoing.kogito-usertaskinstances-events.topic=kogito-usertaskinstances-events
mp.messaging.outgoing.kogito-usertaskinstances-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kogito-variables-events.connector=smallrye-kafka
mp.messaging.outgoing.kogito-variables-events.topic=kogito-variables-events
mp.messaging.outgoing.kogito-variables-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kogito_outgoing_stream.connector=smallrye-kafka
mp.messaging.outgoing.kogito_outgoing_stream.topic=endSales
mp.messaging.outgoing.kogito.outgoing_stream.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kogito_outgoing_newEnds.connector=smallrye-kafka
mp.messaging.outgoing.kogito_outgoing_newEnds.topic=newEnds
mp.messaging.outgoing.kogito_outgoing_newEnds.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kogito_outgoing_sudoku.connector=smallrye-kafka
mp.messaging.outgoing.kogito_outgoing_sudoku.topic=sudoku
mp.messaging.outgoing.kogito_outgoing_sudoku.value.serializer=org.apache.kafka.common.serialization.StringSerializer
请注意,以下是错误信息的翻译:
java.lang.IllegalArgumentException: SRMSG00071: 无效的通道配置 - 必须设置`connector`属性以用于通道`kogito`
at io.smallrye.reactive.messaging.providers.impl.ConnectorConfig.lambda$new$0(ConnectorConfig.java:50)
at java.base/java.util.Optional.orElseThrow(Optional.java:403)
at io.smallrye.reactive.messaging.providers.impl.ConnectorConfig.lambda$new$1(ConnectorConfig.java:50)
at java.base/java.util.Optional.orElseGet(Optional.java:364)
at io.smallrye.reactive.messaging.providers.impl.ConnectorConfig.<init>(ConnectorConfig.java:49)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.lambda$extractConfigurationFor$0(ConfiguredChannelFactory.java:85)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.extractConfigurationFor(ConfiguredChannelFactory.java:74)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:101)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_Subclass.initialize$$superforward1(Unknown Source)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_Subclass$$function$$4.apply(Unknown Source)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:54)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.proceed(InvocationInterceptor.java:62)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.monitor(InvocationInterceptor.java:49)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor_Bean.intercept(Unknown Source)
at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:41)
at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_Subclass.initialize(Unknown Source)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_ClientProxy.initialize(Unknown Source)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:192)
至于Kogito消息在Quarkus中的Kafka主题的通道命名约定,错误消息没有提到具体的命名约定。您可能需要查看Kogito和Quarkus的文档或配置文件以获取有关通道命名的更多信息。
英文:
I have created a quarkus app with multiple BPMN's having start messages and end messages.
The below is my application.properties for messaging
kafka.bootstrap.servers=0.0.0.0\:9092
mp.messaging.incoming.endSales.auto.offset.reset=earliest
mp.messaging.incoming.endSales.connector=smallrye-kafka
mp.messaging.incoming.endSales.topic=endSales
mp.messaging.incoming.endSales.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.outgoing.kogito-processinstances-events.connector=smallrye-kafka
mp.messaging.outgoing.kogito-processinstances-events.topic=kogito-processinstances-events
mp.messaging.outgoing.kogito-processinstances-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kogito-usertaskinstances-events.connector=smallrye-kafka
mp.messaging.outgoing.kogito-usertaskinstances-events.topic=kogito-usertaskinstances-events
mp.messaging.outgoing.kogito-usertaskinstances-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kogito-variables-events.connector=smallrye-kafka
mp.messaging.outgoing.kogito-variables-events.topic=kogito-variables-events
mp.messaging.outgoing.kogito-variables-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kogito_outgoing_stream.connector=smallrye-kafka
mp.messaging.outgoing.kogito_outgoing_stream.topic=endSales
mp.messaging.outgoing.kogito.outgoing_stream.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kogito_outgoing_newEnds.connector=smallrye-kafka
mp.messaging.outgoing.kogito_outgoing_newEnds.topic=newEnds
mp.messaging.outgoing.kogito_outgoing_newEnds.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kogito_outgoing_sudoku.connector=smallrye-kafka
mp.messaging.outgoing.kogito_outgoing_sudoku.topic=sudoku
mp.messaging.outgoing.kogito_outgoing_sudoku.value.serializer=org.apache.kafka.common.serialization.StringSerializer
This throws me the following error during build time
java.lang.IllegalArgumentException: SRMSG00071: Invalid channel configuration - the `connector` attribute must be set for channel `kogito`
at io.smallrye.reactive.messaging.providers.impl.ConnectorConfig.lambda$new$0(ConnectorConfig.java:50)
at java.base/java.util.Optional.orElseThrow(Optional.java:403)
at io.smallrye.reactive.messaging.providers.impl.ConnectorConfig.lambda$new$1(ConnectorConfig.java:50)
at java.base/java.util.Optional.orElseGet(Optional.java:364)
at io.smallrye.reactive.messaging.providers.impl.ConnectorConfig.<init>(ConnectorConfig.java:49)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.lambda$extractConfigurationFor$0(ConfiguredChannelFactory.java:85)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.extractConfigurationFor(ConfiguredChannelFactory.java:74)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:101)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_Subclass.initialize$$superforward1(Unknown Source)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_Subclass$$function$$4.apply(Unknown Source)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:54)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.proceed(InvocationInterceptor.java:62)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.monitor(InvocationInterceptor.java:49)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor_Bean.intercept(Unknown Source)
at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:41)
at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_Subclass.initialize(Unknown Source)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_ClientProxy.initialize(Unknown Source)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:192)
Also are there any channel naming conventions related to kafka topics for Kogito messages in Quarkus?
答案1
得分: 1
根据1中解释,重要的是你的bpmn中消息的名称要与application.properties中通道的名称匹配。
建议使用较短的通道名称。例如,可以使用newEnds而不是kogito_outgoing_newEnds(这样您就不需要指定主题,因为默认情况下主题与通道名称相同)。
如您在博客中看到的,kogito_outgoing_stream是默认的通道名称(也可以通过配置进行更改)。
英文:
As explained here, the important thing is that the name of your message name in your bpmn matches the name of the channel in your application.properties
As a recommendation use shorter channel names. For example, rather than kogito_outgoing_newEnds, just use newEnds (so you do not need to specify the topic, since the topic is by default equal to channel name)
As you will see in the blog, kogito_outgoing_stream is the default channel name (which can also be changed through configuration)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论