Golang test with channels does not exit
Question
The following Golang test never exits. I suspect it has something to do with a channel deadlock, but being a Go noob, I am not certain.
const userName = "xxxxxxxxxxxx"

func TestSynchroninze(t *testing.T) {
    c, err := channel.New(github.ChannelName, authToken)
    if err != nil {
        t.Fatalf("Could not create channel: %s", err)
        return
    }
    state := channel.NewState(nil)
    ctx := context.Background()
    ctx = context.WithValue(ctx, "userId", userName)
    user := api.User{}
    output, errs := c.Synchronize(state, ctx)
    if err = <-errs; err != nil {
        t.Fatalf("Error performing synchronize: %s", err)
        return
    }
    for o := range output {
        switch oo := o.Data.(type) {
        case api.User:
            user = oo
            glog.Infof("we have a USER %s\n", user)
        default:
            t.Errorf("Encountered unexpected data type: %T", oo)
        }
    }
}
Here are the methods being tested.
type github struct {
    client *api.Client
}

func newImplementation(t auth.UserToken) implementation.Implementation {
    return &github{client: api.NewClient(t)}
}

// -------------------------------------------------------------------------------------

const (
    kLastUserFetch = "lastUserFetch"
)

type synchronizeFunc func(implementation.MutableState, chan *implementation.Output, context.Context) error

// -------------------------------------------------------------------------------------

func (g *github) Synchronize(state implementation.MutableState, ctx context.Context) (<-chan *implementation.Output, <-chan error) {
    output := make(chan *implementation.Output)
    errors := make(chan error, 1) // buffer allows preflight errors

    // Close output channels once we're done
    defer func() {
        go func() {
            // wg.Wait()
            close(errors)
            close(output)
        }()
    }()

    err := g.fetchUser(state, output, ctx)
    if err != nil {
        errors <- err
    }
    return output, errors
}

func (g *github) fetchUser(state implementation.MutableState, output chan *implementation.Output, ctx context.Context) error {
    var err error
    var user = api.User{}
    userId, _ := ctx.Value("userId").(string)
    user, err = g.client.GetUser(userId, ctx.Done())
    if err == nil {
        glog.Info("No error in fetchUser")
        output <- &implementation.Output{Data: user}
        state.SetTime(kLastUserFetch, time.Now())
    }
    return err
}

func (c *Client) GetUser(id string, quit <-chan struct{}) (user User, err error) {
    // Execute request
    var data []byte
    data, err = c.get("users/"+id, nil, quit)
    glog.Infof("USER DATA %s", data)

    // Parse response
    if err == nil && len(data) > 0 {
        err = json.Unmarshal(data, &user)
        data, _ = json.Marshal(user)
    }
    return
}
Here is what I see in the console (most of the user details removed):
I1228 13:25:05.291010 21313 client.go:177] GET https://api.github.com/users/xxxxxxxx
I1228 13:25:06.010085 21313 client.go:36] USER DATA {"login":"xxxxxxxx","id":00000000,"avatar_url":"https://avatars.githubusercontent.com/u/0000000?v=3",...}
I1228 13:25:06.010357 21313 github.go:90] No error in fetchUser
==========EDIT=============
Here is the relevant portion of the api package.
package api

type Client struct {
    authToken auth.UserToken
    http      *http.Client
}

func NewClient(authToken auth.UserToken) *Client {
    return &Client{
        authToken: authToken,
        http:      auth.NewClient(authToken),
    }
}

// -------------------------------------------------------------------------------------

type User struct {
    Id             int    `json:"id,omitempty"`
    Username       string `json:"login,omitempty"`
    Email          string `json:"email,omitempty"`
    FullName       string `json:"name,omitempty"`
    ProfilePicture string `json:"avatar_url,omitempty"`
    Bio            string `json:"bio,omitempty"`
    Website        string `json:"blog,omitempty"`
    Company        string `json:"company,omitempty"`
}
And the channel package:
package channel

type Channel struct {
    implementation.Descriptor
    imp implementation.Implementation
}

// New returns a channel implementation with a given name and auth token.
func New(name string, token auth.UserToken) (*Channel, error) {
    if desc, ok := implementation.Lookup(name); ok {
        if imp := implementation.New(name, token); imp != nil {
            return &Channel{Descriptor: desc, imp: imp}, nil
        }
    }
    return nil, ErrInvalidChannel
}
And the implementation package:
package implementation

import "golang.org/x/net/context"

// -------------------------------------------------------------------------------------

// Implementation is the interface implemented by subpackages.
type Implementation interface {
    // Synchronize performs a synchronization using the given state. A context parameter
    // is provided to support cancellation as well as implementation-specific behaviors.
    //
    // If a fatal error occurs (see package error definitions), the state can be discarded
    // to prevent the persistence of an invalid state.
    Synchronize(state MutableState, ctx context.Context) (<-chan *Output, <-chan error)

    // FetchDetails gets details for a given timeline item. Any changes to the TimelineItem
    // (including the Meta value) will be persisted.
    FetchDetails(item *TimelineItem, ctx context.Context) (interface{}, error)
}
======Edit #2=======
This is the original Synchronize method. I removed some details in my testing to try to simplify the problem. By removing a go func call, I believe I introduced a new problem which could be confusing things.
The original uses a WaitGroup and a function array containing a single function, because this method will eventually synchronize multiple functions.
func (g *github) Synchronize(state implementation.MutableState, ctx context.Context) (<-chan *implementation.Output, <-chan error) {
    wg := sync.WaitGroup{}
    output := make(chan *implementation.Output)
    errors := make(chan error, 1) // buffer allows preflight errors

    // Close output channels once we're done
    defer func() {
        go func() {
            wg.Wait()
            close(errors)
            close(output)
        }()
    }()

    // Perform fetch functions in separate routines
    funcs := []synchronizeFunc{
        g.fetchUser,
    }
    for _, f := range funcs {
        wg.Add(1)
        go func(f synchronizeFunc) {
            defer wg.Done()
            if err := f(state, output, ctx); err != nil {
                errors <- err
            }
        }(f)
    }
    glog.Info("after go sync...")
    return output, errors
}
Answer 1
Score: 1
I think there are two problems. The first is in:
output <- &implementation.Output{Data: user}
The channel does not have a buffer, so the send blocks until some other goroutine receives from it. In your code the send happens on the same goroutine that would later do the receiving, so it blocks forever.
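To make the blocking behavior concrete, here is a minimal sketch that is not taken from the question's code. The helper sendWithin is a hypothetical name introduced only so that the blocked send can be observed with a timeout instead of hanging the program.

```go
package main

import (
	"fmt"
	"time"
)

// sendWithin is a hypothetical helper: it reports whether a send on ch
// completed within ms milliseconds.
func sendWithin(ch chan int, v int, ms int) bool {
	select {
	case ch <- v:
		return true
	case <-time.After(time.Duration(ms) * time.Millisecond):
		return false
	}
}

func main() {
	ch := make(chan int) // unbuffered, like the output channel in the question

	// Nobody is receiving, so the send cannot complete and times out.
	fmt.Println("send without receiver:", sendWithin(ch, 1, 50))

	// With another goroutine receiving, the same kind of send goes through.
	done := make(chan int)
	go func() { done <- <-ch }()
	fmt.Println("send with receiver:", sendWithin(ch, 2, 1000))
	fmt.Println("received:", <-done)
}
```

Without the timeout, the first send would simply hang, which matches what the test in the question appears to be doing.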
The second is:
// Close output channels once we're done
defer func() {
    go func() {
        // wg.Wait()
        close(errors)
        close(output)
    }()
}()
You launch this goroutine from a deferred function, so it only starts when Synchronize returns. Because the send above blocks, the function never returns, and the goroutine that closes the channels never runs.
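The interaction between defer and a blocked send can be sketched in isolation. This is an illustration under assumptions, not the question's code: producer and closedWithin are hypothetical names, and producer is started with go here only so that main itself does not hang.

```go
package main

import (
	"fmt"
	"time"
)

// producer mimics the shape of the simplified Synchronize: cleanup is
// deferred, and the body sends on an unbuffered channel.
func producer(out chan int, closed chan struct{}) {
	defer close(closed) // runs only when producer returns
	out <- 1            // blocks until someone receives
}

// closedWithin is a hypothetical helper: it reports whether closed was
// closed within ms milliseconds.
func closedWithin(closed chan struct{}, ms int) bool {
	select {
	case <-closed:
		return true
	case <-time.After(time.Duration(ms) * time.Millisecond):
		return false
	}
}

func main() {
	out := make(chan int)
	closed := make(chan struct{})
	go producer(out, closed)

	// The send is still blocked, so the deferred cleanup has not run.
	fmt.Println("cleanup before receive:", closedWithin(closed, 50))

	<-out // unblock the send; producer returns and its defer fires

	fmt.Println("cleanup after receive:", closedWithin(closed, 1000))
}
```

The deferred close only happens once the send has been received, which is why a cleanup that depends on the function returning cannot help while the body is stuck.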
I would suggest unifying all that logic in one place:
func (g *github) Synchronize(state implementation.MutableState, ctx context.Context) (<-chan *implementation.Output, <-chan error) {
    output := make(chan *implementation.Output)
    errors := make(chan error, 1) // buffer allows preflight errors
    go func() {
        defer close(output)
        defer close(errors)
        err := g.fetchUser(state, output, ctx)
        if err != nil {
            errors <- err
        }
    }()
    return output, errors
}
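As a self-contained sketch of the same pattern, the following replaces the project's types with plain strings. The names synchronize, collect and errFetch are hypothetical and not part of the original code. One caveat, as far as I can tell: the test in the question receives from errs before ranging over output, and with an unbuffered output channel that receive would still block on the success path, because the producer is stuck on its send and never reaches the deferred closes. The sketch therefore drains output first and reads the error afterwards.

```go
package main

import (
	"errors"
	"fmt"
)

// errFetch is a hypothetical error used for the failing case.
var errFetch = errors.New("fetch failed")

// synchronize is a stand-in for Synchronize: all sending and closing
// happens inside a single goroutine.
func synchronize(fetch func(out chan<- string) error) (<-chan string, <-chan error) {
	output := make(chan string)
	errs := make(chan error, 1) // buffered so the error send never blocks
	go func() {
		defer close(output)
		defer close(errs) // deferred calls run last-in first-out, so errs closes first
		if err := fetch(output); err != nil {
			errs <- err
		}
	}()
	return output, errs
}

// collect drains output until it is closed, then reads the error
// (nil if errs was closed without a value).
func collect(output <-chan string, errs <-chan error) ([]string, error) {
	var items []string
	for item := range output {
		items = append(items, item)
	}
	return items, <-errs
}

func main() {
	items, err := collect(synchronize(func(out chan<- string) error {
		out <- "user"
		return nil
	}))
	fmt.Println(items, err)

	items, err = collect(synchronize(func(out chan<- string) error {
		return errFetch
	}))
	fmt.Println(items, err)
}
```

An alternative would be to select over both channels in the consumer; draining output first is just the simplest ordering that cannot block in this sketch.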