Spark Java PCA: Java Heap Space and Missing output location for shuffle

**The problem**

I am trying to run a PCA on a dataframe with **4,827 rows and 40,107 columns**, but I get a Java heap space error and a "Missing output location for shuffle" error (according to the stderr files on the executors). The error occurs during the **"treeAggregate at RowMatrix.scala:122"** stage of the PCA.

**The cluster**

It is a standalone cluster with 16 worker nodes, each running one executor with 4 cores and 21,504 MB of memory. The master node has 15 GB of memory, which I allocate with `java -jar -Xmx15g myapp.jar`. In addition, `spark.sql.shuffle.partitions` is set to 192 and `spark.driver.maxResultSize` to 6 GB.
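
For reference, a minimal sketch (not the original submit script, which the question does not show) of how those two settings could be applied when the session is built inside the jar; the app name is a placeholder:

```java
import org.apache.spark.sql.SparkSession;

// Minimal sketch, assuming the session is created inside myapp.jar; the app name is illustrative.
SparkSession sp = SparkSession.builder()
        .appName("pca-app")
        .config("spark.sql.shuffle.partitions", "192")
        .config("spark.driver.maxResultSize", "6g")
        .getOrCreate();
```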

**Simplified code**

```
df1.persist  (about 3 GB according to the Storage tab in the Spark UI)
df2 = df1.groupby(col1).pivot(col2).mean(col3)  (a dataframe with 4,827 columns and 40,107 rows)
df2.collectFirstColumnAsList
df3 = df1.groupby(col2).pivot(col1).mean(col3)  (a dataframe with 40,107 columns and 4,827 rows)

----- it hangs here for about 1.5 hours creating metadata for the upcoming dataframe -----

df4 = (..Imputer or na.fill on df3..)
df5 = (..VectorAssembler on df4..)
(..PCA on df5 fails with "Missing output location for shuffle"..)
df1.unpersist
```
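
For clarity, here is a hedged Java sketch of what that pseudocode corresponds to in the Dataset API, assuming the usual `org.apache.spark.sql` imports; `col1`, `col2` and `col3` are placeholders for the real column names:

```java
// Hedged sketch only: column names are placeholders, not the real ones.
df1.persist();                                                      // ~3 GB in the Storage tab
Dataset<Row> df2 = df1.groupBy("col1").pivot("col2").mean("col3");  // 4,827 columns, 40,107 rows
List<Row> firstColumn = df2.select("col1").collectAsList();         // "collectFirstColumnAsList"
Dataset<Row> df3 = df1.groupBy("col2").pivot("col1").mean("col3");  // 40,107 columns, 4,827 rows
```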

I have seen and tried many solutions, but without any result. Among them:

  1. Re-partitioning df5 or df4 to 16, 64, 192, 256, 1000 and 4000 partitions (although the data does not look skewed); see the sketch after this list.
  2. Changing `spark.sql.shuffle.partitions` to 16, 64, 192, 256, 1000 and 4000.
  3. Using 1 or 2 cores per executor, so that each task has more memory.
  4. Having 2 executors with 2 or 4 cores each.
  5. Changing `spark.memory.fraction` to 0.8 and `spark.memory.storageFraction` to 0.4.
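
A hedged sketch of how attempts 1, 2 and 5 might be applied (the values shown are just one of the combinations listed above; the memory fractions have to be set before the context is created):

```java
import org.apache.spark.SparkConf;

// Illustrative values only; the real runs cycled through 16, 64, 192, 256, 1000 and 4000 partitions.
Dataset<Row> df5Repart = df5.repartition(192);        // attempt 1

SparkConf conf = new SparkConf()                      // attempts 2 and 5 (before the session is created)
        .set("spark.sql.shuffle.partitions", "192")
        .set("spark.memory.fraction", "0.8")
        .set("spark.memory.storageFraction", "0.4");
```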

Always the same error! How is it possible to blow through all this memory? Is it possible that the dataframe does not actually fit in memory? Please let me know if you need any other information or screenshots.

**EDIT 1**

I changed the cluster to 2 Spark workers with 1 executor each and `spark.sql.shuffle.partitions=48`. Each executor has 115 GB of memory and 8 cores. Below is the code where I load the file (2.2 GB), convert each line into a dense vector and feed it to the PCA.

Each row in the file has the following format (4,568 rows with 40,107 double values each):

"[x1,x2,x3,...]"

And the code:

```java
Dataset<Row> df1 = sp.read().format("com.databricks.spark.csv").option("header", "true").load("/home/ubuntu/yolo.csv");

StructType schema2 = new StructType(new StructField[] {
    new StructField("intensity", new VectorUDT(), false, Metadata.empty())
});

Dataset<Row> df = df1.map((Row originalrow) -> {
    // Each row is one string of the form "[x1,x2,...]": strip the brackets,
    // split on commas and parse the 40,107 doubles into a dense vector.
    String yoho = originalrow.get(0).toString();
    int sizeyoho = yoho.length();
    String yohi = yoho.substring(1, sizeyoho - 1);
    String[] yi = yohi.split(",");
    int s = yi.length;
    double[] tmplist = new double[s];
    for (int i = 0; i < s; i++) {
        tmplist[i] = Double.parseDouble(yi[i]);
    }
    Row newrow = RowFactory.create(Vectors.dense(tmplist));
    return newrow;
}, RowEncoder.apply(schema2));

PCAModel pcaexp = new PCA()
    .setInputCol("intensity")
    .setOutputCol("pcaFeatures")
    .setK(2)
    .fit(df);
```

The exact error in the stderr of one of the executors is:

```
ERROR Executor: Exception in task 1.0 in stage 6.0 (TID 43)
java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```

This is the Stages tab of the Spark UI:

![Stages tab](https://i.stack.imgur.com/Rj0So.png)

And this is the stage that fails (treeAggregate at RowMatrix.scala:122):

![treeAggregate stage](https://i.stack.imgur.com/qtXaR.png)

**EDIT 2**

![Console](https://i.stack.imgur.com/hbRrT.png)

![Spark stages](https://i.stack.imgur.com/2RZcI.png)

**EDIT 3**

I read the whole file but take only 10 values from each row to create the dense vector, and I still get the same error! My master has 235 GB of RAM and there are 3 workers (1 executor each, with 4 cores) with 64 GB of RAM per executor. How can this be happening? (Don't forget that the total size of the file is only 2.3 GB!)

```java
Dataset<Row> df1 = sp.read().format("com.databricks.spark.csv").option("header", "true").load("/home/ubuntu/yolo.csv");

StructType schema2 = new StructType(new StructField[] {
    new StructField("intensity", new VectorUDT(), false, Metadata.empty())
});

Dataset<Row> df = df1.map((Row originalrow) -> {
    String yoho = originalrow.get(0).toString();
    int sizeyoho = yoho.length();
    String yohi = yoho.substring(1, sizeyoho - 1);
    String[] yi = yohi.split(",");   // this string array has all 40,107 values
    int s = yi.length;
    double[] tmplist = new double[s];
    for (int i = 0; i < 10; i++) {   // I narrow it down and take only the first 10 values of each row
        tmplist[i] = Double.parseDouble(yi[i]);
    }
    Row newrow = RowFactory.create(Vectors.dense(tmplist));
    return newrow;
}, RowEncoder.apply(schema2));

PCAModel pcaexp = new PCA()
    .setInputCol("intensity")
    .setOutputCol("pcaFeatures")
    .setK(2)
    .fit(df);
```



# Answer 1
**Score:** 1


The **"Missing output location for shuffle"** error occurs when your Spark application runs big shuffle stages: it tries to redistribute a huge amount of data among the executors, and there are problems in your cluster network.

Spark is telling you that it runs out of memory in some stage. You are doing transformations that require several stages, and they consume memory too. Besides, you persist the dataframe first, and you should check the storage level, because you may be persisting it in memory.
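
For example, a minimal sketch (not from the original code) of explicitly choosing a storage level that is allowed to spill to disk:

```java
import org.apache.spark.storage.StorageLevel;

// Hedged sketch: explicitly request a level that can spill cached partitions to disk
// instead of holding everything in executor memory.
df1.persist(StorageLevel.MEMORY_AND_DISK());
```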

You are chaining several Spark wide transformations: for the first pivot stage, for example, Spark creates a stage and performs a shuffle to group by your column. Perhaps your data is skewed, some executors consume much more memory than others, and the error may occur in one of them.

Besides the dataframe transformations, the PCA estimator converts the dataframe to an RDD, which further increases the memory needed to compute the covariance matrix, and it works with dense Breeze matrices of NxN elements which are **not distributed**. For example, the SVD is done with Breeze. That puts a lot of pressure on one of the executors.

Maybe you can save the resulting dataframe to HDFS (or wherever) and do the PCA in a separate Spark application.
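
A hedged sketch of that split, with a placeholder HDFS path:

```java
// In the first application: write the assembled dataframe out (the path is a placeholder).
df5.write().mode("overwrite").parquet("hdfs:///tmp/pca_input");

// In a second application: read it back and run only the PCA.
Dataset<Row> pcaInput = sp.read().parquet("hdfs:///tmp/pca_input");
```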

The main problem is that, before the SVD, the algorithm needs to compute the Gramian matrix and it uses a treeAggregate over the RDD. This creates a very large matrix of doubles that is sent to the driver, and the error occurs because your driver does not have enough memory. You need to increase the driver memory dramatically. You also have network errors: if an executor loses its connection, the job crashes and does not try to re-execute.
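
Since the driver here is launched with plain `java -jar -Xmx15g myapp.jar`, increasing driver memory means raising that heap flag; the values below are purely illustrative, not a tested recommendation:

```java
// Purely illustrative: with "java -jar" the driver heap is governed by -Xmx,
// e.g.  java -jar -Xmx60g myapp.jar
// and spark.driver.maxResultSize may need to grow as well so the aggregated
// rows are allowed back to the driver.
SparkSession sp = SparkSession.builder()
        .config("spark.driver.maxResultSize", "12g")   // illustrative value
        .getOrCreate();
```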

Personally, I would try to do the PCA directly with Breeze (or Smile) on the driver: collect the RDD field, since the dataset is quite a bit smaller than the covariance matrix, and do it manually, possibly with a Float representation.

Code to compute the PCA with Breeze only, with neither Spark nor treeAggregate:

```scala
import breeze.linalg._
import breeze.linalg.svd._

import org.apache.spark.sql.DataFrame

import scala.collection.mutable

object PCACode {

  def mean(v: Vector[Double]): Double = v.valuesIterator.sum / v.size

  // Subtract the column mean from every column so the data is centred.
  def zeroMean(m: DenseMatrix[Double]): DenseMatrix[Double] = {
    val copy = m.copy
    for (c <- 0 until m.cols) {
      val col = copy(::, c)
      val colMean = mean(col)
      col -= colMean
    }
    copy
  }

  // PCA via an SVD of the centred, transposed data matrix.
  def pca(data: DenseMatrix[Double], components: Int): DenseMatrix[Double] = {
    val d = zeroMean(data)
    val SVD(_, _, v) = svd(d.t)
    val model = v(0 until components, ::)
    val filter = model.t * model
    filter * d
  }

  def main(args: Array[String]): Unit = {
    val df: DataFrame = ???

    /** Collect the data and do the processing: convert strings to doubles, etc. **/
    val data: Array[mutable.WrappedArray[Double]] =
      df.rdd.map(row => row.getAs[mutable.WrappedArray[Double]](0)).collect()

    /** Once you have the array, create the matrix and do the PCA **/
    val matrix = DenseMatrix(data.toSeq: _*)
    val pcaRes = pca(matrix, 2)

    println("result pca \n" + pcaRes)
  }
}
```

This code will run the PCA on the driver, so check the driver's memory. If it crashes, it could be done with Float precision.
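
As a bridge back to the question's Java code, a hedged sketch of the "collect to the driver" step when the column is a `VectorUDT` (as in the asker's `df`) rather than an array column; it assumes the `org.apache.spark.ml.linalg` vector classes that the question's `VectorUDT`/`PCA` usage suggests, and the resulting `double[][]` could then be handed to any in-driver PCA library:

```java
import java.util.List;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Row;

// Hedged sketch: pull every "intensity" vector to the driver as a plain 2-D array.
List<Row> rows = df.select("intensity").collectAsList();
double[][] data = new double[rows.size()][];
for (int i = 0; i < rows.size(); i++) {
    data[i] = ((Vector) rows.get(i).get(0)).toArray();
}
```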

