Reading Messages from a Kafka Topic and Dumping Them into HDFS
Question
I am trying to consume data from a Kafka topic, load it into a Dataset, and then apply a filter before writing it to HDFS.
I am able to consume from the Kafka topic, load the data into a Dataset, and save it as a Parquet file in HDFS, but I am not able to apply a filter condition first. Can you share how to apply a filter before saving to HDFS?
I am using Java with Spark to consume from the Kafka topic.
Part of my code looks like this:
DataframeDeserializer dataframe = new DataframeDeserializer(dataset);

// Deserialize the Confluent Avro payload carried in the Kafka "value" column
ds = dataframe.fromConfluentAvro("value", <your schema path>, <yourmap>, RETAIN_SELECTED_COLUMN_ONLY$.MODULE$);

// Write the stream to HDFS as Parquet; Trigger.Once() processes the
// available data once and then stops
StreamingQuery query = ds.coalesce(10)
        .writeStream()
        .format("parquet")
        .option("path", path.toString())
        .option("checkpointLocation", "<your path>")
        .trigger(Trigger.Once())
        .start();
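For reference, here is a minimal sketch of the Kafka read that would produce such a streaming Dataset. The broker list and topic name are placeholders, spark is assumed to be an existing SparkSession, and the spark-sql-kafka-0-10 package is assumed to be on the classpath:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Subscribe to the topic; each record arrives with binary "key" and "value" columns
Dataset<Row> dataset = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker1:9092") // placeholder broker list
        .option("subscribe", "my-topic")                   // placeholder topic name
        .load();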
Answer 1
Score: 1
Write the filter logic before coalesce, i.e. ds.filter().coalesce():
DataframeDeserializer dataframe = new DataframeDeserializer(dataset);

ds = dataframe.fromConfluentAvro("value", <your schema path>, <yourmap>, RETAIN_SELECTED_COLUMN_ONLY$.MODULE$);

StreamingQuery query = ds
        .filter(...) // write your filter condition here
        .coalesce(10)
        .writeStream()
        .format("parquet")
        .option("path", path.toString())
        .option("checkpointLocation", "<your path>")
        .trigger(Trigger.Once())
        .start();
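For illustration, Dataset.filter accepts either a Column expression or a SQL expression string; the column name eventType below is hypothetical:

import static org.apache.spark.sql.functions.col;

// Keep only the rows you want to land in HDFS (hypothetical column name)
Dataset<Row> filtered = ds.filter(col("eventType").equalTo("purchase"));
// Equivalent SQL-string form:
// Dataset<Row> filtered = ds.filter("eventType = 'purchase'");

The filtered Dataset is then passed to coalesce/writeStream exactly as above.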
Answer 2
Score: 1
Instead of re-inventing the wheel, I would strongly recommend Kafka Connect.
All you need is the HDFS Sink Connector, which replicates data from a Kafka topic to HDFS; a minimal configuration is sketched below.
- For HDFS 2.x, you can use the HDFS 2 Sink Connector.
- For HDFS 3.x, use the HDFS 3 Sink Connector.
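To illustrate, a minimal standalone connector configuration might look like the following, loosely based on Confluent's HDFS 2 Sink quickstart. The connector class, topic, and HDFS URL are assumptions to verify against the connector version you install:

name=hdfs-sink
# HDFS 2 Sink connector class (check against your installed version)
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
# Placeholder topic name
topics=my-topic
# Placeholder HDFS namenode URL
hdfs.url=hdfs://namenode:8020
# Number of records to accumulate before committing a file to HDFS
flush.size=1000

A standalone Connect worker can then be started with something like connect-standalone worker.properties hdfs-sink.properties. Note that a plain sink connector copies records as-is; filtering would need a Connect transform (SMT) or an upstream step.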