英文:
golang, goroutines race condition in test
问题
我需要在发布消息到主题之前订阅一个主题(主题是一个频道),但是在创建线程时,我需要运行go Func来持续监听频道以处理消息(例如从发布或订阅一个新的订阅)。测试是有效的(但不是每次都有效),有时当我运行测试时,它会在我监听主题(频道)之前发布一条消息到频道(主题)。
以下是要翻译的内容:
我有这个测试:
func Test_useCase_publish(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
tt.fields.storage = &RepositoryMock{
GetTopicFunc: func(ctx context.Context, topicName vos.TopicName) (entities.Topic, error) {
return tt.fields.topic, nil
},
}
useCase := New(tt.fields.storage)
subscribed := make(chan struct{})
go func() {
tt.fields.topic.Activate()
ch, _, err := useCase.Subscribe(tt.args.ctx, tt.args.message.TopicName)
require.NoError(t, err)
close(subscribed)
msg, ok := <-ch
if ok {
fmt.Println("msg", msg)
assert.Equal(t, tt.want, msg)
}
}()
<-subscribed
err := useCase.Publish(tt.args.ctx, tt.args.message)
assert.ErrorIs(t, err, tt.wantErr)
})
}
}
主题:
func (t Topic) Activate() {
go t.listenForSubscriptions()
go t.listenForMessages()
go t.listenForKills()
}
func (t *Topic) listenForSubscriptions() {
for newSubCh := range t.newSubCh {
t.Subscribers.Store(newSubCh.GetID(), newSubCh)
}
}
func (t *Topic) listenForKills() {
for subscriberID := range t.killSubCh {
t.Subscribers.Delete(subscriberID)
}
}
func (t *Topic) listenForMessages() {
for msg := range t.newMessageCh {
m := msg
t.Subscribers.Range(func(key, value interface{}) bool {
if key == nil || value == nil {
return false
}
if subscriber, ok := value.(Subscriber); ok {
subscriber.ReceiveMessage(m)
}
return true
})
}
}
func (t Topic) Dispatch(message vos.Message) {
t.newMessageCh <- message
}
func (t *Topic) listenForMessages() {
for msg := range t.newMessageCh {
m := msg
t.Subscribers.Range(func(key, value interface{}) bool {
if key == nil || value == nil {
return false
}
if subscriber, ok := value.(Subscriber); ok {
subscriber.ReceiveMessage(m)
}
return true
})
}
}
订阅:
func (u useCase) Subscribe(ctx context.Context, topicName vos.TopicName) (chan vos.Message, vos.SubscriberID, error) {
if err := topicName.Validate(); err != nil {
return nil, "", err
}
topic, err := u.storage.GetTopic(ctx, topicName)
if err != nil {
if !errors.Is(err, entities.ErrTopicNotFound) {
return nil, "", err
}
topic, err = u.createTopic(ctx, topicName)
if err != nil {
return nil, "", err
}
subscriber := entities.NewSubscriber(topic)
subscriptionCh, id := subscriber.Subscribe()
return subscriptionCh, id, nil
}
subscriber := entities.NewSubscriber(topic)
subscriptionCh, id := subscriber.Subscribe()
return subscriptionCh, id, nil
}
func (s Subscriber) Subscribe() (chan vos.Message, vos.SubscriberID) {
s.topic.addSubscriber(s)
return s.subscriptionCh, s.GetID()
}
func (s Subscriber) ReceiveMessage(msg vos.Message) {
s.subscriptionCh <- msg
}
发布者:
func (u useCase) Publish(ctx context.Context, message vos.Message) error {
if err := message.Validate(); err != nil {
return err
}
topic, err := u.storage.GetTopic(ctx, message.TopicName)
if err != nil {
return err
}
topic.Dispatch(message)
return nil
}
当我调用订阅(我向订阅频道发送一条消息并将订阅添加到我的线程)时,当我向主题发布消息时,我会向主题频道发送一条消息。
英文:
I need to subscribe to a topic(topic is a channel) before publishing to a topic, but when creating a thread I need to run go Func to keep listening to channels to process messages (for example from publish or subscribe a new subscribe )
the test works (but not every time), sometimes when I run the test it ends up posting a message on the channel (topic) before I'm listening to the topic (channel)
i have this test:
func Test_useCase_publish(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
tt.fields.storage = &RepositoryMock{
GetTopicFunc: func(ctx context.Context, topicName vos.TopicName) (entities.Topic, error) {
return tt.fields.topic, nil
},
}
useCase := New(tt.fields.storage)
subscribed := make(chan struct{})
go func() {
tt.fields.topic.Activate()
ch, _, err := useCase.Subscribe(tt.args.ctx, tt.args.message.TopicName)
require.NoError(t, err)
close(subscribed)
msg, ok := <-ch
if ok {
fmt.Println("msg", msg)
assert.Equal(t, tt.want, msg)
}
}()
<-subscribed
err := useCase.Publish(tt.args.ctx, tt.args.message)
assert.ErrorIs(t, err, tt.wantErr)
})
}
}
topic :
func (t Topic) Activate() {
go t.listenForSubscriptions()
go t.listenForMessages()
go t.listenForKills()
}
func (t *Topic) listenForSubscriptions() {
for newSubCh := range t.newSubCh {
t.Subscribers.Store(newSubCh.GetID(), newSubCh)
}
}
func (t *Topic) listenForKills() {
for subscriberID := range t.killSubCh {
t.Subscribers.Delete(subscriberID)
}
}
func (t *Topic) listenForMessages() {
for msg := range t.newMessageCh {
m := msg
t.Subscribers.Range(func(key, value interface{}) bool {
if key == nil || value == nil {
return false
}
if subscriber, ok := value.(Subscriber); ok {
subscriber.ReceiveMessage(m)
}
return true
})
}
func (t Topic) Dispatch(message vos.Message) {
t.newMessageCh <- message
}
func (t *Topic) listenForMessages() {
for msg := range t.newMessageCh {
m := msg
t.Subscribers.Range(func(key, value interface{}) bool {
if key == nil || value == nil {
return false
}
if subscriber, ok := value.(Subscriber); ok {
subscriber.ReceiveMessage(m)
}
return true
})
}
}
subscribe:
func (u useCase) Subscribe(ctx context.Context, topicName vos.TopicName) (chan vos.Message, vos.SubscriberID, error) {
if err := topicName.Validate(); err != nil {
return nil, "", err
}
topic, err := u.storage.GetTopic(ctx, topicName)
if err != nil {
if !errors.Is(err, entities.ErrTopicNotFound) {
return nil, "", err
}
topic, err = u.createTopic(ctx, topicName)
if err != nil {
return nil, "", err
}
subscriber := entities.NewSubscriber(topic)
subscriptionCh, id := subscriber.Subscribe()
return subscriptionCh, id, nil
}
subscriber := entities.NewSubscriber(topic)
subscriptionCh, id := subscriber.Subscribe()
return subscriptionCh, id, nil
}
func (s Subscriber) Subscribe() (chan vos.Message, vos.SubscriberID) {
s.topic.addSubscriber(s)
return s.subscriptionCh, s.GetID()
}
func (s Subscriber) ReceiveMessage(msg vos.Message) {
s.subscriptionCh <- msg
}
publisher :
func (u useCase) Publish(ctx context.Context, message vos.Message) error {
if err := message.Validate(); err != nil {
return err
}
topic, err := u.storage.GetTopic(ctx, message.TopicName)
if err != nil {
return err
}
topic.Dispatch(message)
return nil
}
when I call subscribe (I send a message to a subscribe to channel and add a subscribe to my thread) when I post a message to a topic I send a message to topic channel
答案1
得分: 1
你展示的代码中缺少一些部分,比如.Subscribe()
和.Publish()
的代码,或者通道是如何实例化的(它们是缓冲还是非缓冲的)。
不过有一点可以确定:
从(t *Topic) listenForSubscriptions()
的代码来看,这个订阅方法没有向订阅者发送任何信号表明已经注册成功。
所以我猜测:你的useCase.Subscribe(...)
调用中包含了创建的通道写入newSubCH
的信息,但它没有得到t.Subcribers.Store(...)
已完成的信息。
因此,根据goroutine的调度方式,你的测试函数中的消息发送可能会在通道实际注册之前发生。
要解决这个问题,你可以添加一些代码来向调用者发送一个信号。一种可能的方式是:
type subscribeReq struct{
ch chan Message
done chan struct{}
}
// 将 Topic.newSubCh 改为 chan *subscribeReq
func (t *Topic) listenForSubscriptions() {
for req := range t.newSubCh {
t.Subscribers.Store(newSubCh.GetID(), req.ch)
close(req.done)
}
}
另一个问题是:你的测试函数没有检查使用go func(){ ... }()
调用启动的goroutine是否完成,所以你的单元测试过程可能在goroutine执行fmt.Println(msg)
之前就退出了。
一种常见的检查方法是使用sync.WaitGroup
:
t.Run(tt.name, func(t *testing.T) {
...
useCase := New(tt.fields.storage)
subscribed := make(chan struct{})
wg := &sync.WaitGroup{} // 创建一个 *sync.WaitGroup
wg.Add(1) // 增加 1(你只启动了一个 goroutine)
go func() {
defer wg.Done() // 在返回时调用 wg.Done()
...
}()
// 发送消息,检查是否发生错误
wg.Wait() // 在此处阻塞,直到 goroutine 完成
})
英文:
Some points are missing from the code you show, such as the code for .Subscribe()
and .Publish()
, or how the channels are instanciated (are they buffered/unbuffered ?).
One point can be said, though :
from the looks of (t *Topic) listenForSubscriptions()
: this subscribing method does not send any signal to the subscriber that it has been registered.
So my guess is : your useCase.Subscribe(...)
call has the information that the created channel has been written on newSubCH
, but it hasn't got the inforamtion that t.Subcribers.Store(...)
has completed.
So, depending on how the goroutines are scheduled, the message sending in your test function can occur before the channel has actually been registered.
To fix this, you add something that will send a signal back to the caller. One possible way :
type subscribeReq struct{
ch chan Message
done chan struct{}
}
// turn Topic.newSubCh into a chan *subscribeReq
func (t *Topic) listenForSubscriptions() {
for req := range t.newSubCh {
t.Subscribers.Store(newSubCh.GetID(), req.ch)
close(req.done)
}
}
Another point : your test function does not check if the goroutine spun with your go func(){ ... }()
call completes at all, so your unit test process may also exit before the goroutine has had the chance to execute fmt.Println(msg)
.
A common way to check this is to use a sync.WaitGroup
:
t.Run(tt.name, func(t *testing.T) {
...
useCase := New(tt.fields.storage)
subscribed := make(chan struct{})
wg := &sync.WaitGroup{} // create a *sync.WaitGroup
wg.Add(1) // increment by 1 (you start only 1 goroutine)
go func() {
defer wg.Done() // have the goroutine call wg.Done() when returning
...
}()
// send message, check that no error occurs
wg.Wait() // block here until the goroutine has completed
})
</details>
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论