从Kafka主题读取消息并将其转储到HDFS。

huangapple go评论104阅读模式
英文:

Reading Message from Kafka Topic and Dump it into HDFS

问题

我试图从Kafka主题消费数据,将其加载到数据集中,然后在加载到HDFS之前进行筛选。

我已经能够从Kafka主题消费数据,将其加载到数据集中,并将其保存为Parquet文件在HDFS中,但在保存到HDFS之前无法执行筛选条件。你可以分享一下在保存到HDFS之前执行筛选的方法吗?
我在使用Java和Spark从Kafka主题中消费数据。
我的代码的一部分如下:

DataframeDeserializer dataframe = new DataframeDeserializer(dataset);

ds = dataframe.fromConfluentAvro("value", <your schema path>, <yourmap>, RETAIN_SELECTED_COLUMN_ONLY$.MODULE$);

StreamingQuery query = ds.coalesce(10)
                    .writeStream()
                    .format("parquet")
                    .option("path", path.toString())
                    .option("checkpointLocation", "<your path>")
                    .trigger(Trigger.Once())
                    .start();
英文:

I am trying to consume data from Kafka Topic, load it into Dataset and then perform filter before load into into Hdfs.

I am able to consume from kafka topic, load it into dataset and save as a parquet file in HDFS But not able to perform filter condition. can you please share the way to perform filter before save to hdfs?
i am using Java with Spark to consume from kafka topic.
some part of my code is like this:

DataframeDeserializer dataframe = new DataframeDeserializer(dataset);

 ds = dataframe.fromConfluentAvro(&quot;value&quot;, &lt;your schema path&gt;, &lt;yourmap&gt;, RETAIN_SELECTED_COLUMN_ONLY$.MODULE$);

StreamingQuery query = ds.coalesce(10)
                .writeStream()
                .format(&quot;parquet&quot;)
                .option(&quot;path&quot;, path.toString())
                .option(&quot;checkpointLocation&quot;, &quot;&lt;your path&gt;&quot;)
                .trigger(Trigger.Once())
                .start();

答案1

得分: 1

coalesce之前编写过滤逻辑,即ds.filter().coalesce()

DataframeDeserializer dataframe = new DataframeDeserializer(dataset);

 ds = dataframe.fromConfluentAvro("value", <your schema path>, <yourmap>, RETAIN_SELECTED_COLUMN_ONLY$.MODULE$);

StreamingQuery query = 
                ds
                .filter(...) // 在这里编写您的过滤条件
                .coalesce(10)
                .writeStream()
                .format("parquet")
                .option("path", path.toString())
                .option("checkpointLocation", "<your path>")
                .trigger(Trigger.Once())
                .start();
英文:

Write filter logic before coalesce i.e ds.filter().coalesce()


DataframeDeserializer dataframe = new DataframeDeserializer(dataset);

 ds = dataframe.fromConfluentAvro(&quot;value&quot;, &lt;your schema path&gt;, &lt;yourmap&gt;, RETAIN_SELECTED_COLUMN_ONLY$.MODULE$);

StreamingQuery query = 
                ds
                .filter(...) // Write your filter condition here
                .coalesce(10)
                .writeStream()
                .format(&quot;parquet&quot;)
                .option(&quot;path&quot;, path.toString())
                .option(&quot;checkpointLocation&quot;, &quot;&lt;your path&gt;&quot;)
                .trigger(Trigger.Once())
                .start();


答案2

得分: 1

不需要重新发明轮子,我强烈推荐使用Kafka Connect
你只需要使用HDFS Sink Connector,它可以将数据从Kafka主题复制到HDFS。

英文:

Instead of re-inventing the wheel, I would strongly recommend Kafka Connect.
All you need is the HDFS Sink Connector, that replicates the data from a Kafka topic to HDFS.

huangapple
  • 本文由 发表于 2020年5月5日 08:08:14
  • 转载请务必保留本文链接:https://go.coder-hub.com/61603687.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定