Golang测试使用通道不退出。

huangapple go评论115阅读模式
英文:

Golang test with channels does not exit

问题

以下是翻译好的内容:

以下的Golang测试永远不会退出。我怀疑这与通道死锁有关,但作为一个Go新手,我不太确定。

  1. const userName = "xxxxxxxxxxxx"
  2. func TestSynchroninze(t *testing.T) {
  3. c, err := channel.New(github.ChannelName, authToken)
  4. if err != nil {
  5. t.Fatalf("Could not create channel: %s", err)
  6. return
  7. }
  8. state := channel.NewState(nil)
  9. ctx := context.Background()
  10. ctx = context.WithValue(ctx, "userId", userName)
  11. user := api.User{}
  12. output, errs := c.Synchronize(state, ctx)
  13. if err = <-errs; err != nil {
  14. t.Fatalf("Error performing synchronize: %s", err)
  15. return
  16. }
  17. for o := range output {
  18. switch oo := o.Data.(type) {
  19. case api.User:
  20. user = oo
  21. glog.Infof("we have a USER %s\n", user)
  22. default:
  23. t.Errorf("Encountered unexpected data type: %T", oo)
  24. }
  25. }
  26. }

这是被测试的方法。

  1. type github struct {
  2. client *api.Client
  3. }
  4. func newImplementation(t auth.UserToken) implementation.Implementation {
  5. return &github{client: api.NewClient(t)}
  6. }
  7. // -------------------------------------------------------------------------------------
  8. const (
  9. kLastUserFetch = "lastUserFetch"
  10. )
  11. type synchronizeFunc func(implementation.MutableState, chan *implementation.Output, context.Context) error
  12. // -------------------------------------------------------------------------------------
  13. func (g *github) Synchronize(state implementation.MutableState, ctx context.Context) (<-chan *implementation.Output, <-chan error) {
  14. output := make(chan *implementation.Output)
  15. errors := make(chan error, 1) // buffer allows preflight errors
  16. // Close output channels once we're done
  17. defer func() {
  18. go func() {
  19. // wg.Wait()
  20. close(errors)
  21. close(output)
  22. }()
  23. }()
  24. err := g.fetchUser(state, output, ctx)
  25. if err != nil {
  26. errors <- err
  27. }
  28. return output, errors
  29. }
  30. func (g *github) fetchUser(state implementation.MutableState, output chan *implementation.Output, ctx context.Context) error {
  31. var err error
  32. var user = api.User{}
  33. userId, _ := ctx.Value("userId").(string)
  34. user, err = g.client.GetUser(userId, ctx.Done())
  35. if err == nil {
  36. glog.Info("No error in fetchUser")
  37. output <- &implementation.Output{Data: user}
  38. state.SetTime(kLastUserFetch, time.Now())
  39. }
  40. return err
  41. }
  42. func (c *Client) GetUser(id string, quit <-chan struct{}) (user User, err error) {
  43. // Execute request
  44. var data []byte
  45. data, err = c.get("users/"+id, nil, quit)
  46. glog.Infof("USER DATA %s", data)
  47. // Parse response
  48. if err == nil && len(data) > 0 {
  49. err = json.Unmarshal(data, &user)
  50. data, _ = json.Marshal(user)
  51. }
  52. return
  53. }

这是控制台中的输出(大部分用户详细信息已删除)。

  1. I1228 13:25:05.291010 21313 client.go:177] GET https://api.github.com/users/xxxxxxxx
  2. I1228 13:25:06.010085 21313 client.go:36] USER DATA {"login":"xxxxxxxx","id":00000000,"avatar_url":"https://avatars.githubusercontent.com/u/0000000?v=3",...}
  3. I1228 13:25:06.010357 21313 github.go:90] No error in fetchUser

这是api包中的相关部分。

  1. package api
  2. type Client struct {
  3. authToken auth.UserToken
  4. http *http.Client
  5. }
  6. func NewClient(authToken auth.UserToken) *Client {
  7. return &Client{
  8. authToken: authToken,
  9. http: auth.NewClient(authToken),
  10. }
  11. }
  12. // -------------------------------------------------------------------------------------
  13. type User struct {
  14. Id int `json:"id,omitempty"`
  15. Username string `json:"login,omitempty"`
  16. Email string `json:"email,omitempty"`
  17. FullName string `json:"name,omitempty"`
  18. ProfilePicture string `json:"avatar_url,omitempty"`
  19. Bio string `json:"bio,omitempty"`
  20. Website string `json:"blog,omitempty"`
  21. Company string `json:"company,omitempty"`
  22. }

channel包:

  1. package channel
  2. type Channel struct {
  3. implementation.Descriptor
  4. imp implementation.Implementation
  5. }
  6. // New returns a channel implementation with a given name and auth token.
  7. func New(name string, token auth.UserToken) (*Channel, error) {
  8. if desc, ok := implementation.Lookup(name); ok {
  9. if imp := implementation.New(name, token); imp != nil {
  10. return &Channel{Descriptor: desc, imp: imp}, nil
  11. }
  12. }
  13. return nil, ErrInvalidChannel
  14. }

implementation包:

  1. package implementation
  2. import "golang.org/x/net/context"
  3. // -------------------------------------------------------------------------------------
  4. // Implementation is the interface implemented by subpackages.
  5. type Implementation interface {
  6. // Synchronize performs a synchronization using the given state. A context parameters
  7. // is provided to provide cancellation as well as implementation-specific behaviors.
  8. //
  9. // If a fatal error occurs (see package error definitions), the state can be discarded
  10. // to prevent the persistence of an invalid state.
  11. Synchronize(state MutableState, ctx context.Context) (<-chan *Output, <-chan error)
  12. // FetchDetails gets details for a given timeline item. Any changes to the TimelineItem
  13. // (including the Meta value) will be persisted.
  14. FetchDetails(item *TimelineItem, ctx context.Context) (interface{}, error)
  15. }

这是原始的Synchronize方法。在我的测试中,我删除了一些细节,以尝试简化问题。通过删除一个go func调用,我相信我引入了一个新的问题,可能会让事情变得混乱。

这是原始的Synchronize方法。其中涉及到一些Wait Groups和包含单个函数的函数数组,因为该方法最终将同步多个函数。

  1. func (g *github) Synchronize(state implementation.MutableState, ctx context.Context) (<-chan *implementation.Output, <-chan error) {
  2. wg := sync.WaitGroup{}
  3. output := make(chan *implementation.Output)
  4. errors := make(chan error, 1) // buffer allows preflight errors
  5. // Close output channels once we're done
  6. defer func() {
  7. go func() {
  8. wg.Wait()
  9. close(errors)
  10. close(output)
  11. }()
  12. }()
  13. // Perform fetch functions in separate routines
  14. funcs := []synchronizeFunc{
  15. g.fetchUser,
  16. }
  17. for _, f := range funcs {
  18. wg.Add(1)
  19. go func(f synchronizeFunc) {
  20. defer wg.Done()
  21. if err := f(state, output, ctx); err != nil {
  22. errors <- err
  23. }
  24. }(f)
  25. }
  26. glog.Info("after go sync...")
  27. return output, errors
  28. }
英文:

The following Golang test never exits. I suspect it has something to do with a channel deadlock but being a go-noob, I am not very certain.

  1. const userName = &quot;xxxxxxxxxxxx&quot;
  2. func TestSynchroninze(t *testing.T) {
  3. c, err := channel.New(github.ChannelName, authToken)
  4. if err != nil {
  5. t.Fatalf(&quot;Could not create channel: %s&quot;, err)
  6. return
  7. }
  8. state := channel.NewState(nil)
  9. ctx := context.Background()
  10. ctx = context.WithValue(ctx, &quot;userId&quot;, userName)
  11. user := api.User{}
  12. output, errs := c.Synchronize(state, ctx)
  13. if err = &lt;-errs; err != nil {
  14. t.Fatalf(&quot;Error performing synchronize: %s&quot;, err)
  15. return
  16. }
  17. for o := range output {
  18. switch oo := o.Data.(type) {
  19. case api.User:
  20. user = oo
  21. glog.Infof(&quot;we have a USER %s\n&quot;, user)
  22. default:
  23. t.Errorf(&quot;Encountered unexpected data type: %T&quot;, oo)
  24. }
  25. }
  26. }

Here are the methods being tested.

  1. type github struct {
  2. client *api.Client
  3. }
  4. func newImplementation(t auth.UserToken) implementation.Implementation {
  5. return &amp;github{client: api.NewClient(t)}
  6. }
  7. // -------------------------------------------------------------------------------------
  8. const (
  9. kLastUserFetch = &quot;lastUserFetch&quot;
  10. )
  11. type synchronizeFunc func(implementation.MutableState, chan *implementation.Output, context.Context) error
  12. // -------------------------------------------------------------------------------------
  13. func (g *github) Synchronize(state implementation.MutableState, ctx context.Context) (&lt;-chan *implementation.Output, &lt;-chan error) {
  14. output := make(chan *implementation.Output)
  15. errors := make(chan error, 1) // buffer allows preflight errors
  16. // Close output channels once we&#39;re done
  17. defer func() {
  18. go func() {
  19. // wg.Wait()
  20. close(errors)
  21. close(output)
  22. }()
  23. }()
  24. err := g.fetchUser(state, output, ctx)
  25. if err != nil {
  26. errors &lt;- err
  27. }
  28. return output, errors
  29. }
  30. func (g *github) fetchUser(state implementation.MutableState, output chan *implementation.Output, ctx context.Context) error {
  31. var err error
  32. var user = api.User{}
  33. userId, _ := ctx.Value(&quot;userId&quot;).(string)
  34. user, err = g.client.GetUser(userId, ctx.Done())
  35. if err == nil {
  36. glog.Info(&quot;No error in fetchUser&quot;)
  37. output &lt;- &amp;implementation.Output{Data: user}
  38. state.SetTime(kLastUserFetch, time.Now())
  39. }
  40. return err
  41. }
  42. func (c *Client) GetUser(id string, quit &lt;-chan struct{}) (user User, err error) {
  43. // Execute request
  44. var data []byte
  45. data, err = c.get(&quot;users/&quot;+id, nil, quit)
  46. glog.Infof(&quot;USER DATA %s&quot;, data)
  47. // Parse response
  48. if err == nil &amp;&amp; len(data) &gt; 0 {
  49. err = json.Unmarshal(data, &amp;user)
  50. data, _ = json.Marshal(user)
  51. }
  52. return
  53. }

Here is what I see in the console (most of the user details removed)

  1. I1228 13:25:05.291010 21313 client.go:177] GET https://api.github.com/users/xxxxxxxx
  2. I1228 13:25:06.010085 21313 client.go:36] USER DATA {&quot;login&quot;:&quot;xxxxxxxx&quot;,&quot;id&quot;:00000000,&quot;avatar_url&quot;:&quot;https://avatars.githubusercontent.com/u/0000000?v=3&quot;,...}
  3. I1228 13:25:06.010357 21313 github.go:90] No error in fetchUser

==========EDIT=============

Here is the relevant portion of the api package.

  1. package api
  2. type Client struct {
  3. authToken auth.UserToken
  4. http *http.Client
  5. }
  6. func NewClient(authToken auth.UserToken) *Client {
  7. return &amp;Client{
  8. authToken: authToken,
  9. http: auth.NewClient(authToken),
  10. }
  11. }
  12. // -------------------------------------------------------------------------------------
  13. type User struct {
  14. Id int `json:&quot;id,omitempty&quot;`
  15. Username string `json:&quot;login,omitempty&quot;`
  16. Email string `json:&quot;email,omitempty&quot;`
  17. FullName string `json:&quot;name,omitempty&quot;`
  18. ProfilePicture string `json:&quot;avatar_url,omitempty&quot;`
  19. Bio string `json:&quot;bio,omitempty&quot;`
  20. Website string `json:&quot;blog,omitempty&quot;`
  21. Company string `json:&quot;company,omitempty&quot;`
  22. }

And the channel package

  1. package channel
  2. type Channel struct {
  3. implementation.Descriptor
  4. imp implementation.Implementation
  5. }
  6. // New returns a channel implementation with a given name and auth token.
  7. func New(name string, token auth.UserToken) (*Channel, error) {
  8. if desc, ok := implementation.Lookup(name); ok {
  9. if imp := implementation.New(name, token); imp != nil {
  10. return &amp;Channel{Descriptor: desc, imp: imp}, nil
  11. }
  12. }
  13. return nil, ErrInvalidChannel
  14. }

and the implementation package...

  1. package implementation
  2. import &quot;golang.org/x/net/context&quot;
  3. // -------------------------------------------------------------------------------------
  4. // Implementation is the interface implemented by subpackages.
  5. type Implementation interface {
  6. // Synchronize performs a synchronization using the given state. A context parameters
  7. // is provided to provide cancellation as well as implementation-specific behaviors.
  8. //
  9. // If a fatal error occurs (see package error definitions), the state can be discarded
  10. // to prevent the persistence of an invalid state.
  11. Synchronize(state MutableState, ctx context.Context) (&lt;-chan *Output, &lt;-chan error)
  12. // FetchDetails gets details for a given timeline item. Any changes to the TimelineItem
  13. // (including the Meta value) will be persisted.
  14. FetchDetails(item *TimelineItem, ctx context.Context) (interface{}, error)
  15. }

======Edit #2=======

This is the original Synchronize method. I removed some details in my testing to try and simplify the problem. By removing a go func call, I believe I introduced a new problem which could be confusing things.

Here is the original Synchronize method. There are some things with Wait Groups and a function array containing a single function because this method will eventually be synchronizing multiple functions.

  1. func (g *github) Synchronize(state implementation.MutableState, ctx context.Context) (&lt;-chan *implementation.Output, &lt;-chan error) {
  2. wg := sync.WaitGroup{}
  3. output := make(chan *implementation.Output)
  4. errors := make(chan error, 1) // buffer allows preflight errors
  5. // Close output channels once we&#39;re done
  6. defer func() {
  7. go func() {
  8. wg.Wait()
  9. close(errors)
  10. close(output)
  11. }()
  12. }()
  13. // Perform fetch functions in separate routines
  14. funcs := []synchronizeFunc{
  15. g.fetchUser,
  16. }
  17. for _, f := range funcs {
  18. wg.Add(1)
  19. go func(f synchronizeFunc) {
  20. defer wg.Done()
  21. if err := f(state, output, ctx); err != nil {
  22. errors &lt;- err
  23. }
  24. }(f)
  25. }
  26. glog.Info(&quot;after go sync...&quot;)
  27. return output, errors
  28. }

答案1

得分: 1

我认为这两个问题出在以下代码中:

output <- &implementation.Output{Data: user}

该通道没有缓冲区。它会阻塞,直到其他goroutine从中读取。但在你的代码中,是同一个goroutine,所以它会阻塞。

第二个问题是:

// Close output channels once we're done
defer func() {
go func() {
// wg.Wait()

  1. close(errors)
  2. close(output)
  3. }()

}()

当该函数退出时,你启动了一个goroutine。该goroutine被调度,函数返回,但它从未调用该goroutine。

我建议将所有这些逻辑统一到一个函数中:

func (g *github) Synchronize(state implementation.MutableState, ctx context.Context) (<-chan *implementation.Output, <-chan error) {
output := make(chan *implementation.Output)
errors := make(chan error, 1) // 缓冲区允许预先出错

  1. go func() {
  2. defer close(output)
  3. defer close(errors)
  4. err := g.fetchUser(state, output, ctx)
  5. if err != nil {
  6. errors <- err
  7. }
  8. }()
  9. return output, errors

}

英文:

I think the two problems are in

  1. output &lt;- &amp;implementation.Output{Data: user}

the channel does not have a buffer. It will block until some other goroutine reads from it. But in your code is the same goroutine so it will block.

and second:

  1. // Close output channels once we&#39;re done
  2. defer func() {
  3. go func() {
  4. // wg.Wait()
  5. close(errors)
  6. close(output)
  7. }()
  8. }()

you launch a go routine when the routine exits. The goroutine is scheduled, the function returns but it never calls the goroutine.

I would suggest to unify all that logic in one:

  1. func (g *github) Synchronize(state implementation.MutableState, ctx context.Context) (&lt;-chan *implementation.Output, &lt;-chan error) {
  2. output := make(chan *implementation.Output)
  3. errors := make(chan error, 1) // buffer allows preflight errors
  4. go func() {
  5. defer close(output)
  6. defer close(errors)
  7. err := g.fetchUser(state, output, ctx)
  8. if err != nil {
  9. errors &lt;- err
  10. }
  11. }()
  12. return output, errors
  13. }

huangapple
  • 本文由 发表于 2015年12月29日 02:47:57
  • 转载请务必保留本文链接:https://go.coder-hub.com/34498981.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定