Reading from Azure Event Hub with Kafka driver doesn't seem to get any data
Question
I'm running the following code in an Azure Databricks Python notebook:
TOPIC = "myeventhub"
BOOTSTRAP_SERVERS = "myeventhubns.servicebus.windows.net:9093"
EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://myeventhubns.servicebus.windows.net/;SharedAccessKeyName=MyKeyName;SharedAccessKey=myaccesskey;\";"
df = spark.readStream \
    .format("kafka") \
    .option("subscribe", TOPIC) \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()

df_write = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start() \
    .awaitTermination()
This shows no output in the notebook. How can I debug the problem?
Answer 1
Score: 1
If you use .format("console"), the output won't appear in the notebook; it goes to the driver and executor logs. This is a difference between plain Spark and Databricks.
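If you do want to debug from the notebook, one option is to keep a handle on the query instead of blocking on awaitTermination(), then poll it; a minimal sketch using the standard StreamingQuery API:

# Start the same console stream, but keep the query object.
query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Poll the stream's health from the notebook.
print(query.status)        # whether a trigger is active or it is waiting for data
print(query.lastProgress)  # per-batch metrics: offsets, input rows, durations
print(query.exception())   # None unless the stream has failed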
If you want to see the data, just use the display function:
display(df)
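Note that the Kafka source exposes key and value as binary columns, so display(df) will show raw bytes. A small sketch, assuming the Event Hub messages are UTF-8 text such as JSON, to make the output readable:

from pyspark.sql.functions import col

# Cast the binary Kafka key/value columns to strings before displaying.
readable = df.select(
    col("key").cast("string"),
    col("value").cast("string"),
    "topic", "partition", "offset", "timestamp",
)
display(readable)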
Answer 2
Score: 1
This code now writes data with quite low latency; the newest datapoint is around 10 seconds old when I run a SELECT in a SQL warehouse. The remaining problem is that foreachBatch is not run, but otherwise it works.
TOPIC = "myeventhub"
BOOTSTRAP_SERVERS = "myeventhub.servicebus.windows.net:9093"
EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=mykeyname;SharedAccessKey=mykey;EntityPath=myentitypath;\";"
df = spark.readStream \
    .format("kafka") \
    .option("subscribe", TOPIC) \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()

n = 100
count = 0

def run_command(batchDF, epoch_id):
    global count
    count += 1
    if count % n == 0:
        spark.sql("OPTIMIZE firstcatalog.bronze.factorydatas3 ZORDER BY (readtimestamp)")

# ...omitted code where I transform the data in the value column to strongly typed data...

myTypedDF.writeStream \
    .foreachBatch(run_command) \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
    .partitionBy("somecolumn") \
    .toTable("myunitycatalog.bronze.mytable")