Parallel zip compression in Go

Question

I am trying to build a zip archive from a large number of small-to-medium-sized files. I want to be able to do this concurrently, since compression is CPU intensive and I'm running on a multi-core server. I also don't want to hold the whole archive in memory, since it might turn out to be large.

My question is: do I have to compress every file separately and then manually combine everything together with the zip header, checksums, etc.?

Any help would be greatly appreciated.

Answer 1

Score: 8

I don't think you can combine the zip headers.

What you could do is run the zip.Writer sequentially, in a separate goroutine, and then spawn a new goroutine for each file that you want to read, piping those files to the goroutine that is zipping them.

This should reduce the IO overhead of reading the files sequentially, although it probably won't leverage multiple cores for the archiving itself.

Here's a working example. Note that, to keep things simple,

  • it does not handle errors nicely, it just panics if something goes wrong,
  • and it does not use the defer statement too much, to demonstrate the order in which things should happen.

Since defer is LIFO, it can sometimes be confusing when you stack a lot of them together.

package main

import (
	"archive/zip"
	"io"
	"os"
	"sync"
)

func ZipWriter(files chan *os.File) *sync.WaitGroup {
	f, err := os.Create("out.zip")
	if err != nil {
		panic(err)
	}
	var wg sync.WaitGroup
	wg.Add(1)
	zw := zip.NewWriter(f)
	go func() {
		// Note the order (LIFO):
		defer wg.Done() // 2. signal that we're done
		defer f.Close() // 1. close the file
		var err error
		var fw io.Writer
		for f := range files {
			// Loop until the channel is closed.
			if fw, err = zw.Create(f.Name()); err != nil {
				panic(err)
			}
			if _, err = io.Copy(fw, f); err != nil {
				panic(err)
			}
			if err = f.Close(); err != nil {
				panic(err)
			}
		}
		// The zip writer must be closed *before* f.Close() is called!
		if err = zw.Close(); err != nil {
			panic(err)
		}
	}()
	return &wg
}

func main() {
	files := make(chan *os.File)
	wait := ZipWriter(files)

	// Send all the files to the zip writer.
	var wg sync.WaitGroup
	wg.Add(len(os.Args)-1)
	for i, name := range os.Args {
		if i == 0 {
			continue
		}
		// Read each file in parallel:
		go func(name string) {
			defer wg.Done()
			f, err := os.Open(name)
			if err != nil {
				panic(err)
			}
			files <- f
		}(name)
	}

	wg.Wait()
	// Once we're done sending the files, we can close the channel.
	close(files)
	// This will cause ZipWriter to break out of its loop, close the
	// output file, and unblock the wait.Wait() call below:
	wait.Wait()
}

Usage: go run example.go /path/to/*.log

This is the order in which things should happen:

  1. Open the output file for writing.
  2. Create a zip.Writer with that file.
  3. Kick off a goroutine listening for files on a channel.
  4. Go through each file; this can be done with one goroutine per file.
  5. Send each file to the goroutine created in step 3.
  6. After processing each file in said goroutine, close the file to free up resources.
  7. Once every file has been sent to said goroutine, close the channel.
  8. Wait until the zipping is done (which happens sequentially).
  9. Once zipping is done (the channel is exhausted), close the zip writer.
  10. Only when the zip writer is closed should the output file be closed.
  11. Finally, everything is closed, so signal the sync.WaitGroup to tell the calling function that we're good to go. (A channel could also be used here, but a sync.WaitGroup seems more elegant.)
  12. When you get the signal that everything is properly closed, you can exit main and terminate cleanly.

This might not answer your question, but I've been using similar code to generate zip archives on the fly for a web service. It performed quite well, even though the actual zipping was done in a single goroutine; overcoming the IO bottleneck can already be an improvement.
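For illustration, here's a minimal sketch of what such on-the-fly generation could look like, with the zip.Writer writing straight into the http.ResponseWriter so the archive is streamed instead of buffered. The handler name, route, and input file names are hypothetical, not taken from the answer above:

package main

import (
	"archive/zip"
	"io"
	"log"
	"net/http"
	"os"
)

// zipHandler streams a zip archive directly into the HTTP response,
// so the full archive never has to sit in memory. (Hypothetical
// handler for illustration; not from the original answer.)
func zipHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/zip")
	w.Header().Set("Content-Disposition", `attachment; filename="logs.zip"`)

	zw := zip.NewWriter(w) // compress straight into the response body
	defer zw.Close()

	for _, name := range []string{"a.log", "b.log"} { // hypothetical inputs
		fw, err := zw.Create(name)
		if err != nil {
			return // response headers are already sent; just stop streaming
		}
		f, err := os.Open(name)
		if err != nil {
			return
		}
		_, err = io.Copy(fw, f)
		f.Close()
		if err != nil {
			return
		}
	}
}

func main() {
	http.HandleFunc("/logs.zip", zipHandler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}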

Answer 2

Score: 2

From the look of it, you won't be able to parallelise the compression using the standard library archive/zip package, because:

  1. Compression is performed by the io.Writer returned by zip.Writer.Create or CreateHeader.
  2. Calling Create/CreateHeader implicitly closes the writer returned by the previous call.

So passing the writers returned by Create to multiple goroutines and writing to them in parallel will not work, as the sketch below demonstrates.
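Here's a minimal, self-contained sketch demonstrating that constraint (the exact error text is an assumption on my part and may differ between Go versions):

package main

import (
	"archive/zip"
	"fmt"
	"io"
)

func main() {
	zw := zip.NewWriter(io.Discard)

	w1, _ := zw.Create("a.txt")
	_, _ = zw.Create("b.txt") // implicitly finishes the entry behind w1

	// Writing through w1 now fails, because the second Create closed it.
	if _, err := w1.Write([]byte("hello")); err != nil {
		fmt.Println(err) // e.g. "zip: write to closed writer"
	}
}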

If you wanted to write your own parallel zip writer, you'd probably want to structure it something like this:

  1. Have multiple goroutines compress files using the compress/flate package, keeping track of the CRC-32 value and the length of the uncompressed data. The output should be directed to temporary files. Note the compressed size of the data. (A sketch of this step follows below.)
  2. Once everything has been compressed, start writing the zip file, beginning with the header.
  3. For each compressed file, write out the file header followed by the contents of the corresponding temporary file.
  4. Write out the central directory record and the end-of-central-directory record at the end of the file. All the required information should be available at this point.

For added parallelism, step 1 could be performed in parallel with the remaining steps by using a channel to signal when compression of each file completes.

Due to the file format, you won't be able to perform parallel compression without storing the compressed data either in memory or in temporary files.
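Here's a minimal sketch of step 1 under those constraints. The names compressedPart, countingWriter, and compressToTemp are hypothetical, invented for illustration; only compress/flate, hash/crc32, and os.CreateTemp are real standard-library APIs:

package main

import (
	"compress/flate"
	"fmt"
	"hash/crc32"
	"io"
	"log"
	"os"
	"strings"
)

// compressedPart carries everything the header-writing step needs:
// the CRC-32, both sizes, and where the raw deflate output lives.
type compressedPart struct {
	name             string
	crc              uint32
	uncompressedSize uint64
	compressedSize   uint64
	tmpPath          string // raw deflate stream, no zip framing
}

// countingWriter counts the bytes passing through to the temp file,
// which yields the compressed size needed for the zip headers.
type countingWriter struct {
	w io.Writer
	n uint64
}

func (c *countingWriter) Write(p []byte) (int, error) {
	n, err := c.w.Write(p)
	c.n += uint64(n)
	return n, err
}

// compressToTemp deflates src into a temporary file while recording
// the CRC-32 of the uncompressed data and both lengths.
func compressToTemp(name string, src io.Reader) (*compressedPart, error) {
	tmp, err := os.CreateTemp("", "zippart-*")
	if err != nil {
		return nil, err
	}
	defer tmp.Close()

	cw := &countingWriter{w: tmp}
	fw, err := flate.NewWriter(cw, flate.DefaultCompression)
	if err != nil {
		return nil, err
	}

	// Tee the input through the CRC-32 hash on its way into deflate.
	crc := crc32.NewIEEE()
	n, err := io.Copy(fw, io.TeeReader(src, crc))
	if err != nil {
		return nil, err
	}
	if err := fw.Close(); err != nil { // flush the final deflate blocks
		return nil, err
	}

	return &compressedPart{
		name:             name,
		crc:              crc.Sum32(),
		uncompressedSize: uint64(n),
		compressedSize:   cw.n,
		tmpPath:          tmp.Name(),
	}, nil
}

func main() {
	part, err := compressToTemp("hello.txt", strings.NewReader("hello, zip"))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v\n", *part)
}

Each compressedPart could then be handed over a channel to a single goroutine performing steps 2-4, which writes a local file header per part, copies the corresponding temporary file verbatim, and finishes with the central directory.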

Answer 3

Score: 2

With Go 1.17, parallel compression and merging of zip files are possible using the archive/zip package.

An example is below. In it, I create zip workers that each build an individual partial zip file, and an entry-provider worker that supplies the entries to be added via a channel to the zip workers. Actual files could be provided to the zip workers, but I skipped that part.

package main

import (
	"archive/zip"
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"strings"

	"golang.org/x/sync/errgroup"
)

const numOfZipWorkers = 10

type entry struct {
	name string
	rc   io.ReadCloser
}

func main() {
	log.SetFlags(log.LstdFlags | log.Lshortfile)

	entCh := make(chan entry, numOfZipWorkers)
	zpathCh := make(chan string, numOfZipWorkers)

	group, ctx := errgroup.WithContext(context.Background())

	for i := 0; i < numOfZipWorkers; i++ {
		group.Go(func() error {
			return zipWorker(ctx, entCh, zpathCh)
		})
	}

	group.Go(func() error {
		defer close(entCh) // Signal workers to stop.
		return entryProvider(ctx, entCh)
	})

	err := group.Wait()
	if err != nil {
		log.Fatal(err)
	}

	f, err := os.OpenFile("output.zip", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
	if err != nil {
		log.Fatal(err)
	}

	zw := zip.NewWriter(f)

	// All workers have returned by now, so every partial-zip path is
	// already buffered; closing the channel lets the range below end.
	close(zpathCh)
	for path := range zpathCh {
		zrd, err := zip.OpenReader(path)
		if err != nil {
			log.Fatal(err)
		}
		for _, zf := range zrd.File {
			err := zw.Copy(zf)
			if err != nil {
				log.Fatal(err)
			}
		}
		_ = zrd.Close()
		_ = os.Remove(path)
	}
	err = zw.Close()
	if err != nil {
		log.Fatal(err)
	}
	err = f.Close()
	if err != nil {
		log.Fatal(err)
	}
}

func entryProvider(ctx context.Context, entCh chan<- entry) error {
	for i := 0; i < 2*numOfZipWorkers; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case entCh <- entry{
			name: fmt.Sprintf("file_%d", i+1),
			rc:   io.NopCloser(strings.NewReader(fmt.Sprintf("content %d", i+1))),
		}:
		}
	}
	return nil
}

func zipWorker(ctx context.Context, entCh <-chan entry, zpathch chan<- string) error {
	f, err := os.CreateTemp(".", "tmp-part-*")
	if err != nil {
		return err
	}

	zw := zip.NewWriter(f)
Loop:
	for {
		var (
			ent entry
			ok  bool
		)
		select {
		case <-ctx.Done():
			err = ctx.Err()
			break Loop
		case ent, ok = <-entCh:
			if !ok {
				break Loop
			}
		}

		hdr := &zip.FileHeader{
			Name:   ent.name,
			Method: zip.Deflate, // zip.Store can also be used.
		}
		hdr.SetMode(0644)

		w, e := zw.CreateHeader(hdr)
		if e != nil {
			_ = ent.rc.Close()
			err = e
			break
		}

		_, e = io.Copy(w, ent.rc)
		_ = ent.rc.Close()
		if e != nil {
			err = e
			break
		}
	}

	if e := zw.Close(); e != nil && err == nil {
		err = e
	}
	if e := f.Close(); e != nil && err == nil {
		err = e
	}
	if err == nil {
		select {
		case <-ctx.Done():
			err = ctx.Err()
		case zpathch <- f.Name():
		}
	}
	return err
}
