Pyspark - Kafka integration works for batches but not for readStream
Question
Hope this finds you well.
I've been scratching my head for the last couple of days trying to make sense of this...
Basically, I am trying to read a stream from a Kafka producer in PySpark (nothing new here). However, there is something funky going on: the `readStream` method simply doesn't want to cooperate, but I can `read` the topic just fine (which means there is no connectivity or missing-dependency issue).
Here is my code for the streaming setup:
from pyspark.sql import SparkSession
import logging
logging.basicConfig(level=logging.INFO)
spark = SparkSession \
.builder \
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
.appName("KafkaStreams") \
.getOrCreate()
spark.sparkContext.setLogLevel("INFO")
# Read stream
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9094") \
.option("subscribe", "TEST_TOPIC") \
.option("startingOffsets", "earliest") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.request.timeout.ms", "60000") \
.option("kafka.session.timeout.ms", "60000") \
.load()
ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
ds.writeStream \
.trigger(processingTime='5 seconds') \
.outputMode("update") \
.format("console") \
.option("truncate", "false") \
.start()
And here is the setup for the batch read:
(note: I am reusing the same spark initialization)
from pyspark.sql.functions import to_timestamp  # needed by the filter below
df = spark.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9094") \
.option("subscribe", "TEST_TOPIC") \
.option("startingOffsets", "earliest") \
.load()
df.select('timestamp').filter(to_timestamp(df.timestamp, 'HH:mm:ss') > '09:30:00').show()
As I said, using `read` I am able to get the expected output, but when I launch the `readStream`/`writeStream` pipeline, the script simply exits with code 0.
Here are the logs I get:
23/06/18 15:04:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/18 15:04:26 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
23/06/18 15:04:26 INFO SharedState: Warehouse path is 'file:/home/ettore/Documents/Portfolio/Spark%20Streams/spark-warehouse'.
23/06/18 15:04:27 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
23/06/18 15:04:27 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-f93a4d9e-203e-4612-8bac-2a7d839623fe. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/06/18 15:04:27 INFO ResolveWriteToStream: Checkpoint root /tmp/temporary-f93a4d9e-203e-4612-8bac-2a7d839623fe resolved to file:/tmp/temporary-f93a4d9e-203e-4612-8bac-2a7d839623fe.
23/06/18 15:04:27 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/06/18 15:04:27 INFO CheckpointFileManager: Writing atomically to file:/tmp/temporary-f93a4d9e-203e-4612-8bac-2a7d839623fe/metadata using temp file file:/tmp/temporary-f93a4d9e-203e-4612-8bac-2a7d839623fe/.metadata.16db89ea-8365-4111-aa6d-43debb423f95.tmp
23/06/18 15:04:27 INFO CheckpointFileManager: Renamed temp file file:/tmp/temporary-f93a4d9e-203e-4612-8bac-2a7d839623fe/.metadata.16db89ea-8365-4111-aa6d-43debb423f95.tmp to file:/tmp/temporary-f93a4d9e-203e-4612-8bac-2a7d839623fe/metadata
23/06/18 15:04:28 INFO MicroBatchExecution: Starting [id = daa91f2b-8899-499d-ab1f-ff196563153b, runId = 999e4e36-7c95-4678-ac51-fe4b00a77e78]. Use file:/tmp/temporary-f93a4d9e-203e-4612-8bac-2a7d839623fe to store the query checkpoint.
23/06/18 15:04:28 INFO MicroBatchExecution: Reading table [org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@10ba27c7] from DataSourceV2 named 'kafka' [org.apache.spark.sql.kafka010.KafkaSourceProvider@7080ff7d]
Process finished with exit code 0
As you can see, I've tried to set up a delay to process the data, but unfortunately to no avail. Also, I don't think the warning `Unable to load native-hadoop library` is really what is causing this, given that it works in batch mode.
If you have any idea of what might be going on or how to troubleshoot it, it would be much appreciated.
Thank you in advance and have a nice day.
Answer 1
Score: 1
You need to wait for the stream to run: when you start it, it runs in the background, so the script reaches its end and exits. Usually you need to call the `.awaitTermination()` function on the result returned by `.start()`:
query = ds.writeStream \
.trigger(processingTime='5 seconds') \
.outputMode("update") \
.format("console") \
.option("truncate", "false") \
.start()
# block until the streaming query stops or fails
query.awaitTermination()
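A rough way to picture this, in plain Python rather than Spark: the sketch below starts a background worker and gets a handle back immediately, much as `.start()` does. The names `background_job` and `start_job` are made up for illustration, and the daemon thread is only an analogy for the streaming query, not a description of how PySpark is implemented. Without the `join()` call, a script like this could reach its end and exit before the worker produced anything; `join()` plays the role of `awaitTermination()`.

```python
import threading
import time


def background_job(results, ticks=3, interval_s=0.05):
    """Hypothetical worker: appends one item per tick, like one micro-batch each."""
    for i in range(ticks):
        time.sleep(interval_s)
        results.append(i)


def start_job(results):
    """Start the worker and return its handle right away (analogous to .start())."""
    worker = threading.Thread(target=background_job, args=(results,), daemon=True)
    worker.start()
    return worker


results = []
handle = start_job(results)

# The call above returns before the worker has finished. If the script ended
# here, the interpreter would exit without waiting for the daemon thread.
print("items right after start:", len(results))

# Blocking on the handle (analogous to query.awaitTermination()) lets the
# background work run to completion before the script ends.
handle.join()
print("items after waiting:", len(results))
```
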
The Spark Structured Streaming documentation calls `awaitTermination()` in its first example section.
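Because a streaming query normally keeps running until it is stopped, blocking forever is not always what you want while experimenting. Here is a minimal sketch, assuming you only want to watch the console output for a fixed time: `awaitTermination()` accepts an optional timeout in seconds and, when one is given, returns whether the query terminated within it. `run_bounded` and `demo` are hypothetical helper names, and `demo` swaps the Kafka source for Spark's built-in `rate` test source so that it can run without a broker.

```python
def run_bounded(query, timeout_s):
    """Wait up to timeout_s seconds for a streaming query, then stop it.

    `query` is expected to behave like a PySpark StreamingQuery, i.e. expose
    awaitTermination(timeout) and stop(). Returns True if the query had
    already terminated on its own within the timeout.
    """
    finished = query.awaitTermination(timeout_s)
    if not finished:
        # Still running after the timeout: shut it down explicitly.
        query.stop()
    return finished


def demo():
    """Illustrative only: needs a local PySpark installation to run."""
    # Imported lazily so run_bounded can be used or tested without Spark.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("AwaitTerminationDemo").getOrCreate()

    # Assumption: the built-in "rate" source stands in for the Kafka topic.
    df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

    query = (
        df.writeStream
        .trigger(processingTime="5 seconds")
        .outputMode("append")
        .format("console")
        .option("truncate", "false")
        .start()
    )

    # Let a few micro-batches print, then stop cleanly.
    run_bounded(query, 20)
    spark.stop()
```

Calling `demo()` should print a few micro-batches to the console and then exit. For a long-running job, the unbounded `query.awaitTermination()` shown above is the usual choice.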