Reading from Azure Event hub with Kafka driver doesn't seem to get any data

Question

I'm running the following code in an Azure Databricks Python notebook:

    TOPIC = "myeventhub"
    BOOTSTRAP_SERVERS = "myeventhubns.servicebus.windows.net:9093"
    EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://myeventhubns.servicebus.windows.net/;SharedAccessKeyName=MyKeyName;SharedAccessKey=myaccesskey;\";"
    df = spark.readStream \
        .format("kafka") \
        .option("subscribe", TOPIC) \
        .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
        .option("kafka.sasl.mechanism", "PLAIN") \
        .option("kafka.security.protocol", "SASL_SSL") \
        .option("kafka.sasl.jaas.config", EH_SASL) \
        .option("kafka.request.timeout.ms", "60000") \
        .option("kafka.session.timeout.ms", "60000") \
        .option("failOnDataLoss", "false") \
        .option("startingOffsets", "earliest") \
        .load()
    df_write = df.writeStream \
        .outputMode("append") \
        .format("console") \
        .start() \
        .awaitTermination()

This shows no output in the notebook. How could I debug what the problem is?
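One way to start debugging, sketched here under the assumption that the query handle returned by .start() is kept in a variable rather than chained into .awaitTermination() (which blocks the cell): inspect the handle's status and progress. The helper name describe_query is hypothetical; isActive, status, lastProgress and exception() are members of PySpark's StreamingQuery.

```python
def describe_query(query):
    """Collect basic health information from a StreamingQuery-like object."""
    progress = query.lastProgress  # None until the first micro-batch has finished
    return {
        "active": query.isActive,
        "status": query.status,
        # numInputRows is the number of rows read in the most recent micro-batch
        "rows_in_last_batch": None if progress is None else progress["numInputRows"],
        # exception() returns None unless the query has failed
        "error": query.exception(),
    }

# Hypothetical usage in the notebook (not run here):
# query = df.writeStream.format("console").outputMode("append").start()
# print(describe_query(query))
```

If rows_in_last_batch stays at 0 while events are being sent, the connection settings (topic name, namespace, connection string) are a reasonable next place to look.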

Answer 1

Score: 1

If you use .format("console"), the output won't appear in the notebook; it goes to the driver and executor logs. This is a difference between Spark and Databricks.

If you want to see the data, just use the display function:

    display(df)
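The Kafka source delivers key and value as binary columns, so the displayed rows may be hard to read as they are. A minimal sketch, assuming the events are UTF-8 text such as JSON (the helper name decode_kafka_columns is hypothetical), casts them to strings first:

```python
def decode_kafka_columns(df):
    """Return a view of a Kafka-source DataFrame with key/value cast to strings."""
    return df.selectExpr(
        "CAST(key AS STRING) AS key",
        "CAST(value AS STRING) AS value",
        "topic",
        "partition",
        "offset",
        "timestamp",
    )

# Hypothetical usage in a Databricks notebook (display is Databricks-specific):
# display(decode_kafka_columns(df))
```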

Answer 2

Score: 1

This code now writes data with quite low latency: the newest data point is around 10 seconds old when I run a SELECT in a SQL warehouse. The remaining problem is that foreachBatch is not run, but otherwise it works.

    TOPIC = "myeventhub"
    BOOTSTRAP_SERVERS = "myeventhub.servicebus.windows.net:9093"
    EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=mykeyname;SharedAccessKey=mykey;EntityPath=myentitypath;\";"
    df = spark.readStream \
        .format("kafka") \
        .option("subscribe", TOPIC) \
        .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
        .option("kafka.sasl.mechanism", "PLAIN") \
        .option("kafka.security.protocol", "SASL_SSL") \
        .option("kafka.sasl.jaas.config", EH_SASL) \
        .option("kafka.request.timeout.ms", "60000") \
        .option("kafka.session.timeout.ms", "60000") \
        .option("failOnDataLoss", "false") \
        .option("startingOffsets", "earliest") \
        .load()
    n = 100
    count = 0
    def run_command(batchDF, epoch_id):
        global count
        count += 1
        if count % n == 0:
            spark.sql("OPTIMIZE firstcatalog.bronze.factorydatas3 ZORDER BY (readtimestamp)")
    # ...Omitted code where I transform the data in the value column to strongly typed data...
    myTypedDF.writeStream \
        .foreachBatch(run_command) \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
        .partitionBy("somecolumn") \
        .toTable("myunitycatalog.bronze.mytable")
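One possible explanation for foreachBatch not running, offered as an assumption rather than a confirmed diagnosis: foreachBatch is itself a sink, so chaining .format("delta") and .toTable(...) after it may replace it, leaving the function uncalled. A minimal sketch of the alternative is to do the Delta write inside the batch function and start the query with .start(). The names make_batch_writer, optimize_sql and optimize_every are hypothetical, and epoch_id stands in for the global counter.

```python
def make_batch_writer(spark, table_name, optimize_sql, optimize_every=100):
    """Build a foreachBatch function that appends each micro-batch to a Delta
    table and runs a maintenance statement every `optimize_every` batches."""
    def write_batch(batch_df, epoch_id):
        # Append this micro-batch with the ordinary batch writer.
        batch_df.write.format("delta").mode("append").saveAsTable(table_name)
        # epoch_id starts at 0, so add 1 before checking the interval.
        if (epoch_id + 1) % optimize_every == 0:
            spark.sql(optimize_sql)
    return write_batch

# Hypothetical usage in the notebook (not run here):
# writer = make_batch_writer(
#     spark,
#     "myunitycatalog.bronze.mytable",
#     "OPTIMIZE firstcatalog.bronze.factorydatas3 ZORDER BY (readtimestamp)",
# )
# query = (myTypedDF.writeStream
#          .foreachBatch(writer)
#          .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
#          .start())
```

The sketch leaves out .partitionBy("somecolumn"); if the target table does not exist yet, partitioning would have to be set on the batch writer instead.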

huangapple
  • Posted on February 16, 2023, 15:41:51
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75469152.html
  • apache-kafka
  • apache-spark
  • azure-databricks
  • azure-eventhub
  • spark-structured-streaming