英文:
Scala: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field Child1.myfun of type scala.Function1
问题
我想在父类中将函数名作为参数,以便子类可以设置它。这个变量将在父类的一个方法中使用。
abstract class Parent[T: TypeInformation] {
val myfun: T => Unit
// 另一个方法使用 myfun
}
class Child1 extends Parent[User] {
val service = new Service()
val myfun: User => Unit = service.callme
}
class Service {
def callme(user: User): Unit = {
println("我们在处理用户")
}
}
我对Scala还不太熟悉,但这看起来还可以。尽管编译器没有抱怨,但我在运行时遇到了异常,作业无法启动:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: 无法实例化用户函数。
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:250)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:427)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: 无法将 java.lang.invoke.SerializedLambda 的实例分配到类型为 scala.Function1 的 Child1.myfun 字段中
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2372)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2054)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1635)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:483)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:441)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:235)
... 11 more
有没有什么办法可以在子类中设置函数名作为变量值,以便父类可以使用它?
英文:
I want to take function name as a parameter in parent class so child class can set it. This variable will be used in one of the parent class' method.
abstract class Parent[T: TypeInformation] {
val myfun: T => Unit
// A different method uses myfun
}
class Child1 extends Parent[User] {
val service = new Service()
val myfun: User => Unit = service.callme
}
class Service {
def callme(user: User) => Unit = {
println("We are here for user")
}
}
I'm new to scala but this looked okay. While compiler doesn't complain, I am getting run time exception and the job won't start:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:250)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:427)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field Child1.myfun of type scala.Function1 in instance of Child1
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2372)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2054)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1635)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:483)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:441)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:235)
... 11 more
Any idea how can I set the function name as variable value in child so parent can use it?
答案1
得分: 5
你在谈论传递函数的 名称,但实际上你传递的是函数本身。看起来这在通过 flink 进行序列化时无法保留。
另一种编码可能是根本不使用函数值,而是使用普通的方法:
abstract class Parent[T: TypeInformation] {
def myfun(t: T): Unit
// 另一个方法使用 myfun
}
class Child1 extends Parent[User] {
val service = new Service()
def myfun(t: User): Unit = service.callme(t)
}
class Service {
def callme(user: User): Unit = {
println("我们在为用户提供服务")
}
}
如果这不能直接奏效,至少它应该能给你一个更清晰的错误信息(可能涉及到类路径上是否存在 Service
)。
修复这个问题可能会让你回到之前的编码方式。
英文:
You are talking about passing function names, but what you're passing is the actual function itself. It looks like that doesn't survive serialization through flink.
An alternative encoding might be not using a function value at all, but a plain old method:
abstract class Parent[T: TypeInformation] {
def myfun(t: T): Unit
// A different method uses myfun
}
class Child1 extends Parent[User] {
val service = new Service()
def myfun(t: User): Unit = service.callme(t)
}
class Service {
def callme(user: User) => Unit = {
println("We are here for user")
}
}
If this doesn't work out of the box, it should at least give you a clearer error message (possibly about the availability of Service
on the class path.
Fixing that issue might then allow you to go back to your previous encoding
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论