英文:
How to structure a Go web server, using channels and goroutines?
问题
我正在实现一个服务器来流式传输多个浮点数数组。我需要一些帮助来设计我的系统以实现以下目标:
- 音频处理必须是独立的,即使没有任何请求进来,它也能正常工作。我的当前方法是使DataProcess函数等待直到有请求。
- 由于通道只能向一个请求提供数据,如何让2个或更多的请求获取我在DataProcess中准备的数据?
- 为了实际流式传输数据,请求处理程序不能等待整个DataProcess完成,有没有办法在DataProcess的每次迭代完成时立即将数据传递给处理程序?
感谢任何回复。这是我目前的想法:
package main
import (
"fmt"
"io"
"net/http"
"strconv"
"time"
)
func main() {
c := AudioProcess()
handleHello := makeHello(c)
http.HandleFunc("/", handleHello)
http.ListenAndServe(":8000", nil)
}
func makeHello(c chan string) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
for item := range c { // this loop runs when channel c is closed
io.WriteString(w, item)
}
}
}
func AudioProcess() chan string {
c := make(chan string)
go func() {
for i := 0; i <= 10; i++ { // Iterate the audio file
c <- strconv.Itoa(i) // have my frame of samples, send to channel c
time.Sleep(time.Second)
fmt.Println("send ", i) // logging
}
close(c) // done processing, close channel c
}()
return c
}
请注意,我只会翻译代码部分,不会回答关于翻译的问题。
英文:
I am implementing a server to stream many arrays of floats. I need some help to design my system to achieve the following:
- The audio process must be independent and does its work even if there isn't any request coming in. My current approach make the DataProcess function to wait until there is a request.
- Because the channel can only give data to 1 request, how can 2 or more requests get the data I've prepared in the DataProcess?
- To actually stream data, the request handler cant wait for the whole DataProcess to finish, is there anyway to give the handler data as soon as we complete each iteration in the DataProcess?
Any reply is appreciated. This is my current thoughts:
package main
import (
"fmt"
"io"
"net/http"
"strconv"
"time"
)
func main() {
c := AudioProcess()
handleHello := makeHello(c)
http.HandleFunc("/", handleHello)
http.ListenAndServe(":8000", nil)
}
func makeHello(c chan string) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
for item := range c { // this loop runs when channel c is closed
io.WriteString(w, item)
}
}
}
func AudioProcess() chan string {
c := make(chan string)
go func() {
for i := 0; i <= 10; i++ { // Iterate the audio file
c <- strconv.Itoa(i) // have my frame of samples, send to channel c
time.Sleep(time.Second)
fmt.Println("send ", i) // logging
}
close(c) // done processing, close channel c
}()
return c
}
答案1
得分: 1
我不完全确定这是否解决了你的问题,因为我不完全了解你的用例,但是无论如何,我已经提供了下面的解决方案。
我在HTTP路由器中使用了Gin,因为对我来说更方便,但我相信你可以根据自己的情况调整代码。我匆忙完成了这个(抱歉),所以可能存在我不知道的问题,但如果有任何问题,请告诉我。
简而言之:
- 我创建了一个
Manager
来管理多个Client
。它还包含一个sync.Mutex
来确保在任何给定时间只有一个“线程”在修改clients
; - 有一个
InitBackgroundTask()
函数,它会生成一个随机的float64
数,并将其传递给Manager
中的所有clients
(如果有的话)。如果没有任何clients
,我们只是睡眠并继续执行... - index处理程序处理添加和删除clients。通过UUID来标识clients;
- 现在可能发生3件事情。当clients通过
<-c.Writer.CloseNotify()
通道断开连接时(因为该方法返回并调用defer
),它们会自动被删除。我们还可以在下一个后台任务的“tick”中接收到随机的float64
数。最后,如果在20秒内没有收到任何内容,我们也可以终止。
我在这里对你的需求做了几个假设(例如,后台任务每隔Y分钟返回X)。如果你想要更精细的流式传输,我建议使用WebSockets(下面的模式仍然可以使用)。
如果你有任何问题,请告诉我。
代码:
package main
import (
"github.com/gin-gonic/gin"
"github.com/satori/go.uuid"
"log"
"math/rand"
"net/http"
"sync"
"time"
)
type Client struct {
uuid string
out chan float64
}
type Manager struct {
clients map[string]*Client
mutex sync.Mutex
}
func NewManager() *Manager {
m := new(Manager)
m.clients = make(map[string]*Client)
return m
}
func (m *Manager) AddClient(c *Client) {
m.mutex.Lock()
defer m.mutex.Unlock()
log.Printf("add client: %s\n", c.uuid)
m.clients[c.uuid] = c
}
func (m *Manager) DeleteClient(id string) {
m.mutex.Lock()
defer m.mutex.Unlock()
// log.Println("delete client: %s", c.uuid)
delete(m.clients, id)
}
func (m *Manager) InitBackgroundTask() {
for {
f64 := rand.Float64()
log.Printf("active clients: %d\n", len(m.clients))
for _, c := range m.clients {
c.out <- f64
}
log.Printf("sent output (%+v), sleeping for 10s...\n", f64)
time.Sleep(time.Second * 10)
}
}
func main() {
r := gin.Default()
m := NewManager()
go m.InitBackgroundTask()
r.GET("/", func(c *gin.Context) {
cl := new(Client)
cl.uuid = uuid.NewV4().String()
cl.out = make(chan float64)
defer m.DeleteClient(cl.uuid)
m.AddClient(cl)
select {
case <-c.Writer.CloseNotify():
log.Printf("%s : disconnected\n", cl.uuid)
case out := <-cl.out:
log.Printf("%s : received %+v\n", out)
c.JSON(http.StatusOK, gin.H{
"output": out,
})
case <-time.After(time.Second * 20):
log.Println("timed out")
}
})
r.Run()
}
注意:如果你在Chrome上进行测试,你可能需要在URL的末尾添加一个随机参数,以便实际进行请求,例如?rand=001
,?rand=002
等等。
英文:
I'm not entirely sure if this addresses your problem as I'm not fully aware of your use case, but nevertheless, I've come up with a solution below.
I've used Gin for the HTTP router because it was more comfortable to me, but I'm pretty sure you can adapt the code to fit yours. I did this in a hurry (sorry), so there may be problems I'm not aware of, but do let me know if there are any.
In short:
- I created a
Manager
that takes care of severalClient
. It also contains async.Mutex
to ensure only one thread is modifying theclients
at any given time; - There is an
InitBackgroundTask()
that will generate a randomfloat64
number, and pass it to ALLclients
in aManager
(if there are any). If there aren't anyclients
, we just sleep, and carry on... - The index handler deals with adding, and removing clients. Clients are identified through a UUID;
- 3 things can happen now. Clients are automatically removed when they disconnect via the
<-c.Writer.CloseNotify()
channel (because the method returns thereby calling thedefer
). We can also receive the randomfloat64
number in the next background task tick. Finally, we can also terminate if we have not received anything in 20s.
I made several assumptions about your needs here (e.g. that the background task will return X every Y minutes). If you are looking for more fine grain streaming, I'd recommend using websockets instead (and the pattern below can still be used).
Let me know if you have any questions.
Code:
package main
import (
"github.com/gin-gonic/gin"
"github.com/satori/go.uuid"
"log"
"math/rand"
"net/http"
"sync"
"time"
)
type Client struct {
uuid string
out chan float64
}
type Manager struct {
clients map[string]*Client
mutex sync.Mutex
}
func NewManager() *Manager {
m := new(Manager)
m.clients = make(map[string]*Client)
return m
}
func (m *Manager) AddClient(c *Client) {
m.mutex.Lock()
defer m.mutex.Unlock()
log.Printf("add client: %s\n", c.uuid)
m.clients[c.uuid] = c
}
func (m *Manager) DeleteClient(id string) {
m.mutex.Lock()
defer m.mutex.Unlock()
// log.Println("delete client: %s", c.uuid)
delete(m.clients, id)
}
func (m *Manager) InitBackgroundTask() {
for {
f64 := rand.Float64()
log.Printf("active clients: %d\n", len(m.clients))
for _, c := range m.clients {
c.out <- f64
}
log.Printf("sent output (%+v), sleeping for 10s...\n", f64)
time.Sleep(time.Second * 10)
}
}
func main() {
r := gin.Default()
m := NewManager()
go m.InitBackgroundTask()
r.GET("/", func(c *gin.Context) {
cl := new(Client)
cl.uuid = uuid.NewV4().String()
cl.out = make(chan float64)
defer m.DeleteClient(cl.uuid)
m.AddClient(cl)
select {
case <-c.Writer.CloseNotify():
log.Printf("%s : disconnected\n", cl.uuid)
case out := <-cl.out:
log.Printf("%s : received %+v\n", out)
c.JSON(http.StatusOK, gin.H{
"output": out,
})
case <-time.After(time.Second * 20):
log.Println("timed out")
}
})
r.Run()
}
Note: if you're testing this on Chrome, you might have to append a random parameter at the end of the URL so that the request will actually be made, e.g. ?rand=001
, ?rand=002
and so on.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论