Spark Reading .7z Files

Question

I am trying to read .7z files in Spark using Scala or Java, but I cannot find any appropriate method or functionality.

For zip files I am able to do this, because the ZipInputStream class takes an InputStream; for 7z files, however, the SevenZFile class does not take any input stream:
https://commons.apache.org/proper/commons-compress/javadocs/api-1.16/org/apache/commons/compress/archivers/sevenz/SevenZFile.html

Zip file code

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream

spark.sparkContext.binaryFiles("fileName").flatMap { case (name: String, content: PortableDataStream) =>
  val zis = new ZipInputStream(content.open)
  // Walk the archive entry by entry, reading each entry line by line
  Stream.continually(zis.getNextEntry)
    .takeWhile(_ != null)
    .flatMap { _ =>
      val br = new BufferedReader(new InputStreamReader(zis))
      Stream.continually(br.readLine()).takeWhile(_ != null)
    }
}

I am trying similar code for the 7z files, something like:

spark.sparkContext.binaryFiles("filename").flatMap { case (name: String, content: PortableDataStream) =>
  val zis = new SevenZFile(content.open)  // does not compile: SevenZFile has no InputStream constructor
  Stream.continually(zis.getNextEntry)
    .takeWhile(_ != null)
    .flatMap { _ =>
      val br = new BufferedReader(new InputStreamReader(zis))
      Stream.continually(br.readLine()).takeWhile(_ != null)
    }
}

But SevenZFile does not accept an input stream as an argument. Looking for ideas.

If the file is in the local filesystem, the following solution works, but my file is in HDFS.

Local filesystem code

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.commons.compress.archivers.sevenz.SevenZArchiveEntry;
import org.apache.commons.compress.archivers.sevenz.SevenZFile;

public static void decompress(String in, File destination) throws IOException {
    SevenZFile sevenZFile = new SevenZFile(new File(in));
    SevenZArchiveEntry entry;
    while ((entry = sevenZFile.getNextEntry()) != null) {
        if (entry.isDirectory()) {
            continue;
        }
        File curfile = new File(destination, entry.getName());
        File parent = curfile.getParentFile();
        if (!parent.exists()) {
            parent.mkdirs();
        }
        // Read the whole entry into memory and write it out
        FileOutputStream out = new FileOutputStream(curfile);
        byte[] content = new byte[(int) entry.getSize()];
        sevenZFile.read(content, 0, content.length);
        out.write(content);
        out.close();
    }
    sevenZFile.close();
}

After all these years of Spark evolution, there should be an easy way to do this.

Answer 1

Score: 4


Instead of using the java.io.File-based approach, you could try the SeekableByteChannel method as shown in this alternative constructor.

You can use a SeekableInMemoryByteChannel to read a byte array. So as long as you can pick up the 7zip files from S3 or whatever and hand them off as byte arrays you should be alright.
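
For example, here is a minimal sketch of that approach in Spark (this assumes Commons Compress 1.17 or newer, which added SevenZFile.getInputStream, and that each archive is small enough to fit in executor memory):

import java.io.{BufferedReader, InputStreamReader}
import org.apache.commons.compress.archivers.sevenz.SevenZFile
import org.apache.commons.compress.utils.SeekableInMemoryByteChannel
import org.apache.spark.input.PortableDataStream

spark.sparkContext.binaryFiles("filename").flatMap { case (name: String, content: PortableDataStream) =>
  // Materialize the whole archive as a byte array and wrap it in a
  // seekable channel, which SevenZFile accepts in place of a java.io.File
  val channel = new SeekableInMemoryByteChannel(content.toArray)
  val sevenZFile = new SevenZFile(channel)
  Stream.continually(sevenZFile.getNextEntry)
    .takeWhile(_ != null)
    .filterNot(_.isDirectory)
    .flatMap { entry =>
      // getInputStream(entry) yields the decompressed bytes of one entry
      val br = new BufferedReader(new InputStreamReader(sevenZFile.getInputStream(entry)))
      Stream.continually(br.readLine()).takeWhile(_ != null)
    }
}

Because content.toArray pulls each .7z file entirely into memory, this only works for archives that fit comfortably on a single executor, which is exactly the caveat below.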

With all of that said, Spark is really not well-suited for processing things like zip and 7zip files. I can tell you from personal experience I've seen it fail badly once the files are too large for Spark's executors to handle.

Something like Apache NiFi will work much better for expanding archives and processing them. FWIW, I'm currently handling a large data dump that has me frequently dealing with 50GB tarballs that have several million files in them, and NiFi handles them very gracefully.
