Memory issues running spark locally in Intellij (scala)

huangapple go评论54阅读模式
英文:

Memory issues running spark locally in Intellij (scala)

问题

I'm very new to Scala and Spark. I've been trying to accomplish a script that reads several of the same format excel files (separated by year: e.g. 2011.xlsx, 2012.xlsx, etc) into one dataframe. The total amount of data to be read into the dataframe is a peace-meal 350mb. Each file is approximately 30mb and there are roughly 12 files. However, I keep running into java.lang.OutOfMemoryErrors like below:

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "RemoteBlock-temp-file-clean-thread"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Spark Context Cleaner"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Executor task launch worker for task 0.0 in stage 0.0 (TID 0)"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "executor-kill-mark-cleanup"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Executor task launch worker for task 8.0 in stage 0.0 (TID 8)"
java.lang.OutOfMemoryError: Java heap space

I am running this code locally using Intellij IDEA:

import com.crealytics.spark.excel._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.{DataFrame, SparkSession, types}

import java.io File

object sparkJob extends App {

  val session = SparkSession.builder().
    config("spark.driver.bindAddress", "127.0.0.1").
    config("spark.executor.memory", "8g").
    config("spark.driver.memory", "8g").
    config("spark.memory.offHeap.enabled", true).
    config("spark.memory.offHeap.size", "4g").
    master("local[*]").
    appName("etl").
    getOrCreate()

  val dataSchema = types.StructType(Array(
    StructField("Delivery Date", types.StringType, nullable = false),
    StructField("Delivery Hour", types.IntegerType, nullable = false),
    StructField("Delivery Interval", types.IntegerType, nullable = false),
    StructField("Repeated Hour Flag", types.StringType, nullable = false),
    StructField("Settlement Point Name", types.StringType, nullable = false),
    StructField("Settlement Point Type", types.StringType, nullable = false),
    StructField("Settlement Point Price", types.DecimalType(10, 0), nullable = false)
  ))

  val dir = new File("data/")
  val files = dir.listFiles.map(_.getPath).toList

  def read_excel(filePath: String): DataFrame = {
    session.read.excel(header=true). 
      schema(dataSchema).
      load(filePath)
  }

  val df = files.map(f => read_excel(f))
  val mdf = df.reduce(_.union(_))

  mdf.show(5)
}

Things I've tried:

VM Options: -Xmx -Xms, and expanding various memory types inside the code's spark session config. My machine has 32gb of RAM, so that isn't an issue.

英文:

I'm very new to Scala and Spark. I've been trying to accomplish a script that reads several of the same format excel files (separated by year: e.g. 2011.xlsx, 2012.xlsx, etc) into one dataframe. The total amount of data to be read into the dataframe is a peace-meal 350mb. Each file is approximately 30mb and there are roughly 12 files. However, I keep running to java.lang.OutofMemoryErrors like below:

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "RemoteBlock-temp-file-clean-thread"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Spark Context Cleaner"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Executor task launch worker for task 0.0 in stage 0.0 (TID 0)"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "executor-kill-mark-cleanup"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Executor task launch worker for task 8.0 in stage 0.0 (TID 8)"
java.lang.OutOfMemoryError: Java heap space

I am running this code locally using IntellijIDEA:

import com.crealytics.spark.excel._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.{DataFrame, SparkSession, types}

import java.io.File

object sparkJob extends App {

  val session = SparkSession.builder().
    config("spark.driver.bindAddress", "127.0.0.1").
    config("spark.executor.memory", "8g").
    config("spark.driver.memory", "8g").
    config("spark.memory.offHeap.enabled", true).
    config("spark.memory.offHeap.size", "4g").
    master("local[*]").
    appName("etl").
    getOrCreate()

  val dataSchema = types.StructType(Array(
    StructField("Delivery Date", types.StringType, nullable = false),
    StructField("Delivery Hour", types.IntegerType, nullable = false),
    StructField("Delivery Interval", types.IntegerType, nullable = false),
    StructField("Repeated Hour Flag", types.StringType, nullable = false),
    StructField("Settlement Point Name", types.StringType, nullable = false),
    StructField("Settlement Point Type", types.StringType, nullable = false),
    StructField("Settlement Point Price", types.DecimalType(10, 0), nullable = false)
  ))

  val dir = new File("data/")
  val files = dir.listFiles.map(_.getPath).toList


  def read_excel(filePath: String): DataFrame = {
    session.read.excel(header=true). 
      schema(dataSchema).
      load(filePath)
  }

  val df = files.map(f => read_excel(f))
  val mdf = df.reduce(_.union(_))

  mdf.show(5)
}

Things I've tried:

VM Options: -Xmx -Xms, and expanding various memory types inside the code's spark session config. My machine has 32gb of RAM, so that isn't an issue.

答案1

得分: 0

Use parallelize instead of map to read files in parallel. This way Spark will distribute jobs among cluster nodes and use parallel processing to improve performance. For example, you can create an RDD from the list of files and then use map on the RDD:

使用parallelize而不是map来并行读取文件。这样,Spark将在集群节点之间分配任务并使用并行处理以提高性能。例如,您可以从文件列表创建一个RDD,然后在RDD上使用map:

val filesRDD = session.sparkContext.parallelize(files)
val df = filesRDD.map(f => read_excel(f))

Use cache to store the DataFrame. This way, the data will be cached and will not have to be read from disk every time an action is performed on it:

使用缓存来存储DataFrame。这样,数据将被缓存,每次对其执行操作时都不必从磁盘上读取:

val mdf = df.reduce(_.union(_)).cache()

the last attempt you can try to do is to set: spark.executor.memory=12g, but I think it is an extreme solution, it might be interesting to debug the excel decoding library to see if the high memory usage is given by it.

最后尝试的一步是设置:spark.executor.memory=12g,但我认为这是一个极端的解决方案,可能会有兴趣调试Excel解码库,以查看高内存使用是否由它引起。

英文:

Use parallelize instead of map to read files in parallel. This way Spark will distribute jobs among cluster nodes and use parallel processing to improve performance. For example, you can create an RDD from the list of files and then use map on the RDD:

val filesRDD = session.sparkContext.parallelize(files)
val df = filesRDD.map(f => read_excel(f))

Use cache to store the DataFrame. This way, the data will be cached and will not have to be read from disk every time an action is performed on it:

val mdf = df.reduce(_.union(_)).cache()

the last attempt you can try to do is to set: spark.executor.memory=12g, but I think it is an extreme solution, it might be interesting to debug the excel decoding library to see if the high memory usage is given by it .

huangapple
  • 本文由 发表于 2023年4月1日 01:03:29
  • 转载请务必保留本文链接:https://go.coder-hub.com/75901024.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定