Reading Message from Kafka Topic and Dump it into HDFS

Question


I am trying to consume data from a Kafka topic, load it into a Dataset, and then apply a filter before loading it into HDFS.

I am able to consume from the Kafka topic, load the data into a Dataset, and save it as a Parquet file in HDFS, but I am not able to apply a filter condition. Can you please share how to apply a filter before saving to HDFS?
I am using Java with Spark to consume from the Kafka topic.
Part of my code looks like this:

    DataframeDeserializer dataframe = new DataframeDeserializer(dataset);
    ds = dataframe.fromConfluentAvro("value", <your schema path>, <yourmap>, RETAIN_SELECTED_COLUMN_ONLY$.MODULE$);
    StreamingQuery query = ds.coalesce(10)
        .writeStream()
        .format("parquet")
        .option("path", path.toString())
        .option("checkpointLocation", "<your path>")
        .trigger(Trigger.Once())
        .start();

Answer 1

Score: 1


Write the filter logic before coalesce, i.e. ds.filter().coalesce():

    DataframeDeserializer dataframe = new DataframeDeserializer(dataset);
    ds = dataframe.fromConfluentAvro("value", <your schema path>, <yourmap>, RETAIN_SELECTED_COLUMN_ONLY$.MODULE$);
    StreamingQuery query = ds
        .filter(...) // Write your filter condition here
        .coalesce(10)
        .writeStream()
        .format("parquet")
        .option("path", path.toString())
        .option("checkpointLocation", "<your path>")
        .trigger(Trigger.Once())
        .start();
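The filter(...) placeholder accepts either a Column expression or a SQL expression string. A minimal sketch of both forms (the "status" column and "ACTIVE" value are hypothetical, purely for illustration):

    import static org.apache.spark.sql.functions.col;

    // Keep only rows whose (hypothetical) "status" column equals "ACTIVE":
    ds.filter(col("status").equalTo("ACTIVE"))
    // ...or, equivalently, pass a SQL expression string:
    ds.filter("status = 'ACTIVE'")

Both overloads exist on Dataset, and the rest of the chain (.coalesce(10).writeStream()...) stays unchanged.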

Answer 2

Score: 1


Instead of re-inventing the wheel, I would strongly recommend Kafka Connect.
All you need is the HDFS Sink Connector, which replicates data from a Kafka topic to HDFS.
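For reference, a Kafka Connect HDFS sink is driven by a small properties file. A minimal sketch, assuming Confluent's kafka-connect-hdfs plugin is installed (the topic name, HDFS URL, and flush size are placeholders to adapt):

    name=hdfs-sink
    connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
    tasks.max=1
    topics=<your topic>
    hdfs.url=hdfs://<your namenode>:8020
    flush.size=1000
    # Write Parquet files, matching the Spark approach above:
    format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

Note that Kafka Connect on its own supports only limited record filtering (via single message transforms), so for the row-level filter asked about in the question, a stream processor may still be needed in front of the sink.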

huangapple
  • Posted on 2020-05-05 08:08:14
  • Please retain this link when reposting: https://go.coder-hub.com/61603687.html