PySpark: executors lose connectivity when performing a join
My Mesos-Spark cluster:
Executors crash every time I try to run a .count() after a join. The count without the join works perfectly. I'm not sure why, but in the failed queries I see:
And in the executor logs:
I don't see a specific OOM issue, so what's going on here? It seems to happen only when the join is performed.
Followed @busfighter's suggestions: set the dataframes to StorageLevel.MEMORY_ONLY before joining and reduced the number of partitions using coalesce().
Still the same error.
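A minimal sketch of that attempt, assuming two hypothetical dataframes df_left and df_right joined on a hypothetical key column id:

from pyspark import StorageLevel

# Persist both sides in memory before the join
df_left = df_left.persist(StorageLevel.MEMORY_ONLY)
df_right = df_right.persist(StorageLevel.MEMORY_ONLY)

# Reduce the partition count without a full shuffle
df_left = df_left.coalesce(12)
df_right = df_right.coalesce(12)

# The join itself is lazy; the crash happens on the subsequent action
joined = df_left.join(df_right, on="id", how="inner")
joined.count()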
Edit 1
Tried everything from the comments, with no luck (a sketch of the last two attempts follows this list):
- Persisted the data in memory
- Repartitioned to 12 partitions (was 200); I should add that after checking the Spark jobs web UI, the executors are never explicitly removed by Spark (Mesos) on my cluster
- Changed spark.sql.autoBroadcastJoinThreshold to 20, smaller than the default value
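A sketch of the last two attempts, assuming the SparkSession is named spark:

# Repartition from the previous 200 partitions down to 12
df = df.repartition(12)

# Lower the broadcast-join threshold (in bytes) far below the ~10 MB default
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "20")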
Edit 2
At no point are the executors removed when the task fails; they just time out on shuffle:
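For reference, the shuffle-fetch behaviour described here is governed by standard Spark timeout settings such as these (illustrative values only, not the cluster's actual configuration):

conf.set("spark.network.timeout", "300s")      # default is 120s
conf.set("spark.shuffle.io.maxRetries", "10")  # default is 3
conf.set("spark.shuffle.io.retryWait", "30s")  # default is 5s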
Edit 3
Note that the data size is really small when it crashes. I'm feeling lost and can't find the executor logs to see whether it was killed because of OOM:
Edit 4
Some important notes:
- The job works OK with only 1 slave (it takes more time), but it doesn't crash, so I don't think it's an OOM issue.
- Other parts of the code that don't involve joining data (merely reading and transforming) work OK.
Config used in PySpark:
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = (SparkConf()
        .setAppName('daily_etl')
        .setMaster(XXXXX)  # Mesos master URL redacted
        .set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.1")
        .set('spark.mesos.executor.home', '/opt/spark'))

spark = (SparkSession.builder
         .config(conf=conf)
         .getOrCreate())
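Since the config pulls in mongo-spark-connector, the source dataframes are presumably read along these lines (a sketch; the URI and collection are hypothetical, not the author's actual code):

df = (spark.read
      .format("mongo")  # short alias for com.mongodb.spark.sql.DefaultSource
      .option("uri", "mongodb://host:27017/db.collection")  # hypothetical URI
      .load())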
Edit 5
Screenshot of the error:
Edit 6
Screenshot of the Mesos UI:
Edit 7
Managed to narrow down the problem: for some reason the BlockManager is listening on localhost, hence the other executors cannot connect:
Not sure why, but I will create another topic for it.
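For anyone hitting the same symptom: the address the driver-side BlockManager binds to is controlled by standard Spark settings such as these (a sketch; the IP is a placeholder):

conf.set("spark.driver.bindAddress", "0.0.0.0")  # interface to bind on
conf.set("spark.driver.host", "10.0.0.5")        # placeholder: a routable IP the executors can reach
# Alternatively, export SPARK_LOCAL_IP=<routable-ip> in the driver's environment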
Answer 1
Score: 1
Please try this:
from pyspark import SparkConf

conf = (SparkConf()
        .setAppName('daily_etl')
        .setMaster(XXXXX)  # Mesos master URL redacted, as in the question
        .set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.1")
        .set("spark.mesos.executor.home", "/opt/spark")
        .set("spark.driver.memory", "16G")
        .set("spark.executor.memory", "8G")
        # -1 disables automatic broadcast joins entirely
        .set("spark.sql.autoBroadcastJoinThreshold", "-1"))
Maybe also do a repartition:
df = df.repartition(2000)
The value depends on your cluster.
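As a rough sanity check when tuning this (an aside, not part of the answer), the current partition count can be inspected with:

print(df.rdd.getNumPartitions())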