What is the fastest way to merge tens of millions of files?

Question

There are 50 million files stored on an Ubuntu machine, and I want to merge them into a few large files. What is the fastest way to do this?

So far I have saved the names of the files to be processed in filename.txt, generated with the ls -1 command.

I tried writing a Go program that reads each file in turn and appends its contents to an output file, but it is far too slow: it merges only about 30-40 files per second, which would take more than 16 days to finish.

Is there a better way to merge them quickly?

Here is the Go code I wrote:

package main

import (
	"bufio"
	"flag"
	"fmt"
	"io/ioutil"
	"os"
	"os/signal"
	"path"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/gosuri/uilive"
)

const fileSizeLimit = (1 << 30) * 4 // 4GB per output file
const gigabyte = 1 << 30            // size of the bufio write buffer
const filesStorePath = "<>"         // directory containing the small input files

func main() {
	fileNamesFile := ""
	outBasePath := ""

	startId := 0

	//del := false
	flag.StringVar(&fileNamesFile, "d", "", "filenames file")
	flag.StringVar(&outBasePath, "o", "", "out dir")
	flag.IntVar(&startId, "f", 0, "start fn")
	//flag.BoolVar(&del, "del", false, "del file")

	flag.Parse()

	start := time.Now()

	fmt.Printf("start:%s\n", start.Format("2006-01-02 15:04:05"))
	fmt.Printf("file names = %s\n", fileNamesFile)
	fmt.Printf("out dir = %s\n", outBasePath)

	allList, _ := ioutil.ReadFile(fileNamesFile)
	all := strings.Split(string(allList), "\n")
	total := len(all)
	store := newStoreItems(outBasePath, startId)

	uiLiveWriter := uilive.New()
	uiLiveWriter.Start()

	finish := make(chan bool, 1)
	pos := 0
	readCount := 0

	go func() {
		for i := pos; i < total; i++ {
			pos = i
			fn := all[i]

			f := path.Join(filesStorePath, fn)
			if content, err := ioutil.ReadFile(f); err == nil {
				store.write(content)
				readCount++
			}
		}
		finish <- true // signal completion so main can exit after the last file
	}()

	go func() {
		ticker := time.NewTicker(1 * time.Second)
		// report progress once per second
		for {
			select {
			case <-ticker.C:
				t := time.Since(start)
				cost := t.Seconds()
				content := fmt.Sprintf("read %d/%d(%.2f%%), file=%d/%d, speed=%d/s\ttime %s\n",
					pos, total, float64(pos)/float64(total)*100,
					store.index, store.getSize(),
					int(float64(readCount)/cost),
					(time.Duration(cost) * time.Second).String())

				_, _ = fmt.Fprint(uiLiveWriter, content)
			}
		}
	}()

	osSignals := make(chan os.Signal, 1)
	// SIGKILL cannot be trapped, so only listen for interrupt/terminate signals
	signal.Notify(osSignals, os.Interrupt, syscall.SIGTERM)
	go func() {
		s := <-osSignals
		fmt.Println("stop !", s)

		finish <- false
	}()

	<-finish
	close(finish)

	// flush any buffered data and close the current output file
	_ = store.w.Flush()
	_ = store.file.Close()

	_, _ = fmt.Fprintln(uiLiveWriter, "Finished ")
	uiLiveWriter.Stop() // flush and stop rendering
	fmt.Println("readCount", readCount)
	fmt.Println("exit 0")
}

type storeItems struct {
	basePath string
	w        *bufio.Writer
	file     *os.File
	size     int
	rowSize  int64
	index    int
	lock     sync.Mutex
}

func newStoreItems(storePath string, startFn int) *storeItems {
	fn := path.Join(storePath, strconv.Itoa(startFn))
	f, err := os.OpenFile(fn, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600)
	if err != nil {
		fmt.Printf("create [%s] fail! err: %s \n", fn, err)
		os.Exit(1) // cannot continue without the first output file
	}

	return &storeItems{
		basePath: storePath,
		w:        bufio.NewWriterSize(f, gigabyte),
		file:     f,
		size:     0,
		index:    startFn,
	}
}

func (s *storeItems) getSize() int {
	return s.size
}

func (s *storeItems) nextFile() *os.File {
	if s.file != nil {
		_ = s.w.Flush()
		_ = s.file.Close()
	}
	nextIndex := s.index + 1

	s.file, _ = os.OpenFile(path.Join(s.basePath, strconv.Itoa(nextIndex)),
		os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600)
	s.w = bufio.NewWriterSize(s.file, gigabyte)
	s.index = nextIndex
	s.size = 0
	return s.file
}

func (s *storeItems) write(b []byte) {
	_, _ = s.w.Write(b)
	_, _ = s.w.WriteRune('\n')
	s.size += len(b) + 1

	// bufio.Writer.Size() reports the buffer capacity, not the bytes written,
	// so compare the tracked size against the limit instead.
	if s.size >= fileSizeLimit {
		// cut over to the next output file
		s.nextFile()
	}
}

Execution output:

start:2022-07-22 05:03:09
file names = ***
out dir = ***
read 9057/50803783(0.02%), file=0/48151629, speed=40/s  time 3m41s

Observed system I/O: read throughput of about 4 MB/s to 9 MB/s.

I also tried using awk and cat, but the result was about the same as the Go code:

head ~/filename.txt -n 10000 | xargs awk '1' >> ~/out/0
sed -i '1,10000d' ~/filename.txt

Answer 1

Score: 1

I'd use separate tools for this: cat to join the existing content, and split to create chunks of the desired output size. For example:

cat filename.txt | xargs cat | split -b 1M

With a million test files, this runs at about 100,000 files per second on my PC, so it would finish 50 million files within about 10 minutes. However, I ran this on tmpfs, and each file was only 4 bytes.

So those numbers reflect the best-case scenario. If disk (or filesystem) speed is the bottleneck in your case, then I don't think there is much you can do about it. Still, I wouldn't expect your setup to push the runtime from ten minutes to two weeks :)
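
For illustration, here is a minimal Go sketch of the same join-and-split idea as the pipeline above (not the answerer's code): it streams each file listed in filename.txt into the current output chunk and rolls over to a new chunk once a size limit is reached. The "files" input directory, the "out" output directory, and the 4GB chunk size are assumptions for the sketch.

// join.go: concatenate the files listed in filename.txt into fixed-size chunks.
package main

import (
	"bufio"
	"fmt"
	"io"
	"log"
	"os"
	"path/filepath"
	"strconv"
)

const chunkLimit int64 = 4 << 30 // roll over to a new output file after ~4GB

// newChunk creates out/<index> and wraps it in a large buffered writer.
func newChunk(index int) (*os.File, *bufio.Writer) {
	f, err := os.Create(filepath.Join("out", strconv.Itoa(index)))
	if err != nil {
		log.Fatal(err)
	}
	return f, bufio.NewWriterSize(f, 4<<20) // 4MB write buffer
}

func main() {
	list, err := os.Open("filename.txt") // one file name per line
	if err != nil {
		log.Fatal(err)
	}
	defer list.Close()

	index, written := 0, int64(0)
	out, w := newChunk(index)

	sc := bufio.NewScanner(list)
	for sc.Scan() {
		name := sc.Text()
		if name == "" {
			continue
		}
		in, err := os.Open(filepath.Join("files", name))
		if err != nil {
			continue // skip unreadable entries, as the program in the question does
		}
		n, _ := io.Copy(w, in) // stream the file into the buffered writer
		in.Close()
		written += n

		if written >= chunkLimit { // start the next chunk once the limit is hit
			w.Flush()
			out.Close()
			index++
			out, w = newChunk(index)
			written = 0
		}
	}
	w.Flush()
	out.Close()
	fmt.Println("done, chunks written:", index+1)
}

Compared with the program in the question, this keeps the work to a single sequential pass with one buffered writer per chunk and cuts chunks on the bytes actually written; its throughput should be roughly comparable to the cat | xargs cat | split pipeline, since both are limited by how fast the filesystem can open and read the small files.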


huangapple
  • Posted on 2022-07-22 13:09:45
  • Please keep this link when republishing: https://go.coder-hub.com/73075364.html