Golang test with channels does not exit

Question

The following Golang test never exits. I suspect it has something to do with a channel deadlock, but being new to Go, I am not certain.

const userName = "xxxxxxxxxxxx"

func TestSynchroninze(t *testing.T) {
    c, err := channel.New(github.ChannelName, authToken)
    if err != nil {
        t.Fatalf("Could not create channel: %s", err)
        return
    }

    state := channel.NewState(nil)
    ctx := context.Background()
    ctx = context.WithValue(ctx, "userId", userName)
    user := api.User{}

    output, errs := c.Synchronize(state, ctx)

    if err = <-errs; err != nil {
        t.Fatalf("Error performing synchronize: %s", err)
        return
    }

    for o := range output {
        switch oo := o.Data.(type) {
        case api.User:
            user = oo
            glog.Infof("we have a USER %s\n", user)
        default:
            t.Errorf("Encountered unexpected data type: %T", oo)
        }
    }
}

Here are the methods being tested.

type github struct {
    client *api.Client
}

func newImplementation(t auth.UserToken) implementation.Implementation {
    return &github{client: api.NewClient(t)}
}

// -------------------------------------------------------------------------------------

const (
    kLastUserFetch = "lastUserFetch"
)

type synchronizeFunc func(implementation.MutableState, chan *implementation.Output, context.Context) error

// -------------------------------------------------------------------------------------

func (g *github) Synchronize(state implementation.MutableState, ctx context.Context) (<-chan *implementation.Output, <-chan error) {
    output := make(chan *implementation.Output)
    errors := make(chan error, 1) // buffer allows preflight errors

    // Close output channels once we're done
    defer func() {
        go func() {
            // wg.Wait()

            close(errors)
            close(output)
        }()
    }()

    err := g.fetchUser(state, output, ctx)
    if err != nil {
        errors <- err
    }

    return output, errors
}

func (g *github) fetchUser(state implementation.MutableState, output chan *implementation.Output, ctx context.Context) error {
    var err error

    var user = api.User{}
    userId, _ := ctx.Value("userId").(string)
    user, err = g.client.GetUser(userId, ctx.Done())

    if err == nil {
        glog.Info("No error in fetchUser")
        output <- &implementation.Output{Data: user}
        state.SetTime(kLastUserFetch, time.Now())
    }

    return err
}

func (c *Client) GetUser(id string, quit <-chan struct{}) (user User, err error) {
    // Execute request
    var data []byte
    data, err = c.get("users/"+id, nil, quit)
    glog.Infof("USER DATA %s", data)

    // Parse response
    if err == nil && len(data) > 0 {
        err = json.Unmarshal(data, &user)

        data, _ = json.Marshal(user)
    }

    return
}

Here is what I see in the console (most of the user details removed).

I1228 13:25:05.291010   21313 client.go:177] GET https://api.github.com/users/xxxxxxxx
I1228 13:25:06.010085   21313 client.go:36] USER DATA {"login":"xxxxxxxx","id":00000000,"avatar_url":"https://avatars.githubusercontent.com/u/0000000?v=3",...}
I1228 13:25:06.010357   21313 github.go:90] No error in fetchUser

Here is the relevant portion of the api package.

package api

type Client struct {
    authToken auth.UserToken
    http      *http.Client
}

func NewClient(authToken auth.UserToken) *Client {
    return &Client{
        authToken: authToken,
        http:      auth.NewClient(authToken),
    }
}



// -------------------------------------------------------------------------------------
type User struct {
    Id             int    `json:"id,omitempty"`
    Username       string `json:"login,omitempty"`
    Email          string `json:"email,omitempty"`
    FullName       string `json:"name,omitempty"`
    ProfilePicture string `json:"avatar_url,omitempty"`
    Bio            string `json:"bio,omitempty"`
    Website        string `json:"blog,omitempty"`
    Company        string `json:"company,omitempty"`
}

And the channel package:

package channel

type Channel struct {
    implementation.Descriptor
    imp implementation.Implementation
}

// New returns a channel implementation with a given name and auth token.
func New(name string, token auth.UserToken) (*Channel, error) {
    if desc, ok := implementation.Lookup(name); ok {
        if imp := implementation.New(name, token); imp != nil {
            return &Channel{Descriptor: desc, imp: imp}, nil
        }
    }

    return nil, ErrInvalidChannel
}

And the implementation package:

package implementation

import "golang.org/x/net/context"

// -------------------------------------------------------------------------------------

// Implementation is the interface implemented by subpackages.
type Implementation interface {
    // Synchronize performs a synchronization using the given state. A context parameters
    // is provided to provide cancellation as well as implementation-specific behaviors.
    //
    // If a fatal error occurs (see package error definitions), the state can be discarded
    // to prevent the persistence of an invalid state.
    Synchronize(state MutableState, ctx context.Context) (<-chan *Output, <-chan error)

    // FetchDetails gets details for a given timeline item. Any changes to the TimelineItem
    // (including the Meta value) will be persisted.
    FetchDetails(item *TimelineItem, ctx context.Context) (interface{}, error)
}

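Edit: the Synchronize method shown earlier is a simplified version. I removed some details in my testing to try to simplify the problem. By removing a go func call, I believe I introduced a new problem, which could be confusing things.

Here is the original Synchronize method. It uses a WaitGroup and a function slice containing a single function because this method will eventually synchronize multiple functions.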

func (g *github) Synchronize(state implementation.MutableState, ctx context.Context) (<-chan *implementation.Output, <-chan error) {
    wg := sync.WaitGroup{}
    output := make(chan *implementation.Output)
    errors := make(chan error, 1) // buffer allows preflight errors

    // Close output channels once we're done
    defer func() {
        go func() {
            wg.Wait()

            close(errors)
            close(output)
        }()
    }()

    // Perform fetch functions in separate routines
    funcs := []synchronizeFunc{
        g.fetchUser,
    }

    for _, f := range funcs {
        wg.Add(1)

        go func(f synchronizeFunc) {
            defer wg.Done()

            if err := f(state, output, ctx); err != nil {
                errors <- err
            }
        }(f)
    }

    glog.Info("after go sync...")

    return output, errors
}
Answer 1

Score: 1

I think there are two problems. The first is in this line:

output <- &implementation.Output{Data: user}

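The output channel is unbuffered, so a send blocks until some other goroutine receives from it. In your code the send happens in the same goroutine that calls Synchronize, before the channel has been returned to any reader, so it blocks forever.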
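To see that blocking behavior in isolation, here is a minimal sketch. The helper trySend is hypothetical (it is not part of the code in the question); it uses a select with a default case to report whether a send could proceed immediately instead of hanging.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a send without blocking and
// reports whether the value was accepted immediately.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 1)

	// No goroutine is receiving, so the unbuffered send cannot proceed.
	fmt.Println("unbuffered send ready:", trySend(unbuffered, 1))
	// The buffered channel has one free slot, so this send succeeds.
	fmt.Println("buffered send ready:", trySend(buffered, 1))
	// The buffer is now full, so a second send would block as well.
	fmt.Println("second buffered send ready:", trySend(buffered, 2))
}
```

A plain `ch <- v` on the unbuffered channel, without the default case, would simply hang at that point, which matches the log stopping right after "No error in fetchUser".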

The second problem is:

// Close output channels once we're done
defer func() {
    go func() {
        // wg.Wait()

        close(errors)
        close(output)
    }()
}()

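You launch the closing goroutine from a deferred function, so it only starts when Synchronize returns. Because the send above blocks, Synchronize never returns, the goroutine never runs, and the channels are never closed.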

I would suggest unifying all of that logic in a single goroutine:

func (g *github) Synchronize(state implementation.MutableState, ctx context.Context) (<-chan *implementation.Output, <-chan error) {
    output := make(chan *implementation.Output)
    errors := make(chan error, 1) // buffer allows preflight errors

    go func() {
        defer close(output)
        defer close(errors)
        err := g.fetchUser(state, output, ctx)
        if err != nil {
            errors <- err
        }
    }()
    return output, errors
}

huangapple
  • Posted on 2015-12-29 02:47:57
  • When reposting, please keep the link to this article: https://go.coder-hub.com/34498981.html