对于以下的使用场景,我应该使用哪种Go并发模式?

huangapple go评论77阅读模式
英文:

Which go concurrency pattern should I use for the following use case?

问题

我有两个go routines

  • NewFileEntryCheck()
  • UploadFromQueueToCloud()

go NewFileEntryCheck()

这个函数将接受一个fsnotify.Watcher()作为参数,并在指定目录中无限循环地监视任何文件更改。

如果有任何新的file CREATE事件发生,它将将文件名添加到Queue中。

go UploadFromQueueToCloud()

这个go routine应该从Queue中获取文件名,从路径中读取文件并将其上传到云端。

现在,我的问题是,如何使UploadFromQueueToCloud()函数从队列中获取值并将其上传到云端?

由于NewFileEntryCheck()函数将持续检查新文件并将其添加到队列中,队列的长度是不固定的。我不能通过循环len(Queue)并通过通道接收数据。

是否有其他方法来实现这一点?

英文:

I'm having two go routines.

  • NewFileEntryCheck()
  • UploadFromQueueToCloud()

go NewFileEntryCheck()

This function will take a fsnotify.Watcher() as argument and it will watch for any file change in the specified directory inside an infinite loop.

If there is any new file CREATE event occurs, it will add the filename to the Queue

go UploadFromQueueToCloud()

This go routine should take the filename from the Queue read the file from the path and upload it to cloud.

Now, my question is, How can I make the UploadFromQueueToCloud() function to get value from the Queue and upload it to cloud?

Since the NewFileEntryCheck() function will continuously check for new files and add it to the queue, the length of the Queue is not fixed. I can't loop through the len(Queue) and receive data through the channel.

Is there any other approach to achieve this?

答案1

得分: 1

这只是发布-订阅模型的一个常见示例。你有一个发布者 (NewFileEntryCheck) 和一个订阅者 (UploadFromQueueToCloud)。因为这可以用发布-订阅模型来建模,所以你应该以类似的方式设计。

你可能已经从 @jimb 那里收到了一个关于 Queue 是否是通道的评论。嗯,这是有原因的。因为 Golang 的通道行为就像一个队列。所以一个解决方案肯定是使用通道来建模。

使用通道,你可以从一个 Go 协程中放入任务/文件名。从另一个 Go 协程 UploadFromQueueToCloud 中,你可以消费该通道并异步处理数据。

以下是示例代码:

func UploadFromQueueToCloud(channel *chan string, exit *chan bool) {
    for {
        select {
        case file := <-channel:
            // 上传到云端...
            // 其他业务逻辑
        case <-exit:
            break
        }
    }
}

func NewFileEntryCheck(notifier *fsnotify.Watcher) {
    channel := make(chan string)
    exit := make(chan bool)
    for {
        if some breaking condition {
            exit <- true
            break
        }
        // 检查文件通知器
        // 如果发生文件创建,
        // 将文件名放入 FileName
        channel <- FileName
        go UploadFromQueueToCloud(channel, exit)
    }
}

注意:你可以通过将通道更改为带缓冲的通道来增加吞吐量。

希望对你有所帮助。

英文:

This is just a common example of pub-sub model. You have one publisher (NewFileEntryCheck) and one subscriber (UploadFromQueueToCloud). Because this can be modelled in the form of Pub-Sub model, you should make the design similarly.

There is already a comment that you might have received from @jimb asking that if the Queue is a channel. Well there is a reason for it. It's because golang channel behaves just like a queue. So one solution is definitely to model it using a channel.

Using channel, you can put the task/file name from one go routine. From another go routine UploadFromQueueToCloud, you can consume that channel and process the data asynchronously.

Here is the sample code

func UploadFromQueueToCloud(channel *chan string, exit *chan bool) {
   	for {
		select {
           case file := &lt;- channel:
               // Upload the cloud..
               // Other business logics
           case exit := &lt;- exit:
               break;
        }
	}
}

func NewFileEntryCheck(notifier *fsnotify.Watcher) {
   channel := make(chan string)
   exit := make(chan bool)
   for {
        if (some breaking condition) {
             exit &lt;- true;
             break;
        }
        // Check for the file notifier
        // If file CREATE happened,
        // get the file name into FileName
        channel &lt;- FileName;
        go UploadFromQueueToCloud(channel, exit)
   }
} 

Note: You can increase the throughput by changing the channel into a buffered channel.

Hope that helps.

答案2

得分: 0

我的案例是:

func main() {
    allFileCheck := make(chan bool)
    fileUpload := make(chan string)
    ctx, cancel := context.WithCancel(context.Background())
    go NewFileEntryCheck(ctx, fileUpload, allFileCheck)

L:
    for {
        select {
        case file := <-fileUpload:
            go UploadFromQueueToCloud(ctx, file)
        case <-allFileCheck:
            fmt.Println("完成")
            break L
        case <-time.After(5 * time.Second):
            cancel()
        }
    }
}

func UploadFromQueueToCloud(ctx context.Context, file string) {
    fmt.Printf("上传文件 %v\n", file)
}

func NewFileEntryCheck(ctx context.Context, fileUpload chan string, allFileCheck chan bool) {
    fmt.Println("检查文件")
    for i := 0; i < 5; i++ {
        fileUpload <- "名称 " + strconv.Itoa(i)
    }
    allFileCheck <- true
}

希望这可以帮助到你!

英文:

My case is:

func main() {
	allFileCheck := make(chan bool)
	fileUpload := make(chan string)
	ctx, cancel := context.WithCancel(context.Background())
	go NewFileEntryCheck(ctx, fileUpload, allFileCheck)

L:
	for {
		select {
		case file := &lt;- fileUpload:
			go UploadFromQueueToCloud(ctx, file)
		case &lt;-allFileCheck:
			fmt.Println(&quot;done&quot;)
			break L
		case &lt;-time.After(5 * time.Second):
			cancel()
		}
	}
}

func UploadFromQueueToCloud(ctx context.Context, file string) {
	fmt.Printf(&quot;Upload file %v\n&quot;, file)
}

func NewFileEntryCheck(ctx context.Context, fileUpload chan string, allFileCheck chan bool) {
	fmt.Println(&quot;Check file&quot;)
	for i := 0; i &lt; 5; i++ {
		fileUpload &lt;- &quot;name &quot; + strconv.Itoa(i)
	}
	allFileCheck &lt;- true
}

huangapple
  • 本文由 发表于 2022年10月18日 20:27:27
  • 转载请务必保留本文链接:https://go.coder-hub.com/74110976.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定