从Azure事件中心使用Kafka驱动程序读取数据似乎没有收到任何数据。

huangapple go评论59阅读模式
英文:

Reading from Azure Event hub with Kafka driver doesn't seem to get any data

问题

我在Azure Databricks的Python笔记本中运行以下代码:

TOPIC = "myeventhub"
BOOTSTRAP_SERVERS = "myeventhubns.servicebus.windows.net:9093"
EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://myeventhubns.servicebus.windows.net/;SharedAccessKeyName=MyKeyName;SharedAccessKey=myaccesskey;\";"

df = spark.readStream \
    .format("kafka") \
    .option("subscribe", TOPIC) \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()

df_write = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start() \
    .awaitTermination()

笔记本中没有显示任何输出。我应该如何调试问题?

(请注意,我只提供代码的翻译部分,不包括问题或其他内容。)

英文:

I'm running the following code in an Azure Databricks python notebook:

TOPIC = "myeventhub"
BOOTSTRAP_SERVERS = "myeventhubns.servicebus.windows.net:9093"
EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://myeventhubns.servicebus.windows.net/;SharedAccessKeyName=MyKeyName;SharedAccessKey=myaccesskey;\";"

df = spark.readStream \
    .format("kafka") \
    .option("subscribe", TOPIC) \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()

df_write = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start() \
    .awaitTermination()

This shows no output in the notebook. How could I debug what the problem is?

答案1

得分: 1

如果您使用.format("console"),则输出不会出现在笔记本中,而会出现在驱动程序和执行程序的日志中 - 这是Spark和Databricks之间的区别。

如果您想查看数据,只需使用display函数:

display(df)
英文:

If you use .format("console") then output won't be in the notebook, it will be in the driver & executor logs - it's a difference between Spark and Databricks.

If you want to see the data, just use the display function:

display(df)

答案2

得分: 1

这段代码现在以非常低的延迟写入数据。当我在SQL数据仓库中进行选择时,最新的数据点大约是10秒前的。问题仍然是foreachBatch没有运行,但除此之外,它是可以工作的。

TOPIC = "myeventhub"
BOOTSTRAP_SERVERS = "myeventhub.servicebus.windows.net:9093"
EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=mykeyname;SharedAccessKey=mykey;EntityPath=myentitypath;\";"

df = spark.readStream \
    .format("kafka") \
    .option("subscribe", TOPIC) \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()

n = 100
count = 0

def run_command(batchDF, epoch_id):
    global count
    count += 1
    if count % n == 0:
        spark.sql("OPTIMIZE firstcatalog.bronze.factorydatas3 ZORDER BY (readtimestamp)")

...省略了将数据在value列中转换为强类型数据的代码...

myTypedDF.writeStream \
    .foreachBatch(run_command) \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
    .partitionBy("somecolumn") \
    .toTable("myunitycatalog.bronze.mytable")
英文:

This code is now writing data with quite low latency. Newest datapoint is around 10 seconds old when I do a select in a sql warehouse. The problem still is that foreachBatch is not run, but otherwise it's working.

TOPIC = "myeventhub"
BOOTSTRAP_SERVERS = "myeventhub.servicebus.windows.net:9093"
EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=mykeyname;SharedAccessKey=mykey;EntityPath=myentitypath;\";"

df = spark.readStream \
    .format("kafka") \
    .option("subscribe", TOPIC) \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()

n = 100
count = 0

def run_command(batchDF, epoch_id):
    global count
    count += 1
    if count % n == 0:
        spark.sql("OPTIMIZE firstcatalog.bronze.factorydatas3 ZORDER BY (readtimestamp)")

...Omitted code where I transform the data in the value column to strongly typed data...

myTypedDF.writeStream \
    .foreachBatch(run_command) \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
    .partitionBy("somecolumn") \
    .toTable("myunitycatalog.bronze.mytable")

huangapple
  • 本文由 发表于 2023年2月16日 15:41:51
  • 转载请务必保留本文链接:https://go.coder-hub.com/75469152.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定