英文:
Apache flink join: ".apply()" does not exist? (scala)
问题
不是 Flink 的问题,而是缺少一个闭合括号!
我试图按照1.16(当前版本,2023年3月7日发布)中的示例操作。
示例代码如下:
stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>);
我的代码如下:
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
private val result = streamA.join(streamB)
.where(new MySelector().getKey)
.equalTo(new MySelector().getKey)
.window(SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(60)))
.apply() // <- 函数不存在!
IntelliJ 给出了一些选项,但没有 apply
(请参见截图)。有什么想法吗?
build.sbt
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.12.12"
lazy val root = (project in file("."))
.settings(
name := "apache-flink-example",
libraryDependencies += "org.apache.flink" % "flink-streaming-scala_2.12" % "1.16.1",
libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "1.16.1",
libraryDependencies += "org.apache.flink" % "flink-connectors" % "1.16.1",
libraryDependencies += "org.apache.flink" % "flink-core" % "1.16.1",
libraryDependencies += "org.apache.flink" % "flink-java8" % "0.10.1",
libraryDependencies += "org.apache.flink" % "flink-quickstart" % "1.16.1",
libraryDependencies += "org.apache.flink" % "flink-libraries" % "1.16.1",
libraryDependencies += "org.apache.flink" % "flink-clients" % "1.16.1",
libraryDependencies += "org.json4s" %% "json4s-native" % "4.0.6",
)
英文:
Not an issue with flink, but missing a closing bracket!
I am trying to follow the examples from the 1.16 (current, 2023-03-07) release.
The example is this:
stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>);
My code looks like this:
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
private val result = streamA.join(streamB)
.where(new MySelector().getKey)
.equalTo(new MySelector().getKey)
.window(SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(60))
.apply() // <- function does not exist!
Intellij gives a couple of options there but no apply
(see screenshot). Any ideas?
build.sbt
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.12.12"
lazy val root = (project in file("."))
.settings(
name := "apache-flink-example",
libraryDependencies += "org.apache.flink" % "flink-streaming-scala_2.12" % "1.16.1",
libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "1.16.1",
libraryDependencies += "org.apache.flink" % "flink-connectors" % "1.16.1",
libraryDependencies += "org.apache.flink" % "flink-core" % "1.16.1",
libraryDependencies += "org.apache.flink" % "flink-java8" % "0.10.1",
libraryDependencies += "org.apache.flink" % "flink-quickstart" % "1.16.1",
libraryDependencies += "org.apache.flink" % "flink-libraries" % "1.16.1",
libraryDependencies += "org.apache.flink" % "flink-clients" % "1.16.1",
libraryDependencies += "org.json4s" %% "json4s-native" % "4.0.6",
)
答案1
得分: 1
你看到的方法是SlidingEventTimeWindows
类可用的方法,因为函数没有关闭。
窗口函数中缺少一个括号。请尝试这样:
private val result = streamA.join(streamB)
.where(new MySelector().getKey)
.equalTo(new MySelector().getKey)
.window(SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(60))) //检查括号
.apply()
英文:
The methods you see are the methods available for the SlidingEventTimeWindows
class, because the function is not closed.
There is a parenthesis missing in the window function. Try this:
private val result = streamA.join(streamB)
.where(new MySelector().getKey)
.equalTo(new MySelector().getKey)
.window(SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(60))) //check the parenthesis
.apply()
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论