使用GO语言提取tar文件中的tar文件的最快方法是什么?

huangapple go评论92阅读模式
英文:

fastest way to extract tar files in side tar file using GO

问题

我有一个包含多个tar文件的tar文件。我目前正在使用tar Reader递归地提取这些tar文件,通过手动移动文件来进行操作。这个过程非常繁重和缓慢,特别是当处理包含数千个文件和目录的大型tar文件时。

我没有找到任何能够快速进行递归提取的好的包。此外,我尝试使用命令tar -xf file.tar --same-owner"来提取内部tar文件,但在权限问题上遇到了问题(这只在Mac上发生)。

我的问题是:
有没有办法并行化手动提取过程,以便内部tar文件可以并行提取?

我有一个提取任务的方法,我正在尝试将其并行化:

	var wg sync.WaitGroup
	wg.Add(len(tarFiles))

	for {
		header, err := tarBallReader.Next()
		if err != nil {
			break
		}
		go extractFileAsync(parentFolder, header, tarBallReader, depth, &wg)
	}
	wg.Wait()

在添加了go协程之后,文件变得损坏,并且进程陷入无限循环。

以下是主tar文件的示例内容:

1d2755f3375860aaaf2b5f0474692df2e0d4329569c1e8187595bf4b3bf3f3b9/
1d2755f3375860aaaf2b5f0474692df2e0d4329569c1e8187595bf4b3bf3f3b9/VERSION
1d2755f3375860aaaf2b5f0474692df2e0d4329569c1e8187595bf4b3bf3f3b9/json
1d2755f3375860aaaf2b5f0474692df2e0d4329569c1e8187595bf4b3bf3f3b9/layer.tar
348188998f2a69b4ac0ca96b42990292eef67c0abfa05412e2fb7857645f4280/
348188998f2a69b4ac0ca96b42990292eef67c0abfa05412e2fb7857645f4280/VERSION
348188998f2a69b4ac0ca96b42990292eef67c0abfa05412e2fb7857645f4280/json
348188998f2a69b4ac0ca96b42990292eef67c0abfa05412e2fb7857645f4280/layer.tar
54c027bf04447fdb035ddc13a6ae5493a3f997bdd3577607b0980954522efb9e.json
9dd3c29af50daaf86744a8ade86ecf12f6a5a6ffc27a5a7398628e4a21770ee3/
9dd3c29af50daaf86744a8ade86ecf12f6a5a6ffc27a5a7398628e4a21770ee3/VERSION
9dd3c29af50daaf86744a8ade86ecf12f6a5a6ffc27a5a7398628e4a21770ee3/json
9dd3c29af50daaf86744a8ade86ecf12f6a5a6ffc27a5a7398628e4a21770ee3/layer.tar
b6c49400b643245cdbe17b7a7eb14f0f7def5a93326b99560241715c1e95502e/
b6c49400b643245cdbe17b7a7eb14f0f7def5a93326b99560241715c1e95502e/VERSION
b6c49400b643245cdbe17b7a7eb14f0f7def5a93326b99560241715c1e95502e/json
b6c49400b643245cdbe17b7a7eb14f0f7def5a93326b99560241715c1e95502e/layer.tar
c662ec0dc487910e7b76b2a4d67ab1a9ca63ce1784f636c2637b41d6c7ac5a1e/
c662ec0dc487910e7b76b2a4d67ab1a9ca63ce1784f636c2637b41d6c7ac5a1e/VERSION
c662ec0dc487910e7b76b2a4d67ab1a9ca63ce1784f636c2637b41d6c7ac5a1e/json
c662ec0dc487910e7b76b2a4d67ab1a9ca63ce1784f636c2637b41d6c7ac5a1e/layer.tar
da87454b77f6ac7fab1f465c10a07a1eb4b46df8058d98892794618cac8eacdc/
da87454b77f6ac7fab1f465c10a07a1eb4b46df8058d98892794618cac8eacdc/VERSION
da87454b77f6ac7fab1f465c10a07a1eb4b46df8058d98892794618cac8eacdc/json
da87454b77f6ac7fab1f465c10a07a1eb4b46df8058d98892794618cac8eacdc/layer.tar
ea1c2adfdc777d8746e50ad3e679789893a991606739c9bc7e01f273fa0b6e12/
ea1c2adfdc777d8746e50ad3e679789893a991606739c9bc7e01f273fa0b6e12/VERSION
ea1c2adfdc777d8746e50ad3e679789893a991606739c9bc7e01f273fa0b6e12/json
ea1c2adfdc777d8746e50ad3e679789893a991606739c9bc7e01f273fa0b6e12/layer.tar
f3b6608e814053048d79e519be79f654a2e9364dfdc8fb87b71e2fc57bbff115/
f3b6608e814053048d79e519be79f654a2e9364dfdc8fb87b71e2fc57bbff115/VERSION
f3b6608e814053048d79e519be79f654a2e9364dfdc8fb87b71e2fc57bbff115/json
f3b6608e814053048d79e519be79f654a2e9364dfdc8fb87b71e2fc57bbff115/layer.tar
manifest.json
repositories

或者简单地运行docker save <image>:<tag> -o image.tar并检查tar文件的内容。

英文:

I have a tar file that contains multiple tar files in it. I'm currently extracting these tars recursively using the tar Reader by moving manually over the files. This process is very heavy and slow, especially when dealing with large tar files that contain thousands of files and directories.

I didn't find any good package that is able to do this recursive extraction fast. plus I tried using the command tar -xf file.tar --same-owner&quot; for the inner tars, but had a problem with permissions issue (which happens only on mac).

my question is:
Is there a way to parallelize the manual extraction process so that the inner tars will be extracted in parallel?

I have a method for the extraction task which I'm trying to make parallel:

	var wg sync.WaitGroup
	wg.Add(len(tarFiles))

	for {
		header, err := tarBallReader.Next()
		if err != nil {
			break
		}
		go extractFileAsync(parentFolder, header, tarBallReader, depth, &amp;wg)
	}
	wg.Wait()

after adding the go routines, the files are getting corrupted and the process is stuck on an endless loop.

example of the main tar content:

1d2755f3375860aaaf2b5f0474692df2e0d4329569c1e8187595bf4b3bf3f3b9/
1d2755f3375860aaaf2b5f0474692df2e0d4329569c1e8187595bf4b3bf3f3b9/VERSION
1d2755f3375860aaaf2b5f0474692df2e0d4329569c1e8187595bf4b3bf3f3b9/json
1d2755f3375860aaaf2b5f0474692df2e0d4329569c1e8187595bf4b3bf3f3b9/layer.tar
348188998f2a69b4ac0ca96b42990292eef67c0abfa05412e2fb7857645f4280/
348188998f2a69b4ac0ca96b42990292eef67c0abfa05412e2fb7857645f4280/VERSION
348188998f2a69b4ac0ca96b42990292eef67c0abfa05412e2fb7857645f4280/json
348188998f2a69b4ac0ca96b42990292eef67c0abfa05412e2fb7857645f4280/layer.tar
54c027bf04447fdb035ddc13a6ae5493a3f997bdd3577607b0980954522efb9e.json
9dd3c29af50daaf86744a8ade86ecf12f6a5a6ffc27a5a7398628e4a21770ee3/
9dd3c29af50daaf86744a8ade86ecf12f6a5a6ffc27a5a7398628e4a21770ee3/VERSION
9dd3c29af50daaf86744a8ade86ecf12f6a5a6ffc27a5a7398628e4a21770ee3/json
9dd3c29af50daaf86744a8ade86ecf12f6a5a6ffc27a5a7398628e4a21770ee3/layer.tar
b6c49400b643245cdbe17b7a7eb14f0f7def5a93326b99560241715c1e95502e/
b6c49400b643245cdbe17b7a7eb14f0f7def5a93326b99560241715c1e95502e/VERSION
b6c49400b643245cdbe17b7a7eb14f0f7def5a93326b99560241715c1e95502e/json
b6c49400b643245cdbe17b7a7eb14f0f7def5a93326b99560241715c1e95502e/layer.tar
c662ec0dc487910e7b76b2a4d67ab1a9ca63ce1784f636c2637b41d6c7ac5a1e/
c662ec0dc487910e7b76b2a4d67ab1a9ca63ce1784f636c2637b41d6c7ac5a1e/VERSION
c662ec0dc487910e7b76b2a4d67ab1a9ca63ce1784f636c2637b41d6c7ac5a1e/json
c662ec0dc487910e7b76b2a4d67ab1a9ca63ce1784f636c2637b41d6c7ac5a1e/layer.tar
da87454b77f6ac7fab1f465c10a07a1eb4b46df8058d98892794618cac8eacdc/
da87454b77f6ac7fab1f465c10a07a1eb4b46df8058d98892794618cac8eacdc/VERSION
da87454b77f6ac7fab1f465c10a07a1eb4b46df8058d98892794618cac8eacdc/json
da87454b77f6ac7fab1f465c10a07a1eb4b46df8058d98892794618cac8eacdc/layer.tar
ea1c2adfdc777d8746e50ad3e679789893a991606739c9bc7e01f273fa0b6e12/
ea1c2adfdc777d8746e50ad3e679789893a991606739c9bc7e01f273fa0b6e12/VERSION
ea1c2adfdc777d8746e50ad3e679789893a991606739c9bc7e01f273fa0b6e12/json
ea1c2adfdc777d8746e50ad3e679789893a991606739c9bc7e01f273fa0b6e12/layer.tar
f3b6608e814053048d79e519be79f654a2e9364dfdc8fb87b71e2fc57bbff115/
f3b6608e814053048d79e519be79f654a2e9364dfdc8fb87b71e2fc57bbff115/VERSION
f3b6608e814053048d79e519be79f654a2e9364dfdc8fb87b71e2fc57bbff115/json
f3b6608e814053048d79e519be79f654a2e9364dfdc8fb87b71e2fc57bbff115/layer.tar
manifest.json
repositories

or simply you can run docker save &lt;image&gt;:&lt;tag&gt; -o image.tar and check the content of the tar.

答案1

得分: 1

可能是因为在执行过程中调用wg.Done()的次数与len(tarFiles)不相等,导致你的代码在wg.Wait()处挂起。

以下是修复后的代码示例:

    var wg sync.WaitGroup
    // wg.Add(len(tarFiles))

    for {
        header, err := tarBallReader.Next()
        if err != nil {
            break
        }
        wg.Add(1)
        go extractFileAsync(parentFolder, header, tarBallReader, depth, &wg)
    }
    wg.Wait()

func extractFileAsync(...) {
    defer wg.Done()

    // 一些代码

}

更新:修正了可能存在的竞态条件。感谢 @craigb 的指正。

这是我对类似问题的解决方案(简化版):

package main

import (
	"archive/tar"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
	"sync"
)

type Semaphore struct {
	Wg sync.WaitGroup
	Ch chan int
}

// 同时运行的goroutine数量限制。
// 取决于处理器核心数量、存储性能、内存量等因素。
const grMax = 10

const tarFileName = "docker_image.tar"
const dstDir = "output/docker"

func extractTar(tarFileName string, dstDir string) error {
	f, err := os.Open(tarFileName)
	if err != nil {
		return err
	}

	sem := Semaphore{}
	sem.Ch = make(chan int, grMax)

	if err := Untar(dstDir, f, &sem, true); err != nil {
		return err
	}

	fmt.Println("extractTar: 等待完成")
	sem.Wg.Wait()
	return nil
}

func Untar(dst string, r io.Reader, sem *Semaphore, godeep bool) error {
	tr := tar.NewReader(r)

	for {
		header, err := tr.Next()

		switch {
		case err == io.EOF:
			return nil
		case err != nil:
			return err
		}

		// 目标位置,用于创建目录/文件
		target := filepath.Join(dst, header.Name)

		switch header.Typeflag {
		// 如果是目录且不存在,则创建目录
		case tar.TypeDir:
			if _, err := os.Stat(target); err != nil {
				if err := os.MkdirAll(target, 0755); err != nil {
					return err
				}
			}
		// 如果是文件,则创建文件
		case tar.TypeReg:
			if err := saveFile(tr, target, os.FileMode(header.Mode)); err != nil {
				return err
			}
			ext := filepath.Ext(target)

			// 如果是tar文件且在顶层目录,进行解压缩
			if ext == ".tar" && godeep {
				sem.Wg.Add(1)
				// 使用缓冲通道限制同时运行的goroutine数量
				sem.Ch <- 1
				// 文件解压缩到与文件名(不包含扩展名)相同的目录中
				newDir := filepath.Join(dst, strings.TrimSuffix(header.Name, ".tar"))
				if err := os.Mkdir(newDir, 0755); err != nil {
					return err
				}
				go func(target string, newDir string, sem *Semaphore) {
					fmt.Println("启动goroutine,通道长度:", len(sem.Ch))
					fmt.Println("开始:", target)
					defer sem.Wg.Done()
					defer func() { <-sem.Ch }()
					// 打开内部tar文件
					ft, err := os.Open(target)
					if err != nil {
						fmt.Println(err)
						return
					}
					defer ft.Close()
					// 这里的godeep参数为false,以避免解压缩当前归档文件内的归档文件。
					if err := Untar(newDir, ft, sem, false); err != nil {
						fmt.Println(err)
						return
					}
					fmt.Println("完成:", target)
				}(target, newDir, sem)
			}
		}
	}
	return nil
}

func saveFile(r io.Reader, target string, mode os.FileMode) error {
	f, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR, mode)
	if err != nil {
		return err
	}
	defer f.Close()

	if _, err := io.Copy(f, r); err != nil {
		return err
	}

	return nil
}

func main() {
	err := extractTar(tarFileName, dstDir)
	if err != nil {
		fmt.Println(err)
	}
}

希望对你有帮助!

英文:

Probably your code hangs on wg.Wait() due to the fact that the number of calls to wg.Done() during execution is not equal to len(tarFiles).

That should work:

    var wg sync.WaitGroup
    // wg.Add(len(tarFiles))

    for {
        header, err := tarBallReader.Next()
        if err != nil {
            break
        }
        wg.Add(1)
        go extractFileAsync(parentFolder, header, tarBallReader, depth, &amp;wg)
    }
    wg.Wait()

func extractFileAsync(...) {
    defer wg.Done()

    // some code

}

UPD: correction of a possible race condition. Thanks @craigb

Here is my solution to a similar problem (simplified):

package main

import (
	&quot;archive/tar&quot;
	&quot;fmt&quot;
	&quot;io&quot;
	&quot;os&quot;
	&quot;path/filepath&quot;
	&quot;strings&quot;
	&quot;sync&quot;
)

type Semaphore struct {
	Wg sync.WaitGroup
	Ch chan int
}

// Limit on the number of simultaneously running goroutines.
// Depends on the number of processor cores, storage performance, amount of RAM, etc.
const grMax = 10

const tarFileName = &quot;docker_image.tar&quot;
const dstDir = &quot;output/docker&quot;

func extractTar(tarFileName string, dstDir string) error {
	f, err := os.Open(tarFileName)
	if err != nil {
		return err
	}

	sem := Semaphore{}
	sem.Ch = make(chan int, grMax)

	if err := Untar(dstDir, f, &amp;sem, true); err != nil {
		return err
	}

	fmt.Println(&quot;extractTar: wait for complete&quot;)
	sem.Wg.Wait()
	return nil
}


func Untar(dst string, r io.Reader, sem *Semaphore, godeep bool) error {

	tr := tar.NewReader(r)

	for {
		header, err := tr.Next()

		switch {
		case err == io.EOF:
			return nil
		case err != nil:
			return err
		}

		// the target location where the dir/file should be created
		target := filepath.Join(dst, header.Name)

		switch header.Typeflag {

		// if its a dir and it doesn&#39;t exist create it
		case tar.TypeDir:
			if _, err := os.Stat(target); err != nil {
				if err := os.MkdirAll(target, 0755); err != nil {
					return err
				}
			}

		// if it&#39;s a file create it
		case tar.TypeReg:
			if err := saveFile(tr, target, os.FileMode(header.Mode)); err != nil {
				return err
			}
			ext := filepath.Ext(target)

			// if it&#39;s tar file and we are on top level, extract it
			if ext == &quot;.tar&quot; &amp;&amp; godeep {
				sem.Wg.Add(1)
				// A buffered channel is used to limit the number of simultaneously running goroutines
				sem.Ch &lt;- 1
				// the file is unpacked to a directory with the file name (without extension)
				newDir := filepath.Join(dst, strings.TrimSuffix(header.Name, &quot;.tar&quot;))
				if err := os.Mkdir(newDir, 0755); err != nil {
					return err
				}
				go func(target string, newDir string, sem *Semaphore) {
					fmt.Println(&quot;start goroutine, chan length:&quot;, len(sem.Ch))
					fmt.Println(&quot;START:&quot;, target)
					defer sem.Wg.Done()
					defer func() {&lt;-sem.Ch}()
					// the internal tar file opens
					ft, err := os.Open(target)
					if err != nil {
						fmt.Println(err)
						return
					}
					defer ft.Close()
					// the godeep parameter is false here to avoid unpacking archives inside the current archive.
					if err := Untar(newDir, ft, sem, false); err != nil {
						fmt.Println(err)
						return
					}
					fmt.Println(&quot;DONE:&quot;, target)
				}(target, newDir, sem)
			}
		}
	}
	return nil
}

func saveFile(r io.Reader, target string, mode os.FileMode) error {
	f, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR, mode)
	if err != nil {
		return err
	}
	defer f.Close()

	if _, err := io.Copy(f, r); err != nil {
		return err
	}
	
	return nil
}

func main() {
	err := extractTar(tarFileName, dstDir)
	if err != nil {
		fmt.Println(err)
	}
}

huangapple
  • 本文由 发表于 2022年11月4日 00:30:03
  • 转载请务必保留本文链接:https://go.coder-hub.com/74306502.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定