HTTP Chunked streaming to WebSocket
Question
I want to listen to multiple HTTP streams that use chunked transfer encoding, read the messages from them line by line, and push the messages onto a single channel. Later, I want to read from that channel and push the messages out over a WebSocket.
func subscribe(ws chan<- string, group string) (scanner *bufio.Scanner, err error) {
	res, _ := req(STREAM_URL, channelTemplate(group))
	reader := bufio.NewScanner(res.Body)
	return reader, reader.Err()
}

func main() {
	ws := make(chan string)
	request, _ := http.NewRequest("GET", URL, nil)
	request.Header.Add("Content-Type", "application/json")
	client := &http.Client{}
	resp, _ := client.Do(request)
	ac := ACResponse{}
	json.NewDecoder(resp.Body).Decode(&ac)
	resp.Body.Close()
	var scanners = make([]*bufio.Scanner, 0)
	for _, group := range ac.Groups {
		fmt.Println("Started worker for", group)
		// listen to all stream URLs
		scanner, err := subscribe(ws, group)
		if err != nil {
			panic(err)
		}
		// keep track of Scanner to read later
		scanners = append(scanners, scanner)
	}
	for {
		select {
		case msg := <-ws:
			fmt.Println("[events] ", msg)
		default:
			randScanner := rand.Intn(len(ac.Groups)-1)
			fmt.Println("Reading from", randScanner)
			reader := scanners[randScanner]
			reader.Scan()
			if err := reader.Err(); err != nil {
				panic(err)
			}
			text := reader.Text()
			ws <- text
		}
	}
}
The program is blocking at reader.Scan(). The output is "Reading from 1" and nothing else. I looked at Wireshark, and the messages are coming through.
How can I design this better in Go?
Answer 1
Score: 1
main() blocks when sending to the unbuffered channel ws, because no other goroutine is receiving from it. To fix this issue, change ws to a buffered channel:
ws := make(chan string, 1)
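The difference between the two channel kinds can be seen in a small sketch. The helper trySend is a hypothetical name introduced here for illustration; it uses a select with a default case to report whether a send could proceed without blocking.

```go
package main

import "fmt"

// trySend (hypothetical helper) reports whether v could be sent on ch
// without blocking.
func trySend(ch chan string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan string)
	buffered := make(chan string, 1)

	fmt.Println(trySend(unbuffered, "a")) // false: no receiver is waiting
	fmt.Println(trySend(buffered, "a"))   // true: the buffer has room
	fmt.Println(trySend(buffered, "b"))   // false: the buffer is already full
}
```

A buffer of one is enough for the loop in the question only because each send is followed by a receive on the next iteration; it would not help if several values had to be queued before the next receive.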
A second issue is that main() continues to read scanners after they reach EOF. The problem is on these lines:
reader.Scan()
if err := reader.Err(); err != nil {
	panic(err)
}
text := reader.Text()
Scan() returns false at EOF, but the return value is ignored. Err() returns nil at EOF, so the error check never fires. Modify the application to use the return value from Scan().
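A minimal sketch of the usual loop shape, using an in-memory string instead of a network stream. The function name scanAll is an assumption made for this example.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// scanAll (hypothetical helper) returns every line in text. The loop
// condition uses Scan's return value, so reading stops at EOF.
func scanAll(text string) ([]string, error) {
	scanner := bufio.NewScanner(strings.NewReader(text))
	var lines []string
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
	}
	// Err is nil when the loop ended because of EOF.
	return lines, scanner.Err()
}

func main() {
	lines, err := scanAll("first\nsecond\n")
	fmt.Println(len(lines), lines, err) // prints: 2 [first second] <nil>
}
```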
Yet another issue is that main() blocks while reading from any one scanner. To avoid blocking on a single connection, start a goroutine to read each connection:
func subscribe(wg *sync.WaitGroup, ws chan<- string, group string) {
	defer wg.Done()
	res, err := req(STREAM_URL, channelTemplate(group))
	if err != nil {
		// handle error
		return
	}
	defer res.Body.Close()
	// Forward each line of the stream until EOF or a read error.
	reader := bufio.NewScanner(res.Body)
	for reader.Scan() {
		ws <- reader.Text()
	}
	if err := reader.Err(); err != nil {
		// handle error
	}
}
func main() {
	ws := make(chan string)
	request, _ := http.NewRequest("GET", URL, nil)
	request.Header.Add("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(request)
	if err != nil {
		// handle error
		return
	}
	var ac ACResponse
	if err := json.NewDecoder(resp.Body).Decode(&ac); err != nil {
		// handle error
	}
	resp.Body.Close()
	// Start one reader goroutine per group.
	var wg sync.WaitGroup
	for _, group := range ac.Groups {
		wg.Add(1)
		go subscribe(&wg, ws, group)
	}
	// Close ws once every reader has finished, which ends the range loop below.
	go func() {
		wg.Wait()
		close(ws)
	}()
	for msg := range ws {
		fmt.Println("[events] ", msg)
	}
}
The above code is uncompiled and untested. I've marked where error handling is required. I wrote the code to exit main after all connections reach EOF. That may or may not be what you want in your application.
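For experimenting with the same fan-in pattern without the real endpoints, here is a self-contained sketch that runs against local test servers. Everything specific to it is an assumption: newTestStream, streamLines, and fanIn are hypothetical names, and the httptest servers stand in for the stream URLs from the question.

```go
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// streamLines reads the response body at url line by line and forwards
// each line to out. It returns when the stream reaches EOF or fails.
func streamLines(wg *sync.WaitGroup, out chan<- string, url string) {
	defer wg.Done()
	res, err := http.Get(url)
	if err != nil {
		return // a real program would log or retry here
	}
	defer res.Body.Close()
	scanner := bufio.NewScanner(res.Body)
	for scanner.Scan() {
		out <- scanner.Text()
	}
}

// fanIn starts one reader goroutine per URL and returns a channel that
// is closed after every stream has ended.
func fanIn(urls []string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for _, u := range urls {
		wg.Add(1)
		go streamLines(&wg, out, u)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// newTestStream (hypothetical stand-in for a real stream) serves n lines
// and flushes after each one, so each line is sent as its own chunk.
func newTestStream(name string, n int) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		for i := 0; i < n; i++ {
			fmt.Fprintf(w, "%s event %d\n", name, i)
			if f, ok := w.(http.Flusher); ok {
				f.Flush()
			}
		}
	}))
}

func main() {
	a, b := newTestStream("a", 3), newTestStream("b", 3)
	defer a.Close()
	defer b.Close()
	for msg := range fanIn([]string{a.URL, b.URL}) {
		fmt.Println("[events]", msg)
	}
}
```

The order in which lines from different streams arrive is not deterministic. A WebSocket writer would take the place of the fmt.Println call in the final loop; it is left out here because the standard library has no WebSocket package.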