英文:
Spark Barrier Executor Stage Not Retried on Task Failure
问题
使用障碍执行器运行阶段时,期望任务失败会导致阶段重试。然而,如果任务引发异常,则不会重试该阶段,作业将失败。
运行下面的pyspark代码将导致一个任务失败,从而导致阶段失败而不进行重试。
这个失败在本地通过pyspark shell和在K8s集群上都能看到。
本地执行的堆栈跟踪:
我怀疑文档不完善的配置可能是问题的原因,但是改变最大失败或重试配置没有带来任何好运。任何建议都将不胜感激,谢谢!
英文:
When running a stage using barrier executor, the expectation is that a failure in a task will result in the stage being retried. However, if an exception is thrown from a task, the stage is not retried and the job fails.
Running the pyspark code below will cause a single task to fail, failing the stage without retrying.
def test_func(index: int) -> list:
    if index == 0:
        raise RuntimeError("Thrown from test func")
    return []
start_rdd = sc.parallelize([i for i in range(10)], 10)
result = start_rdd.barrier().mapPartitionsWithIndex(lambda i, c: test_func(i))
result.collect()
This failure is seen running locally via the pyspark shell and on a K8s cluster.
Stack trace from local execution:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/homebrew/anaconda3/lib/python3.9/site-packages/pyspark/rdd.py", line 1197, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/opt/homebrew/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/opt/homebrew/anaconda3/lib/python3.9/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/opt/homebrew/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(0, 0) finished unsuccessfully.
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/homebrew/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/opt/homebrew/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
    out_iter = func(split_index, iterator)
  File "<stdin>", line 1, in <lambda>
  File "<stdin>", line 3, in test_func
RuntimeError: Thrown from test func
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:2111)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2857)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
I suspect a poorly documented config to be the culprit, but have not had any luck changing the max failure or retry configs. Any suggestions would be great, thanks!
答案1
得分: 0
经过进一步的调试,我发现问题是由于配置错误和期望不符的组合引起的。
首先,错误的期望是,在任务失败时,阶段将会重试。然而,这只有在故障发生在Shuffle Map阶段时才会发生。如果故障发生在结果阶段,作业将被中止。
接下来,在提交给K8s Spark操作器的SparkApplication
资源中发生了配置错误,具体来说,restartPolicy
被设置为Never
。
阻塞失败作业和告知Spark操作器在失败时不重试应用程序的组合导致了问题。解决方案是配置一个具有适当重试尝试次数的重试策略。
英文:
After further debugging I found the issue was a combination of incorrect configuration and expectations.
First, the incorrect expectation would be that on task failure, the stage would be retried. However, this only happens if failure happens in a shuffle map stage. If the failure happens in a result stage, the job will be aborted.
Next, misconfiguration was occurring in the SparkApplication
resource submitted to the K8s spark operator, specifically, the restartPolicy
was being set to Never
.
The combination of the barrier failing the job and the spark operator being told not to retry applications on failure lead to the issue. The solution was to configure a restart policy with an appropriate number of retry attempts.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论