@KafkaListener __listener not available in BeanExpressionContext in Spring 3.0.1, works in v2.7.8

Question

I'm creating a generic Kafka listener that is spawned both at startup and at runtime. However, after upgrading from Spring Boot 2.7.8 to 3.0.1, SpEL no longer seems able to access __listener; it looks as if it is never created.

In my application I have a prototype-scoped bean that creates new instances of the generic Kafka listener. The code is as follows:

XyzApplication.kt

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
fun createKafkaListener(
    groupId: String,
    topic: String,
    integrations: List<Integration>,
    avroSchema: String?
): GenericKafkaListener {
    return GenericKafkaListener(
        groupId = groupId, topic = topic, integrations = integrations, avroSchema = avroSchema)
}

GenericKafkaListener.kt

class GenericKafkaListener(
  val id: String = UUID.randomUUID().toString(),
  val groupId: String,
  val topic: String,
  val integrations: List<Integration>,
  val avroSchema: String?
) {

  private val schema: Schema? = avroSchema?.let { Schema.Parser().parse(it) }

  @KafkaListener(id = "#{__listener.id}", topics = ["#{__listener.topic}"], groupId = "#{__listener.groupId}")
  fun listen(message: GenericRecord) {
    integrations.forEach { it.send(message.toString()) }
  }
}

The listener created by createKafkaListener is added to the application context whenever a request is processed that requires a new Kafka listener. This happens in:

StaticAppContext.kt

import xyz.controller.model.Integration
import org.slf4j.LoggerFactory
import org.springframework.context.ApplicationContext
import org.springframework.context.ApplicationContextAware
import org.springframework.stereotype.Component

@Component
object StaticAppContext : ApplicationContextAware {

  private val LOG = LoggerFactory.getLogger(this::class.java)
  @Volatile @JvmStatic lateinit var context: ApplicationContext

  fun addKafkaListener(
      groupId: String,
      topic: String,
      integrations: List<Integration>,
      avroSchema: String?
  ) {
    val listener =
        createKafkaListener(
            groupId = groupId, topic = topic, integrations = integrations, avroSchema = avroSchema)
    context.autowireCapableBeanFactory.autowireBean(listener)
    context.autowireCapableBeanFactory.initializeBean(listener, listener.id)
    LOG.info("KafkaListener(${listener.id}) added to group: $groupId")
  }

  fun addHttpEndpoint(path: String) {
    val endpoint = createHttpEndpoint1(path = path)
    context.autowireCapableBeanFactory.autowireBean(endpoint)
    context.autowireCapableBeanFactory.initializeBean(endpoint, path)

    LOG.info("HttpEndpoint added: $path")
  }

  override fun setApplicationContext(applicationContext: ApplicationContext) {
    context = applicationContext
    LOG.info("ApplicationContext injected")
  }
}

With v3.0.1, the following call fails with the exception stack trace shown after it:

context.autowireCapableBeanFactory.initializeBean(listener, listener.id)
org.springframework.beans.factory.BeanExpressionException: Expression parsing failed
	at org.springframework.context.expression.StandardBeanExpressionResolver.evaluate(StandardBeanExpressionResolver.java:170)
	at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.resolveExpression(KafkaListenerAnnotationBeanPostProcessor.java:1051)
	at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.resolveExpressionAsString(KafkaListenerAnnotationBeanPostProcessor.java:987)
	at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.getEndpointId(KafkaListenerAnnotationBeanPostProcessor.java:795)
	at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processKafkaListener(KafkaListenerAnnotationBeanPostProcessor.java:484)
	at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(KafkaListenerAnnotationBeanPostProcessor.java:391)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:435)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1754)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:411)
	at xyz.StaticAppContext.addKafkaListener(StaticAppContext.kt:25)
	at xyz.service.TriggerService.spawnKafkaTrigger(TriggerService.kt:42)
	at xyz.service.TriggerService.spawnTrigger(TriggerService.kt:36)
	at xyz.TriggerInitializer.onApplicationEvent(TriggerInitializer.kt:14)
	at xyz.TriggerInitializer.onApplicationEvent(TriggerInitializer.kt:9)
	at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:176)
	at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:169)
	at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143)
	at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:413)
	at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:370)
	at org.springframework.boot.context.event.EventPublishingRunListener.ready(EventPublishingRunListener.java:109)
	at org.springframework.boot.SpringApplicationRunListeners.lambda$ready$6(SpringApplicationRunListeners.java:80)
	at java.base/java.lang.Iterable.forEach(Iterable.java:75)
	at org.springframework.boot.SpringApplicationRunListeners.doWithListeners(SpringApplicationRunListeners.java:118)
	at org.springframework.boot.SpringApplicationRunListeners.doWithListeners(SpringApplicationRunListeners.java:112)
	at org.springframework.boot.SpringApplicationRunListeners.ready(SpringApplicationRunListeners.java:80)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:327)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1302)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1291)
	at xyz..XyzApplicationKt.main(XyzApplication.kt:51)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field '__listener' cannot be found on object of type 'org.springframework.beans.factory.config.BeanExpressionContext' - maybe not public or not valid?
	at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:217)
	at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:104)
	at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:92)
	at org.springframework.expression.spel.ast.CompoundExpression.getValueRef(CompoundExpression.java:55)
	at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:91)
	at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:112)
	at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:273)
	at org.springframework.context.expression.StandardBeanExpressionResolver.evaluate(StandardBeanExpressionResolver.java:167)
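
For context, here is a minimal sketch of how a pseudo bean such as __listener can become visible to SpEL. It assumes that names missing from the bean factory are looked up through the Scope attached to the BeanExpressionContext. DemoListener, PseudoBeanScope and evaluate are hypothetical names used only for illustration; this is not Spring Kafka's actual implementation, only a model of the lookup that the EL1008E message refers to.

```kotlin
import org.springframework.beans.factory.ObjectFactory
import org.springframework.beans.factory.config.BeanExpressionContext
import org.springframework.beans.factory.config.Scope
import org.springframework.beans.factory.support.DefaultListableBeanFactory
import org.springframework.context.expression.StandardBeanExpressionResolver

// Hypothetical stand-in for GenericKafkaListener; only `id` matters here.
class DemoListener(val id: String)

// Hypothetical scope that resolves pseudo beans by name from a plain map.
class PseudoBeanScope(private val pseudoBeans: Map<String, Any>) : Scope {
    override fun get(name: String, objectFactory: ObjectFactory<*>): Any =
        throw UnsupportedOperationException("not used in this sketch")
    override fun remove(name: String): Any? = null
    override fun registerDestructionCallback(name: String, callback: Runnable) {}
    // This is the lookup that makes "__listener" resolvable inside #{...}.
    override fun resolveContextualObject(key: String): Any? = pseudoBeans[key]
    override fun getConversationId(): String? = null
}

// Evaluates a #{...} expression against an empty bean factory plus the given pseudo beans.
fun evaluate(expression: String, pseudoBeans: Map<String, Any>): Any? {
    val context = BeanExpressionContext(DefaultListableBeanFactory(), PseudoBeanScope(pseudoBeans))
    return StandardBeanExpressionResolver().evaluate(expression, context)
}

fun main() {
    // Listener registered under "__listener": the expression resolves to its id.
    println(evaluate("#{__listener.id}", mapOf("__listener" to DemoListener("demo-id"))))

    // Nothing registered: evaluation fails, as in the stack trace above.
    runCatching { evaluate("#{__listener.id}", emptyMap()) }
        .onFailure { println("${it::class.simpleName}: ${it.cause?.message}") }
}
```

If this model holds, the exception above would be consistent with the id expression being evaluated before the listener instance is registered under the name __listener, but that reading is an inference from the trace, not something the trace proves.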

build.gradle.kts - v3.0.1

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
import java.io.ByteArrayOutputStream
import java.net.URI

plugins {
	id("org.springframework.boot") version "3.0.1"
	id("io.spring.dependency-management") version "1.1.0"
	kotlin("jvm") version "1.8.0"
	kotlin("plugin.spring") version "1.8.0"
	kotlin("plugin.jpa") version "1.8.0"
	kotlin("plugin.allopen") version "1.8.0"
	id("com.github.davidmc24.gradle.plugin.avro") version "1.2.0"
	id("org.flywaydb.flyway") version "9.14.1"
}

allOpen {
	annotation("jakarta.persistence.Entity")
	annotation("jakarta.persistence.Embeddable")
	annotation("jakarta.persistence.MappedSuperclass")
}

tasks.getByName<Jar>("jar") {
	enabled = false
}

group = "no.xyz"
java.sourceCompatibility = JavaVersion.VERSION_17


repositories {
	maven { url = uri("https://packages.confluent.io/maven/") }
	mavenCentral()
}

dependencies {
	implementation("org.springframework.boot:spring-boot-starter-web")
	implementation("org.springframework.boot:spring-boot-starter-jdbc")
	implementation("org.springframework.boot:spring-boot-starter-actuator")
	runtimeOnly("org.springframework.boot:spring-boot-devtools")
	implementation("org.jetbrains.kotlin:kotlin-reflect")
	implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")

	implementation("org.springframework.kafka:spring-kafka")
	implementation("io.confluent:kafka-avro-serializer:7.3.1") {
		// kafka-avro-serializer 7.3.1 includes swagger core version < 2.2 which causes issues with readonly schema definition
		// springdoc-openapi-starter-webmvc-ui includes an applicable version
		exclude("io.swagger.core.v3")
	}
	implementation("org.apache.avro:avro:1.11.1")

	implementation("org.springdoc:springdoc-openapi-starter-webmvc-ui:2.0.2")

	implementation("org.flywaydb:flyway-core")
	runtimeOnly("org.postgresql:postgresql")

	implementation("org.ktorm:ktorm-core:3.5.0")
	implementation("org.ktorm:ktorm-support-postgresql:3.5.0")
	implementation("org.ktorm:ktorm-jackson:3.5.0")


	// ---------- Slack ---------- \\
	val slackApiVersion = "1.27.3"
	implementation("com.slack.api:slack-api-client:$slackApiVersion")
	implementation("com.slack.api:slack-api-model-kotlin-extension:$slackApiVersion")
	implementation("com.slack.api:slack-api-client-kotlin-extension:$slackApiVersion")

	// ---------- Spring Cloud ---------- \\
	val springCloudVersion = "4.0.1"
	implementation("org.springframework.cloud:spring-cloud-starter-openfeign:$springCloudVersion")

	implementation("com.jayway.jsonpath:json-path:2.7.0")
	implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
	implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.9.8")

	implementation("com.google.cloud:spring-cloud-gcp-starter-logging:4.1.0")

	
	testImplementation("org.springframework.boot:spring-boot-starter-test")
	testImplementation("org.testcontainers:postgresql:1.17.6")
	testImplementation("org.springframework.kafka:spring-kafka-test")
	testImplementation("io.mockk:mockk:1.13.3")
	testImplementation("com.ninja-squad", "springmockk", "4.0.0")
}

build.gradle.kts - v2.7.8

import org.gradle.api.tasks.testing.logging.TestExceptionFormat
import org.gradle.api.tasks.testing.logging.TestLogEvent
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
import java.net.URI

plugins {
	idea
	jacoco
	id("org.springframework.boot") version "2.7.8"

	val kotlinVersion = "1.7.22"
	kotlin("jvm") version kotlinVersion
	kotlin("plugin.spring") version kotlinVersion
	kotlin("plugin.jpa") version kotlinVersion
	kotlin("plugin.allopen") version kotlinVersion

	id("org.owasp.dependencycheck") version "8.0.2"
	id("io.spring.dependency-management") version "1.1.0"
	id("com.github.ben-manes.versions") version "0.43.0"
	id("org.sonarqube") version "3.4.0.2513"
	id("com.diffplug.spotless") version "6.14.1"
	id("com.avast.gradle.docker-compose") version "0.16.11"
	id("com.github.davidmc24.gradle.plugin.avro") version "1.6.0"
	id("org.openapi.generator") version "5.4.0"
	id("org.flywaydb.flyway") version "9.14.1"
}

apply(from = "gradle/oidc-auth.gradle.kts")

allOpen {
	annotation("javax.persistence.Entity")
	annotation("javax.persistence.MappedSuperclass")
}

java.sourceCompatibility = JavaVersion.VERSION_11

repositories {
	maven { url = URI("https://packages.confluent.io/maven") }
	mavenCentral()
}

dependencies {
	// To avoid security vulnerability CVE-2022-38752, caused by spring-boot-starter-actuator and swagger-core
	implementation("org.yaml:snakeyaml:1.33")

	// ---------- Spring Boot ---------- \\
	implementation("org.springframework.boot:spring-boot-starter")
	implementation("org.springframework.kafka:spring-kafka")
	implementation("org.springframework.boot:spring-boot-starter-data-jpa")
	implementation("org.springframework.boot:spring-boot-starter-jdbc")
	val springSecurityVersion = "5.7.6"
	implementation("org.springframework.security:spring-security-web:$springSecurityVersion")
	implementation("org.springframework.security:spring-security-core:$springSecurityVersion")
	implementation("org.springframework.security:spring-security-config:$springSecurityVersion")
	implementation("org.springframework.security:spring-security-crypto:$springSecurityVersion")

	testImplementation("org.springframework.boot:spring-boot-starter-webflux")

	// ---------- Spring Cloud ---------- \\
	val springCloudVersion = "3.1.5"
	implementation("org.springframework.cloud:spring-cloud-starter-openfeign:$springCloudVersion")

	// ---------- Kotlin ---------- \\
	implementation(platform("org.jetbrains.kotlin:kotlin-bom"))
	implementation("org.jetbrains.kotlin:kotlin-reflect")
	implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")

	// ---------- Database ---------- \\
	val flywayVersion = "9.14.1"
	implementation("org.flywaydb:flyway-core:$flywayVersion")
	val postgresVersion = "42.5.2"
	implementation("org.postgresql:postgresql:$postgresVersion")
	val hibernateTypesVersion = "2.21.1"
	implementation("com.vladmihalcea:hibernate-types-55:$hibernateTypesVersion")
	val hibernateValidationVersion = "8.0.0.Final"
	implementation("org.hibernate.validator:hibernate-validator:$hibernateValidationVersion")

	// ---------- Slack ---------- \\
	val slackApiVersion = "1.27.3"
	implementation("com.slack.api:slack-api-client:$slackApiVersion")
	implementation("com.slack.api:slack-api-model-kotlin-extension:$slackApiVersion")
	implementation("com.slack.api:slack-api-client-kotlin-extension:$slackApiVersion")

	// ----------- Kafka ------------ \\
	val kafkaVersion = "3.4.0"
	implementation("org.apache.kafka:kafka-clients:${kafkaVersion}")
	val avroVersion = "1.11.1"
	implementation("org.apache.avro:avro:${avroVersion}")
	val confluentVersion = "7.3.1"
	implementation("io.confluent:kafka-avro-serializer:${confluentVersion}")
	
	// ---------- Jackson ---------- \\
	val jacksonVersion = "2.14.2"
	implementation("com.fasterxml.jackson.core:jackson-core:$jacksonVersion")
	implementation("com.fasterxml.jackson.core:jackson-databind:$jacksonVersion")
	implementation("com.fasterxml.jackson.core:jackson-annotations:$jacksonVersion")
	implementation("com.fasterxml.jackson.module:jackson-module-kotlin:$jacksonVersion")

	// ---------- Json Path ---------- \\
	implementation("com.jayway.jsonpath:json-path:2.7.0")

	implementation("tech.allegro.schema.json2avro:converter:0.2.15")

	// ---------- Micrometer ---------- \\
	val micrometerPrometheusRegistry = "1.10.3"
	runtimeOnly("io.micrometer:micrometer-registry-prometheus:$micrometerPrometheusRegistry")

	// ---------- Swagger ---------- \\
	val springdocVersion = "1.6.14"
	implementation("org.springdoc:springdoc-openapi-ui:$springdocVersion")
	implementation("org.springdoc:springdoc-openapi-kotlin:$springdocVersion")


	// ---------- Testing ---------- \\
	val junitJupiterVersion = "5.9.2"
	testImplementation("org.junit.jupiter:junit-jupiter:$junitJupiterVersion")

	val mockKVersion = "1.13.4"
	testImplementation("io.mockk:mockk:$mockKVersion")

	val wiremockVersion = "2.27.2"
	testImplementation("com.github.tomakehurst:wiremock:$wiremockVersion")

	// Forcing upgrade of handlebars due to owasp vulnerability, can probably be removed when oidc-auth-spring-boot-starter-test is updated above 1.2.4
	testImplementation("com.github.jknack:handlebars:4.3.1")
}

I'm not that experienced with Spring Boot. I've read the changelog, but as far as I can tell it doesn't mention any change that would obviously cause this issue.

So what am I looking for? Pointers on how to solve the initialization problem, or how to avoid it altogether. Maybe I need additional annotations or imports, or need to structure the code differently to achieve what I'm trying to do. Any help is much appreciated!

Update - Solution
Moving the @KafkaListener annotation to the class level and adding @KafkaHandler to the handler function fixed it, as shown below:

GenericKafkaListener.kt

import xyz.controller.model.Integration
import org.apache.avro.Schema
import org.springframework.kafka.annotation.KafkaHandler
import org.springframework.kafka.annotation.KafkaListener
import java.util.UUID

@KafkaListener(id = "#{__listener.id}", topics = ["#{__listener.topic}"], groupId = "#{__listener.groupId}")
class GenericKafkaListener(
  val id: String = UUID.randomUUID().toString(),
  val groupId: String,
  val topic: String,
  val integrations: List<Integration>,
  val avroSchema: String?
) {

  private val schema: Schema? = avroSchema?.let { Schema.Parser().parse(it) }

  @KafkaHandler
  fun onMessage(message: String) {
    integrations.forEach { it.send(message) }
  }
}

Answer 1

Score: 1

Fixed in 3.0.2: https://github.com/spring-projects/spring-kafka/issues/2521

> KafkaListener Regression - Pseudo Bean __listener Not Available in the Id Attribute
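
A hedged sketch of how the fixed version might be picked up in build.gradle.kts. It assumes the 3.0.2 mentioned above is the spring-kafka version, and that Spring Boot 3.0.2 manages that spring-kafka release (please verify against the Boot release notes). The two options are alternatives, not meant to be combined.

```kotlin
// build.gradle.kts (fragment)

plugins {
    // Option 1 (assumption: Boot 3.0.2 manages spring-kafka 3.0.2):
    // upgrade Spring Boot itself and leave spring-kafka unversioned.
    id("org.springframework.boot") version "3.0.2"
    id("io.spring.dependency-management") version "1.1.0"
}

// Option 2: stay on Boot 3.0.1 and override only the managed spring-kafka version.
// The dependency-management plugin reads this property when resolving versions.
// extra["spring-kafka.version"] = "3.0.2"

dependencies {
    // No explicit version here, so the managed (or overridden) one is used.
    implementation("org.springframework.kafka:spring-kafka")
}
```

With either option the method-level @KafkaListener from the question should no longer need the class-level workaround, provided the fix is indeed in the resolved spring-kafka version.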

huangapple
  • Posted on February 27, 2023 at 17:23:13
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75578659.html