Golang: uploading a whole directory concurrently returns "too many open files"

Question

I am trying to upload a whole directory to the server. It works with small directories, but with 100+ pictures it returns the error "too many open files". I close each file right after it has been read. Any idea how to fix this?

This is my code:

func uploadDir(path string) error {
    dir, err := os.Open(path)
    if err != nil {
        return err
    }

    files, err := dir.Readdirnames(-1)
    if err != nil {
        return err
    }
    dir.Close()

    errChan := make(chan error)
    resChan := make(chan *client.PutResult)
    remaining := len(files)
    for _, file := range files {
        file := file
        go func() {
            file, err := os.Open(path + "/" + file)
            if err != nil {
                errChan <- err
            }
            c := client.NewClient(os.Getenv("DROPS_SERVER"))
            res, err := c.Upload(client.NewUploadHandleFromReader(file))
            file.Close()
            if err != nil {
                errChan <- err
            }
            resChan <- res
        }()
    }

    for {
        select {
        case res := <-resChan:
            log.Println(res)
            remaining--
        case err := <-errChan:
            if err != nil {
                return err
            }
        }
        if remaining == 0 {
            break
        }
    }
    return nil
}

Answer 1

Score: 5

The original code does not limit the number of active goroutines and therefore does not limit the number of open file descriptors. Most operating systems limit how many file descriptors a process may have open at once. The fix is to create a fixed number of worker goroutines.

func uploadDir(path string) error {

    // Read directory and close.

    dir, err := os.Open(path)
    if err != nil {
        return err
    }
    names, err := dir.Readdirnames(-1)
    if err != nil {
        return err
    }
    dir.Close()

    // Copy names to a channel for workers to consume. Close the
    // channel so that workers stop when all work is complete.

    namesChan := make(chan string, len(names))
    for _, name := range names {
        namesChan <- name
    }
    close(namesChan)

    // Create a maximum of 8 workers

    workers := 8
    if len(names) < workers {
        workers = len(names)
    }

    errChan := make(chan error, 1)
    resChan := make(chan *client.PutResult, len(names))

    // Run workers

    for i := 0; i < workers; i++ {
        go func() {
            // Consume work from namesChan. Loop will end when no more work.
            for name := range namesChan {
                file, err := os.Open(filepath.Join(path, name))
                if err != nil {
                    select {
                    case errChan <- err:
                        // will break parent goroutine out of loop
                    default:
                        // don't care, first error wins
                    }
                    return
                }
                c := client.NewClient(os.Getenv("DROPS_SERVER"))
                res, err := c.Upload(client.NewUploadHandleFromReader(file))
                file.Close()
                if err != nil {
                    select {
                    case errChan <- err:
                        // will break parent goroutine out of loop
                    default:
                        // don't care, first error wins
                    }
                    return
                }
                resChan <- res
            }
        }()
    }

    // Collect results from workers

    for i := 0; i < len(names); i++ {
        select {
        case res := <-resChan:
            log.Println(res)
        case err := <-errChan:
            return err
        }
    }
    return nil
}

As a bonus, I modified channel sizes and send operations so that goroutines are not stuck when there's an error.

Answer 2

Score: 2

In order to send directories, I would simply archive/compress the directory locally and then upload it.

However, if you really want to upload the files individually, an easy trick is to cap the number of concurrent uploads, and with it the number of files open at once.

On any system (OS X and Linux at least; I'm not sure about Windows) there is a limit on the number of open file descriptors per process. You can raise that limit manually to allow more, but be careful about memory consumption.
If I recall correctly, the default limit is 1024.

http://play.golang.org/p/yp-vvxiJJx

package main

import (
    "log"
    "os"
    // plus the asker's client package; its import path is not given in the question
)

func uploadDir(path string, maxOpen int) error {
    dir, err := os.Open(path)
    if err != nil {
        return err
    }

    files, err := dir.Readdirnames(-1)
    if err != nil {
        return err
    }
    dir.Close()

    errChan := make(chan error)
    resChan := make(chan *client.PutResult)
    doneChan := make(chan bool)
    remaining := len(files)

    // limit is a counting semaphore that starts with maxOpen tokens.
    limit := make(chan struct{}, maxOpen)
    for i := 0; i < maxOpen; i++ {
        limit <- struct{}{}
    }
    for _, file := range files {
        file := file
        go func() {
            // Take a token before opening the file; give it back when done.
            <-limit
            defer func() {
                limit <- struct{}{}
            }()
            file, err := os.Open(path + "/" + file)
            if err != nil {
                errChan <- err
                return // do not continue with a nil file
            }
            c := client.NewClient(os.Getenv("DROPS_SERVER"))
            res, err := c.Upload(client.NewUploadHandleFromReader(file))
            file.Close()
            if err != nil {
                errChan <- err
                return // no result to report
            }
            resChan <- res
            doneChan <- true
        }()
    }

    // Looping on remaining > 0 also returns at once for an empty directory.
    for remaining > 0 {
        select {
        case <-doneChan:
            remaining--
        case res := <-resChan:
            log.Println(res)
        case err := <-errChan:
            if err != nil {
                return err
            }
        }
    }
    return nil
}

huangapple
  • Posted on September 9, 2014 at 04:19:06
  • When reposting, please keep the link to this article: https://go.coder-hub.com/25732514.html