并发文件系统扫描

huangapple go评论117阅读模式
英文:

Concurrent filesystem scanning

问题

我想获取目录中文件的文件信息(文件名和字节大小)。但是有很多子目录(约1000个)和文件(约40,000个)。

实际上,我的解决方案是使用filepath.Walk()来获取每个文件的文件信息。但这需要很长时间。

func visit(path string, f os.FileInfo, err error) error {
    if f.Mode().IsRegular() {
        fmt.Printf("Visited: %s File name: %s Size: %d bytes\n", path, f.Name(), f.Size())
    }
    return nil
}

func main() {
    flag.Parse()
    root := "C:/Users/HERNOUX-06523/go/src/boilerpipe" //flag.Arg(0)
    filepath.Walk(root, visit)
}

是否可以使用filepath.Walk()进行并行/并发处理?

英文:

I want to obtain file information (file name & size in bytes) for the files in a directory. But there are a lot of sub-directory (~ 1000) and files (~40 000).

Actually my solution is to use filepath.Walk() to obtain file information for each file. But this is quite long.

func visit(path string, f os.FileInfo, err error) error {
	if f.Mode().IsRegular() {
		fmt.Printf("Visited: %s File name: %s Size: %d bytes\n", path, f.Name(), f.Size())
	}
	return nil
}
func main() {
	flag.Parse()
	root := "C:/Users/HERNOUX-06523/go/src/boilerpipe" //flag.Arg(0)
	filepath.Walk(root, visit)
}

Is it possible to do parallel/concurrent processing using filepath.Walk()?

答案1

得分: 13

你可以通过修改visit()函数来进行并发处理,而不是进入子文件夹,而是为每个子文件夹启动一个新的goroutine。

为了实现这一点,如果条目是一个目录,在visit()函数中返回特殊的filepath.SkipDir错误。不要忘记检查visit()中的path是否是goroutine应该处理的子文件夹,因为它也被传递给visit(),如果没有这个检查,你将无限地为初始文件夹启动goroutines。

此外,你还需要一种记录后台仍在工作的goroutine数量的方法,你可以使用sync.WaitGroup来实现。

这是一个简单的实现示例:

var wg sync.WaitGroup

func walkDir(dir string) {
    defer wg.Done()

    visit := func(path string, f os.FileInfo, err error) error {
        if f.IsDir() && path != dir {
            wg.Add(1)
            go walkDir(path)
            return filepath.SkipDir
        }
        if f.Mode().IsRegular() {
            fmt.Printf("Visited: %s File name: %s Size: %d bytes\n",
                path, f.Name(), f.Size())
        }
        return nil
    }

    filepath.Walk(dir, visit)
}

func main() {
    flag.Parse()
    root := "folder/to/walk" //flag.Arg(0)

    wg.Add(1)
    walkDir(root)
    wg.Wait()
}

一些注意事项:

根据文件在子文件夹中的分布情况,这种方法可能无法充分利用你的CPU/存储资源,例如,如果99%的文件都在一个子文件夹中,那个goroutine仍然会占据大部分时间。

另外要注意,fmt.Printf()调用是串行的,所以这也会减慢处理速度。我假设这只是一个示例,在实际情况下,你将在内存中进行一些处理/统计。不要忘记保护从visit()函数访问的变量的并发访问。

不用担心子文件夹的数量很多。这是正常的,Go运行时甚至可以处理数十万个goroutine。

还要注意,性能瓶颈很可能是存储/硬盘速度,所以你可能无法获得你期望的性能。在某个点(硬盘限制)之后,你将无法提高性能。

此外,为每个子文件夹启动一个新的goroutine可能不是最优的方法,通过限制遍历文件夹的goroutine数量,你可能会获得更好的性能。为此,请查看并使用工作池(worker pool):

https://stackoverflow.com/questions/38170852/is-this-an-idiomatic-worker-thread-pool-in-go/38172204#38172204

英文:

You may do concurrent processing by modifying your visit() function to not go into subfolders, but launch a new goroutine for each subfolder.

In order to do that, return the special filepath.SkipDir error from your visit() function if the entry is a directory. Don't forget to check if the path inside visit() is the subfolder the goroutine is ought to process, because that is also passed to visit(), and without this check you would launch goroutines endlessly for the initial folder.

Also you will need some kind of "counter" of how many goroutines are still working in the background, for that you may use sync.WaitGroup.

Here's a simple implementation of this:

var wg sync.WaitGroup

func walkDir(dir string) {
	defer wg.Done()

	visit := func(path string, f os.FileInfo, err error) error {
		if f.IsDir() && path != dir {
			wg.Add(1)
			go walkDir(path)
			return filepath.SkipDir
		}
		if f.Mode().IsRegular() {
			fmt.Printf("Visited: %s File name: %s Size: %d bytes\n",
				path, f.Name(), f.Size())
		}
		return nil
	}

	filepath.Walk(dir, visit)
}

func main() {
	flag.Parse()
	root := "folder/to/walk" //flag.Arg(0)

	wg.Add(1)
	walkDir(root)
	wg.Wait()
}

Some notes:

Depending on the "distribution" of files among subfolders, this may not fully utilize your CPU / storage, as if for example 99% of all the files are in one subfolder, that goroutine will still take the majority of time.

Also note that fmt.Printf() calls are serialized, so that will also slow down the process. I assume this was just an example, and in reality you will do some kind of processing / statistics in-memory. Don't forget to also protect concurrent access to variables accessed from your visit() function.

Don't worry about the high number of subfolders. It is normal and the Go runtime is capable of handling even hundreds of thousands of goroutines.

Also note that most likely the performance bottleneck will be your storage / hard disk speed, so you may not gain the performance you wish. After a certain point (your hard disk limit), you won't be able to improve performance.

Also launching a new goroutine for each subfolder may not be optimal, it may be that you get better performance by limiting the number of goroutines walking your folders. For that, check out and use a worker pool:

https://stackoverflow.com/questions/38170852/is-this-an-idiomatic-worker-thread-pool-in-go/38172204#38172204

答案2

得分: 0

我版本的解决这个问题的代码如下:

func ConcurrentDirWalker(wg *sync.WaitGroup, dir string, paths chan<- string) {
    walker := func(path string, info os.FileInfo, err error) error {
        if info.IsDir() && path != dir {
            wg.Add(1)
            go func() {
                defer wg.Done()
                ConcurrentDirWalker(wg, path, paths)
            }()
            return filepath.SkipDir
        }
        if info.Mode().IsRegular(){
            paths <- path
        }
        return nil
    }
    filepath.Walk(dir, walker)
}

collectorCh := make(chan string, 1000)
files := make([]string, 0)

var wgCollector sync.WaitGroup
wgCollector.Add(1)
go func() {
    defer wgCollector.Done()
    for path := range collectorCh {
        files = append(files, path)
    }
}()

root := "/users/me/dev/"
var wgWorkers sync.WaitGroup
wgWorkers.Add(1)
go func() {
    defer wgWorkers.Done()
    ConcurrentDirWalker(&wgWorkers, root, collectorCh)
}()

wgWorkers.Wait()
close(collectorCh)
wgCollector.Wait()

希望对你有帮助!

英文:

My version of solving this problem

func ConcurrentDirWalker(wg *sync.WaitGroup, dir string, paths chan&lt;- string) {
    walker := func(path string, info os.FileInfo, err error) error {
        if info.IsDir() &amp;&amp; path != dir {
            wg.Add(1)
            go func() {
                defer wg.Done()
                ConcurrentDirWalker(wg, path, paths)
            }()
            return filepath.SkipDir
        }
        if info.Mode().IsRegular(){
            paths &lt;- path
        }
        return nil
    }
    filepath.Walk(dir, walker)
}

collectorCh := make(chan string, 1000)
files := make([]string, 0)

var wgCollector sync.WaitGroup
wgCollector.Add(1)
go func() {
    defer wgCollector.Done()
    for path := range collectorCh {
        files = append(files, path)
    }
}()

root := &quot;/users/me/dev/&quot;
var wgWorkers sync.WaitGroup
wgWorkers.Add(1)
go func() {
    defer wgWorkers.Done()
    ConcurrentDirWalker(&amp;wgWorkers, root, collectorCh)
}()

wgWorkers.Wait()
close(collectorCh)
wgCollector.Wait()

huangapple
  • 本文由 发表于 2017年5月30日 15:13:29
  • 转载请务必保留本文链接:https://go.coder-hub.com/44255814.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定