Pyspark, executors lost connectivity when performing a join

Question

My Mesos-Spark cluster:

[image not preserved]

Executors crash every time I run a .count() after a join. The same count without the join works perfectly. I'm not sure why, but in the failed queries I see:

[image not preserved]

And in the executor logs:

[image not preserved]

I don't see a specific OOM issue, so what is going on here? It seems to happen only when the join is performed.

Following @busfighter's suggestions, I set the dataframes to StorageLevel.MEMORY_ONLY before joining and reduced the number of partitions using coalesce().
Still the same error.

Edit 1

Tried everything suggested in the comments, with no success:

  1. Persisting the data in memory.
  2. Repartitioning to 12 partitions (previously 200). I should add that, after checking the Spark jobs web UI, the executors are never explicitly removed by Spark (Mesos) on my cluster.
  3. Changing spark.sql.autoBroadcastJoinThreshold to 20, which is smaller than the default value.
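
For context on item 3, here is a minimal sketch of how that threshold is interpreted, assuming Spark's documented behaviour: the value is a size in bytes, the default is 10 MB, and -1 disables automatic broadcast joins. The helper would_broadcast is hypothetical and only illustrates the rule; it is not part of PySpark.

```python
# Hypothetical helper (not a PySpark API): mirrors the documented meaning of
# spark.sql.autoBroadcastJoinThreshold, which is expressed in bytes.
DEFAULT_THRESHOLD_BYTES = 10 * 1024 * 1024  # documented default: 10 MB


def would_broadcast(table_size_bytes: int,
                    threshold_bytes: int = DEFAULT_THRESHOLD_BYTES) -> bool:
    """Return True if a table of this estimated size is eligible for broadcast."""
    if threshold_bytes < 0:
        # A negative threshold (conventionally -1) disables broadcast joins.
        return False
    return 0 <= table_size_bytes <= threshold_bytes


if __name__ == "__main__":
    one_mb = 1024 * 1024
    print(would_broadcast(one_mb))      # 1 MB table against the 10 MB default
    print(would_broadcast(one_mb, 20))  # a threshold of 20 means 20 bytes
    print(would_broadcast(one_mb, -1))  # broadcast joins disabled
```

Under this reading, a threshold of 20 leaves almost no table small enough to broadcast, so the join would most likely be planned as a shuffle-based join either way.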

Edit 2

At no point are the executors removed when the task fails; they just time out on shuffle:

[image not preserved]

Edit 3

Notice that the data size is really small when it crashes. I'm feeling lost, and I can't find the executor logs to check whether an executor was killed because of OOM:

[image not preserved]

Edit 4

Some important notes:

  • The job works fine with only 1 slave: it takes longer but doesn't crash, so I don't think it's an OOM issue.
  • Other parts of the code that don't involve joining data (merely reading and transforming) work fine.

Config used on PySpark:

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = (SparkConf()
        .setAppName('daily_etl')
        .setMaster(XXXXX)  # master URL placeholder, left as in the original post
        .set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.1")
        .set('spark.mesos.executor.home', '/opt/spark')
        )

spark = (SparkSession.builder
         .config(conf=conf)
         .getOrCreate())

Edit 5

Screenshot of the error:

[image not preserved]

Edit 6

Screenshot of the Mesos UI:

[image not preserved]

Edit 7

I managed to narrow down the problem: for some reason BlockManager is listening on localhost, so the other executors cannot connect to it:

[image not preserved]

I'm not sure why, but I will create another topic for that.
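
One possible cause of a service binding to localhost is the machine's own hostname resolving to a loopback address (for example through an /etc/hosts entry). That is an assumption, not something the post confirms. The sketch below uses only the Python standard library to check it on a given node; the function names are hypothetical.

```python
import ipaddress
import socket


def is_loopback_address(address: str) -> bool:
    """Return True if the IP address string is a loopback address (e.g. 127.0.0.1)."""
    return ipaddress.ip_address(address).is_loopback


def hostname_resolves_to_loopback(hostname: str) -> bool:
    """Resolve a hostname to IPv4 and report whether it maps to a loopback address."""
    return is_loopback_address(socket.gethostbyname(hostname))


if __name__ == "__main__":
    name = socket.gethostname()
    try:
        print(name, "resolves to loopback:", hostname_resolves_to_loopback(name))
    except socket.gaierror as exc:
        # The hostname could not be resolved at all on this machine.
        print("could not resolve", name, "->", exc)
```

If this reports a loopback address on an executor node, fixing the hostname resolution or setting the SPARK_LOCAL_IP environment variable on that node (described in the Spark configuration docs as the IP address to bind to) may be worth trying.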

Answer 1

Score: 1

Please try this:

from pyspark import SparkConf

conf = (SparkConf()
        .setAppName('daily_etl')
        .setMaster(XXXXX)
        .set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.1")
        .set("spark.mesos.executor.home", "/opt/spark")
        .set("spark.driver.memory", "16G")   # more memory for the driver
        .set("spark.executor.memory", "8G")  # more memory per executor
        .set("spark.sql.autoBroadcastJoinThreshold", "-1")  # disable automatic broadcast joins
        )

You may also need to repartition:

df = df.repartition(2000)

The right value depends on your cluster.
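
Since the answer leaves the partition count open, here is a rough sketch of one way to pick a starting value. It assumes a common rule of thumb that is not from the answer: aim for roughly 128 MB per partition and never use fewer partitions than available cores. The function suggest_partitions and its parameters are hypothetical.

```python
import math


def suggest_partitions(data_size_bytes: int,
                       total_cores: int,
                       target_partition_bytes: int = 128 * 1024 * 1024) -> int:
    """Rule-of-thumb partition count: keep partitions near the target size,
    but use at least one partition per available core."""
    if data_size_bytes < 0 or total_cores < 1 or target_partition_bytes < 1:
        raise ValueError("sizes must be non-negative and cores/target positive")
    by_size = math.ceil(data_size_bytes / target_partition_bytes)
    return max(by_size, total_cores)


if __name__ == "__main__":
    ten_gb = 10 * 1024 ** 3
    print(suggest_partitions(ten_gb, total_cores=12))
```

The result would then be passed to df.repartition(...) and tuned from there based on what the Spark UI shows.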

huangapple
  • Posted on October 8, 2020 at 22:44:33
  • When reposting, please keep the link to this article: https://go.coder-hub.com/64265027.html