英文:
@KafkaListener __listener not available in BeanExpressionContext in Spring 3.0.1, works in v2.7.8
问题
以下是您要翻译的内容:
I'm creating a generic kafka listener which is spawned at startup and during execution. However, after upgrading to Spring boot 3.0.1 (from 2.7.8) it seems like SpEL isn't capable of accessing __listener
anymore, it seems like it isn't created...
In my Application I have a bean scoped to prototype, to create new instances of generic kafka listeners. The code is as follows:
XyzApplication.kt
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
fun createKafkaListener(
groupId: String,
topic: String,
integrations: List<Integration>,
avroSchema: String?
): GenericKafkaListener {
return GenericKafkaListener(
groupId = groupId, topic = topic, integrations = integrations, avroSchema = avroSchema)
}
GenericKafkaListener.kt
class GenericKafkaListener(
val id: String = UUID.randomUUID().toString(),
val groupId: String,
val topic: String,
val integrations: List<Integration>,
val avroSchema: String?
) {
private val schema: Schema? = avroSchema?.let { Schema.Parser().parse(it) }
@KafkaListener(id = "#{__listener.id}", topics = ["#{__listener.topic}"], groupId = "#{__listener.groupId}")
fun listen(message: GenericRecord) {
integrations.forEach { it.send(message.toString()) }
}
}
The createKafkaListener is added to the applicationContext when a request is processed and a new kafka listener should be instantiated. Which happens in
StaticAppContext.kt
import xyz.controller.model.Integration
import org.slf4j.LoggerFactory
import org.springframework.context.ApplicationContext
import org.springframework.context.ApplicationContextAware
import org.springframework.stereotype.Component
@Component
object StaticAppContext : ApplicationContextAware {
private val LOG = LoggerFactory.getLogger(this::class.java)
@Volatile @JvmStatic lateinit var context: ApplicationContext
fun addKafkaListener(
groupId: String,
topic: String,
integrations: List<Integration>,
avroSchema: String?
) {
val listener =
createKafkaListener(
groupId = groupId, topic = topic, integrations = integrations, avroSchema = avroSchema)
context.autowireCapableBeanFactory.autowireBean(listener)
context.autowireCapableBeanFactory.initializeBean(listener, listener.id)
LOG.info("KafkaListener(${listener.id}) added to group: $groupId")
}
fun addHttpEndpoint(path: String) {
val endpoint = createHttpEndpoint1(path = path)
context.autowireCapableBeanFactory.autowireBean(endpoint)
context.autowireCapableBeanFactory.initializeBean(endpoint, path)
LOG.info("HttpEndpoint added: $path")
}
override fun setApplicationContext(applicationContext: ApplicationContext) {
context = applicationContext
LOG.info("ApplicationContext injected")
}
}
With v3.0.1 I get the following exception stacktrace after failing on
context.autowireCapableBeanFactory.initializeBean(listener, listener.id)
org.springframework.beans.factory.BeanExpressionException: Expression parsing failed
...
build.gradle.kts - v3.0.1
...
build.gradle.kts - v2.7.8
...
I'm not that experienced with Spring Boot, but I've tried to read the changelog, however from my understanding it didn't indicate any obvious changes which could create this issue.. Then again, I'm not that experienced...
So what am I looking for here? Just some pointers in the right direction to how I can solve the initializing problem or avoid the problem all together - maybe I need some annotations, imports or structure the code in another way to achieve what I'm trying to do. Help is much appreciated!
Update - Solution
Adding the @KafkaListener annotation to the class level and added @KafkaHandler to the function fixed it...
Shown below
GenericKafkaListener.kt
import xyz.controller.model.Integration
import org.apache.avro.Schema
import org.springframework.kafka.annotation.KafkaHandler
import org.springframework.kafka.annotation.KafkaListener
import java.util.UUID
@KafkaListener(id = "#{__listener.id}", topics = ["#{__listener.topic}"], groupId = "#{__listener.groupId}", )
class GenericKafkaListener(
val id: String = UUID.randomUUID().toString(),
val groupId: String,
val topic: String,
val integrations: List<Integration>,
val avroSchema: String?
) {
private val schema: Schema? = avroSchema?.let { Schema.Parser().parse(it) }
@KafkaHandler
fun onMessage(message: String) {
integrations.forEach { it.send(message) }
}
}
希望这些翻译对您有所帮助。如果您需要进一步的帮助,请随时提问。
英文:
I'm creating a generic kafka listener which is spawned at startup and during execution. However, after upgrading to Spring boot 3.0.1(from 2.7.8) it seems like Spel isn't capable of accessing __listener
anymore, it seems like it isn't created..
In my Application I have a bean scoped to prototype, to create new instances of generic kafka listeners. The code is as follows:
XyzApplication.kt
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
fun createKafkaListener(
groupId: String,
topic: String,
integrations: List<Integration>,
avroSchema: String?
): GenericKafkaListener {
return GenericKafkaListener(
groupId = groupId, topic = topic, integrations = integrations, avroSchema = avroSchema)
}
GenericKafkaListener.kt
class GenericKafkaListener(
val id: String = UUID.randomUUID().toString(),
val groupId: String,
val topic: String,
val integrations: List<Integration>,
val avroSchema: String?
) {
private val schema: Schema? = avroSchema?.let { Schema.Parser().parse(it) }
@KafkaListener(id = "#{__listener.id}", topics = ["#{__listener.topic}"], groupId = "#{__listener.groupId}")
fun listen(message: GenericRecord) {
integrations.forEach { it.send(message.toString()) }
}
}
The createKafkaListener is added to the applicationContext when a request is processed and a new kafka listener should be instantiated. Which happens in
StaticAppContext.kt
import xyz.controller.model.Integration
import org.slf4j.LoggerFactory
import org.springframework.context.ApplicationContext
import org.springframework.context.ApplicationContextAware
import org.springframework.stereotype.Component
@Component
object StaticAppContext : ApplicationContextAware {
private val LOG = LoggerFactory.getLogger(this::class.java)
@Volatile @JvmStatic lateinit var context: ApplicationContext
fun addKafkaListener(
groupId: String,
topic: String,
integrations: List<Integration>,
avroSchema: String?
) {
val listener =
createKafkaListener(
groupId = groupId, topic = topic, integrations = integrations, avroSchema = avroSchema)
context.autowireCapableBeanFactory.autowireBean(listener)
context.autowireCapableBeanFactory.initializeBean(listener, listener.id)
LOG.info("KafkaListener(${listener.id}) added to group: $groupId")
}
fun addHttpEndpoint(path: String) {
val endpoint = createHttpEndpoint1(path = path)
context.autowireCapableBeanFactory.autowireBean(endpoint)
context.autowireCapableBeanFactory.initializeBean(endpoint, path)
LOG.info("HttpEndpoint added: $path")
}
override fun setApplicationContext(applicationContext: ApplicationContext) {
context = applicationContext
LOG.info("ApplicationContext injected")
}
}
With v3.0.1 I get the following exception stacktrace after failing on
context.autowireCapableBeanFactory.initializeBean(listener, listener.id)
org.springframework.beans.factory.BeanExpressionException: Expression parsing failed
at org.springframework.context.expression.StandardBeanExpressionResolver.evaluate(StandardBeanExpressionResolver.java:170)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.resolveExpression(KafkaListenerAnnotationBeanPostProcessor.java:1051)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.resolveExpressionAsString(KafkaListenerAnnotationBeanPostProcessor.java:987)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.getEndpointId(KafkaListenerAnnotationBeanPostProcessor.java:795)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processKafkaListener(KafkaListenerAnnotationBeanPostProcessor.java:484)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(KafkaListenerAnnotationBeanPostProcessor.java:391)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:435)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1754)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:411)
at xyz.StaticAppContext.addKafkaListener(StaticAppContext.kt:25)
at xyz.service.TriggerService.spawnKafkaTrigger(TriggerService.kt:42)
at xyz.service.TriggerService.spawnTrigger(TriggerService.kt:36)
at xyz.TriggerInitializer.onApplicationEvent(TriggerInitializer.kt:14)
at xyz.TriggerInitializer.onApplicationEvent(TriggerInitializer.kt:9)
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:176)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:169)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:413)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:370)
at org.springframework.boot.context.event.EventPublishingRunListener.ready(EventPublishingRunListener.java:109)
at org.springframework.boot.SpringApplicationRunListeners.lambda$ready$6(SpringApplicationRunListeners.java:80)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at org.springframework.boot.SpringApplicationRunListeners.doWithListeners(SpringApplicationRunListeners.java:118)
at org.springframework.boot.SpringApplicationRunListeners.doWithListeners(SpringApplicationRunListeners.java:112)
at org.springframework.boot.SpringApplicationRunListeners.ready(SpringApplicationRunListeners.java:80)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:327)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1302)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1291)
at xyz..XyzApplicationKt.main(XyzApplication.kt:51)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field '__listener' cannot be found on object of type 'org.springframework.beans.factory.config.BeanExpressionContext' - maybe not public or not valid?
at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:217)
at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:104)
at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:92)
at org.springframework.expression.spel.ast.CompoundExpression.getValueRef(CompoundExpression.java:55)
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:91)
at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:112)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:273)
at org.springframework.context.expression.StandardBeanExpressionResolver.evaluate(StandardBeanExpressionResolver.java:167)
build.gradle.kts - v3.0.1
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
import java.io.ByteArrayOutputStream
import java.net.URI
plugins {
id("org.springframework.boot") version "3.0.1"
id("io.spring.dependency-management") version "1.1.0"
kotlin("jvm") version "1.8.0"
kotlin("plugin.spring") version "1.8.0"
kotlin("plugin.jpa") version "1.8.0"
kotlin("plugin.allopen") version "1.8.0"
id("com.github.davidmc24.gradle.plugin.avro") version "1.2.0"
id("org.flywaydb.flyway") version "9.14.1"
}
allOpen {
annotation("jakarta.persistence.Entity")
annotation("jakarta.persistence.Embeddable")
annotation("jakarta.persistence.MappedSuperclass")
}
tasks.getByName<Jar>("jar") {
enabled = false
}
group = "no.xyz"
java.sourceCompatibility = JavaVersion.VERSION_17
repositories {
maven { url = uri("https://packages.confluent.io/maven/") }
mavenCentral()
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-jdbc")
implementation("org.springframework.boot:spring-boot-starter-actuator")
runtimeOnly("org.springframework.boot:spring-boot-devtools")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.kafka:spring-kafka")
implementation("io.confluent:kafka-avro-serializer:7.3.1") {
// kafka-avro-serializer 7.3.1 includes swagger core version < 2.2 which causes issues with readonly schema definition
// springdoc-openapi-starter-webmvc-ui includes an applicable version
exclude("io.swagger.core.v3")
}
implementation("org.apache.avro:avro:1.11.1")
implementation("org.springdoc:springdoc-openapi-starter-webmvc-ui:2.0.2")
implementation("org.flywaydb:flyway-core")
runtimeOnly("org.postgresql:postgresql")
implementation("org.ktorm:ktorm-core:3.5.0")
implementation("org.ktorm:ktorm-support-postgresql:3.5.0")
implementation("org.ktorm:ktorm-jackson:3.5.0")
// ---------- Slack ---------- \\
val slackApiVersion = "1.27.3"
implementation("com.slack.api:slack-api-client:$slackApiVersion")
implementation("com.slack.api:slack-api-model-kotlin-extension:$slackApiVersion")
implementation("com.slack.api:slack-api-client-kotlin-extension:$slackApiVersion")
// ---------- Spring Cloud ---------- \\
val springCloudVersion = "4.0.1"
implementation("org.springframework.cloud:spring-cloud-starter-openfeign:$springCloudVersion")
implementation("com.jayway.jsonpath:json-path:2.7.0")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.9.8")
implementation("com.google.cloud:spring-cloud-gcp-starter-logging:4.1.0")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.testcontainers:postgresql:1.17.6")
testImplementation("org.springframework.kafka:spring-kafka-test")
testImplementation("io.mockk:mockk:1.13.3")
testImplementation("com.ninja-squad", "springmockk", "4.0.0")
}
build.gradle.kts - v2.7.8
import org.gradle.api.tasks.testing.logging.TestExceptionFormat
import org.gradle.api.tasks.testing.logging.TestLogEvent
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
import java.net.URI
plugins {
idea
jacoco
id("org.springframework.boot") version "2.7.8"
val kotlinVersion = "1.7.22"
kotlin("jvm") version kotlinVersion
kotlin("plugin.spring") version kotlinVersion
kotlin("plugin.jpa") version kotlinVersion
kotlin("plugin.allopen") version kotlinVersion
id("org.owasp.dependencycheck") version "8.0.2"
id("io.spring.dependency-management") version "1.1.0"
id("com.github.ben-manes.versions") version "0.43.0"
id("org.sonarqube") version "3.4.0.2513"
id("com.diffplug.spotless") version "6.14.1"
id("com.avast.gradle.docker-compose") version "0.16.11"
id("com.github.davidmc24.gradle.plugin.avro") version "1.6.0"
id("org.openapi.generator") version "5.4.0"
id("org.flywaydb.flyway") version "9.14.1"
}
apply(from = "gradle/oidc-auth.gradle.kts")
allOpen {
annotation("javax.persistence.Entity")
annotation("javax.persistence.MappedSuperclass")
}
java.sourceCompatibility = JavaVersion.VERSION_11
repositories {
maven { url = URI("https://packages.confluent.io/maven") }
mavenCentral()
}
dependencies {
// To avoid security vulnerability CVE-2022-38752, caused by spring-boot-starter-actuator and swagger-core
implementation("org.yaml:snakeyaml:1.33")
// ---------- Spring Boot ---------- \\
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.kafka:spring-kafka")
implementation("org.springframework.boot:spring-boot-starter-data-jpa")
implementation("org.springframework.boot:spring-boot-starter-jdbc")
val springSecurityVersion = "5.7.6"
implementation("org.springframework.security:spring-security-web:$springSecurityVersion")
implementation("org.springframework.security:spring-security-core:$springSecurityVersion")
implementation("org.springframework.security:spring-security-config:$springSecurityVersion")
implementation("org.springframework.security:spring-security-crypto:$springSecurityVersion")
testImplementation("org.springframework.boot:spring-boot-starter-webflux")
// ---------- Spring Cloud ---------- \\
val springCloudVersion = "3.1.5"
implementation("org.springframework.cloud:spring-cloud-starter-openfeign:$springCloudVersion")
// ---------- Kotlin ---------- \\
implementation(platform("org.jetbrains.kotlin:kotlin-bom"))
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
// ---------- Database ---------- \\
val flywayVersion = "9.14.1"
implementation("org.flywaydb:flyway-core:$flywayVersion")
val postgresVersion = "42.5.2"
implementation("org.postgresql:postgresql:$postgresVersion")
val hibernateTypesVersion = "2.21.1"
implementation("com.vladmihalcea:hibernate-types-55:$hibernateTypesVersion")
val hibernateValidationVersion = "8.0.0.Final"
implementation("org.hibernate.validator:hibernate-validator:$hibernateValidationVersion")
// ---------- Slack ---------- \\
val slackApiVersion = "1.27.3"
implementation("com.slack.api:slack-api-client:$slackApiVersion")
implementation("com.slack.api:slack-api-model-kotlin-extension:$slackApiVersion")
implementation("com.slack.api:slack-api-client-kotlin-extension:$slackApiVersion")
// ----------- Kafka ------------ \\
val kafkaVersion = "3.4.0"
implementation("org.apache.kafka:kafka-clients:${kafkaVersion}")
val avroVersion = "1.11.1"
implementation("org.apache.avro:avro:${avroVersion}")
val confluentVersion = "7.3.1"
implementation("io.confluent:kafka-avro-serializer:${confluentVersion}")
// ---------- Jackson ---------- \\
val jacksonVersion = "2.14.2"
implementation("com.fasterxml.jackson.core:jackson-core:$jacksonVersion")
implementation("com.fasterxml.jackson.core:jackson-databind:$jacksonVersion")
implementation("com.fasterxml.jackson.core:jackson-annotations:$jacksonVersion")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:$jacksonVersion")
// ---------- Json Path ---------- \\
implementation("com.jayway.jsonpath:json-path:2.7.0")
implementation("tech.allegro.schema.json2avro:converter:0.2.15")
// ---------- Micrometer ---------- \\
val micrometerPrometheusRegistry = "1.10.3"
runtimeOnly("io.micrometer:micrometer-registry-prometheus:$micrometerPrometheusRegistry")
// ---------- Swagger ---------- \\
val springdocVersion = "1.6.14"
implementation("org.springdoc:springdoc-openapi-ui:$springdocVersion")
implementation("org.springdoc:springdoc-openapi-kotlin:$springdocVersion")
// ---------- Testing ---------- \\
val junitJupiterVersion = "5.9.2"
testImplementation("org.junit.jupiter:junit-jupiter:$junitJupiterVersion")
val mockKVersion = "1.13.4"
testImplementation("io.mockk:mockk:$mockKVersion")
val wiremockVersion = "2.27.2"
testImplementation("com.github.tomakehurst:wiremock:$wiremockVersion")
// Forcing upgrade of handlebars due to owasp vulnerability, can probably be removed when oidc-auth-spring-boot-starter-test is updated above 1.2.4
testImplementation("com.github.jknack:handlebars:4.3.1")
}
I'm not that experienced with Spring Boot, but I've tried to read the changelog, however from my understanding it didn't indicate any obvious changes which could create this issue.. Then again, I'm not that experienced...
So what am I looking for here? Just some pointers in the right direction to how I can solve the initializing problem or avoid the problem all together - maybe I need some annotations, imports or structure the code in another way to achieve what I'm trying to do. Help is much appreciated!
Update - Solution
Adding the @KafkaListener annotation to the class level and added @KafkaHandler to the function fixed it...
Shown below
GenericKafkaListener.kt
import xyz.controller.model.Integration
import org.apache.avro.Schema
import org.springframework.kafka.annotation.KafkaHandler
import org.springframework.kafka.annotation.KafkaListener
import java.util.UUID
@KafkaListener(id = "#{__listener.id}", topics = ["#{__listener.topic}"], groupId = "#{__listener.groupId}", )
class GenericKafkaListener(
val id: String = UUID.randomUUID().toString(),
val groupId: String,
val topic: String,
val integrations: List<Integration>,
val avroSchema: String?
) {
private val schema: Schema? = avroSchema?.let { Schema.Parser().parse(it) }
@KafkaHandler
fun onMessage(message: String) {
integrations.forEach { it.send(message) }
}
}
答案1
得分: 1
在3.0.2版本中已修复 https://github.com/spring-projects/spring-kafka/issues/2521
> KafkaListener 回归 - 伪 Bean __listener 在 Id 属性中不可用
英文:
Fixed in 3.0.2 https://github.com/spring-projects/spring-kafka/issues/2521
>KafkaListener Regression - Pseudo Bean __listener Not Available in the Id Attribute
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论