为什么我的 Go 通道会返回相同的元素多次?

huangapple go评论93阅读模式
英文:

Why does my Go channel return the same element more then once

问题

我有一个简单的应用程序,用于读取MongoDB的复制oplog,将结果序列化为Go结构并发送到一个通道进行处理。目前,我正在从该通道中读取并简单地打印出结构中的值。

我尝试使用for/range从通道中读取值,直接从通道中读取值,以及将其放入带有超时的select语句中。结果都是一样的。每次运行代码时,我都会从通道中获得不同的结果。有时候,我从通道中读取相同的值1-3次,甚至有时候是4次,即使只有一次写入。

这通常只发生在初始加载时(拉取旧记录),在读取通道中的实时添加时似乎不会发生。是否存在一种问题,即在第一次读取时,读取速度过快,导致项目在第一次读取之前被移除?

package main

import (
	"fmt"
	"labix.org/v2/mgo"
	"labix.org/v2/mgo/bson"
)

type Operation struct {
	Id        int64  `bson:"h" json:"id"`
	Operator  string `bson:"op" json:"operator"`
	Namespace string `bson:"ns" json:"namespace"`
	Select    bson.M `bson:"o" json:"select"`
	Update    bson.M `bson:"o2" json:"update"`
	Timestamp int64  `bson:"ts" json:"timestamp"`
}

func Tail(collection *mgo.Collection, Out chan<- *Operation) {
	iter := collection.Find(nil).Tail(-1)
	var oper *Operation

	for {
		for iter.Next(&oper) {
			fmt.Println("\n<<", oper.Id)
			Out <- oper
		}

		if err := iter.Close(); err != nil {
			fmt.Println(err)
			return
		}
	}
}

func main() {
	session, err := mgo.Dial("127.0.0.1")

	if err != nil {
		panic(err)
	}
	defer session.Close()

	c := session.DB("local").C("oplog.rs")

	cOper := make(chan *Operation, 1)

	go Tail(c, cOper)

	for operation := range cOper {
		fmt.Println()
		fmt.Println("Id: ", operation.Id)
		fmt.Println("Operator: ", operation.Operator)
		fmt.Println("Namespace: ", operation.Namespace)
		fmt.Println("Select: ", operation.Select)
		fmt.Println("Update: ", operation.Update)
		fmt.Println("Timestamp: ", operation.Timestamp)
	}
}

以上是你提供的代码。

英文:

I have a simple application that I am working on to read MongoDB's replication oplog, serialize the results into a Go structure and send it to a channel to be processed. Currently I am reading from that channel and simply printing out the values inside of the structure.

I have tried reading the values from the channel using for/range, simple reading directly from it, and putting it inside of a select with a timeout. The results are all the same. Each time I run the code I get different results from the channel. I see each time the channel is being written too One time however reading from that channel I sometimes read out the same value 1-3 sometimes even 4 times, even with only a single write.

This usually happens only on the initial load (pulling in the older records) and doesn't seem to occur when reading live additions to the channel. Is there some problem where reading from the channel too fast happens before the item gets removed from it the first time its read?

package main
import (
&quot;fmt&quot;
&quot;labix.org/v2/mgo&quot;
&quot;labix.org/v2/mgo/bson&quot;
)
type Operation struct {
Id        int64  `bson:&quot;h&quot; json:&quot;id&quot;`
Operator  string `bson:&quot;op&quot; json:&quot;operator&quot;`
Namespace string `bson:&quot;ns&quot; json:&quot;namespace&quot;`
Select    bson.M `bson:&quot;o&quot; json:&quot;select&quot;`
Update    bson.M `bson:&quot;o2&quot; json:&quot;update&quot;`
Timestamp int64  `bson:&quot;ts&quot; json:&quot;timestamp&quot;`
}
func Tail(collection *mgo.Collection, Out chan&lt;- *Operation) {
iter := collection.Find(nil).Tail(-1)
var oper *Operation
for {
for iter.Next(&amp;oper) {
fmt.Println(&quot;\n&lt;&lt;&quot;, oper.Id)
Out &lt;- oper
}
if err := iter.Close(); err != nil {
fmt.Println(err)
return
}
}
}
func main() {
session, err := mgo.Dial(&quot;127.0.0.1&quot;)
if err != nil {
panic(err)
}
defer session.Close()
c := session.DB(&quot;local&quot;).C(&quot;oplog.rs&quot;)
cOper := make(chan *Operation, 1)
go Tail(c, cOper)
for operation := range cOper {
fmt.Println()
fmt.Println(&quot;Id: &quot;, operation.Id)
fmt.Println(&quot;Operator: &quot;, operation.Operator)
fmt.Println(&quot;Namespace: &quot;, operation.Namespace)
fmt.Println(&quot;Select: &quot;, operation.Select)
fmt.Println(&quot;Update: &quot;, operation.Update)
fmt.Println(&quot;Timestamp: &quot;, operation.Timestamp)
}
}

答案1

得分: 6

我认为你正在重复使用*Operation,这可能导致问题。例如:

c := make(chan *int, 1)

go func() {
    val := new(int)
    for i := 0; i < 10; i++ {
        *val = i
        c <- val
    }
    close(c)
}()

for val := range c {
    time.Sleep(time.Millisecond * 1)
    fmt.Println(*val)
}

这段代码的输出结果是:

2
3
4
5
6
7
8
9
9
9

而且更重要的是,它不是线程安全的。你可以尝试这样修改:

for {
    for {
        var oper *Operation
        if !iter.Next(&oper) {
            break
        }
        fmt.Println("<<", oper.Id)
        Out <- oper
    }
    ...
}

或者使用一个普通的Operation而不是*Operation(因为没有指针,值会被复制)。

英文:

I think you're reusing your *Operation which is causing issues. For example:

http://play.golang.org/p/_MeSBLWPwN

c := make(chan *int, 1)
go func() {
val := new(int)
for i :=0; i&lt;10; i++ {
*val = i
c &lt;- val
}
close(c)
}()
for val := range c {
time.Sleep(time.Millisecond * 1)
fmt.Println(*val)
}

This code results in:

2
3
4
5
6
7
8
9
9
9

And more importantly it's not thread safe. Try doing this maybe:

for {
for { 
var oper *Operation
if !iter.Next(&amp;oper) {
break
}
fmt.Println(&quot;\n&lt;&lt;&quot;, oper.Id)
Out &lt;- oper
}
...
}

Or use a plain Operation instead of a *Operation. (Because without the pointer the value is copied)

答案2

得分: 3

我认为你正在每次反序列化时都将其反序列化到同一个结构体实例中,因此导致通道读取相同的对象并由发送方重写。尝试将其初始化移到循环中,这样每次都会创建一个新的实例。

你还可以使用go run -racego build -race运行此代码,它会警告此类问题。

英文:

I think what you're doing is deserializing into the same instance of a struct each time, therefore having the same object read by the channel and rewritten by the sender. Try to simply move the initialization of it into the loop so you'll create a new one each time.

You can also run this code with go run -race or go build -race, it warns about this sort of stuff.

huangapple
  • 本文由 发表于 2014年2月1日 04:33:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/21489545.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定