测试Go中的Nats订阅

huangapple go评论71阅读模式
英文:

Testing Nats subscription in Go

问题

我有一个函数设计用来监听 Nats 主题并在接收到消息时路由它们:

func (conn *JetStreamConnection) SubscribeMultiple(ctx context.Context, subject string,
	subscribers ...*SubscriptionCallback) error {

	callbacks := make(map[string]func(*pnats.NatsMessage) (func(context.Context), error))
	for _, subscriber := range subscribers {
		callbacks[subscriber.Category] = subscriber.Callback
	}

	fullSubject := fmt.Sprintf("%s.*", subject)
	sub, err := conn.context.SubscribeSync(fullSubject, nats.Context(ctx))
	if err != nil {
        return err
	}

loop:
	for {

		select {
		case <-ctx.Done():
			break loop
		default:
		}

		msg, err := sub.NextMsgWithContext(ctx)
		if err != nil {
			return err
		}

		msg.InProgress()

		var message pnats.NatsMessage
		if err := conn.unmarshaller(msg.Data, &message); err != nil {
			msg.Term()
			return err
		}

		actualSubject := fmt.Sprintf("%s.%s", subject, message.Context.Category)
		subscriber, ok := callbacks[message.Context.Category]
		if !ok {
			msg.Nak()
			continue
		}

		callback, err := subscriber(&message)
		if err == nil {
			msg.Ack()
		} else {
			msg.Nak()
			return err
		}

		callback(ctx)
	}

	if err := sub.Unsubscribe(); err != nil {
		return err
	}

	return nil
}

我的问题是,由于 SubscribeSync 函数生成了一个 *nats.Subscription 对象,我无法对测试进行模拟。我该如何在这个对象周围进行测试?

英文:

I have a function designed to listen to a Nats subject and route the messages as it receives them:

func (conn *JetStreamConnection) SubscribeMultiple(ctx context.Context, subject string,
subscribers ...*SubscriptionCallback) error {
callbacks := make(map[string]func(*pnats.NatsMessage) (func(context.Context), error))
for _, subscriber := range subscribers {
callbacks[subscriber.Category] = subscriber.Callback
}
fullSubject := fmt.Sprintf(&quot;%s.*&quot;, subject)
sub, err := conn.context.SubscribeSync(fullSubject, nats.Context(ctx))
if err != nil {
return err
}
loop:
for {
select {
case &lt;-ctx.Done():
break loop
default:
}
msg, err := sub.NextMsgWithContext(ctx)
if err != nil {
return err
}
msg.InProgress()
var message pnats.NatsMessage
if err := conn.unmarshaller(msg.Data, &amp;message); err != nil {
msg.Term()
return err
}
actualSubject := fmt.Sprintf(&quot;%s.%s&quot;, subject, message.Context.Category)
subscriber, ok := callbacks[message.Context.Category]
if !ok {
msg.Nak()
continue
}
callback, err := subscriber(&amp;message)
if err == nil {
msg.Ack()
} else {
msg.Nak()
return err
}
callback(ctx)
}
if err := sub.Unsubscribe(); err != nil {
return err
}
return nil
}

My problem is that, since the SubscribeSync function produces a *nats.Subscription object, I have no way to mock out the test. How can I test around this object?

答案1

得分: 2

你可以将循环放在一个单独的函数中。这个函数可以接受一个描述nats Subscription的接口,而不是*nats.Subscription。这样你就可以使用gomock或其他工具创建Subscription的模拟对象。然后你可以单独测试内部函数。

类似这样的代码:

func (conn *JetStreamConnection) SubscribeMultiple(ctx context.Context, subject string,
    subscribers ...*SubscriptionCallback) error {

    callbacks := make(map[string]func(*pnats.NatsMessage) (func(context.Context), error))
    for _, subscriber := range subscribers {
        callbacks[subscriber.Category] = subscriber.Callback
    }

    fullSubject := fmt.Sprintf("%s.*", subject)
    sub, err := conn.context.SubscribeSync(fullSubject, nats.Context(ctx))
    if err != nil {
        return err
    }

    return run(ctx, sub)
}

//go:generate mockgen -source conn.go -destination ../mocks/conn.go -package mocks
type ISubscription interface{
    NextMsgWithContext(ctx context.Context) (*nats.Msg, error)
    Unsubscribe() error
}

func (conn *JetStreamConnection) run(ctx context.Context, sub ISubscription) error {
loop:
    for {

        select {
        case <-ctx.Done():
            break loop
        default:
        }

        msg, err := sub.NextMsgWithContext(ctx)
        if err != nil {
            return err
        }

        msg.InProgress()

        var message pnats.NatsMessage
        if err := conn.unmarshaller(msg.Data, &message); err != nil {
            msg.Term()
            return err
        }

        actualSubject := fmt.Sprintf("%s.%s", subject, message.Context.Category)
        subscriber, ok := callbacks[message.Context.Category]
        if !ok {
            msg.Nak()
            continue
        }

        callback, err := subscriber(&message)
        if err == nil {
            msg.Ack()
        } else {
            msg.Nak()
            return err
        }

        callback(ctx)
    }

    if err := sub.Unsubscribe(); err != nil {
        return err
    }
}

更新:如果你仍然想测试SubscribeMultiple函数,你可以创建一个只有一个Run函数的Runner结构体,并将其作为JetStreamConnection的依赖项。同样,你可以为Runner创建一个模拟对象,并使用它进行测试。

英文:

You can put your loop in a separate function. This func can accept an interface that describes nats Subscription instead of *nats.Subscription. This way you will be able to create Subscription mocks with gomock or other tools. After that you can test the inside func separately

Something like this:

func (conn *JetStreamConnection) SubscribeMultiple(ctx context.Context, subject string,
    subscribers ...*SubscriptionCallback) error {

    callbacks := make(map[string]func(*pnats.NatsMessage) (func(context.Context), error))
    for _, subscriber := range subscribers {
        callbacks[subscriber.Category] = subscriber.Callback
    }

    fullSubject := fmt.Sprintf(&quot;%s.*&quot;, subject)
    sub, err := conn.context.SubscribeSync(fullSubject, nats.Context(ctx))
    if err != nil {
        return err
    }

    return run(ctx, sub)
}

//go:generate mockgen -source conn.go -destination ../mocks/conn.go -package mocks
type ISubscription interface{
    NextMsgWithContext(ctx context.Context) (*nats.Msg, error)
    Unsubscribe() error
}

func (conn *JetStreamConnection) run(ctx context.Context, sub ISubscription) error {
loop:
    for {

        select {
        case &lt;-ctx.Done():
            break loop
        default:
        }

        msg, err := sub.NextMsgWithContext(ctx)
        if err != nil {
            return err
        }

        msg.InProgress()

        var message pnats.NatsMessage
        if err := conn.unmarshaller(msg.Data, &amp;message); err != nil {
            msg.Term()
            return err
        }

        actualSubject := fmt.Sprintf(&quot;%s.%s&quot;, subject, message.Context.Category)
        subscriber, ok := callbacks[message.Context.Category]
        if !ok {
            msg.Nak()
            continue
        }

        callback, err := subscriber(&amp;message)
        if err == nil {
            msg.Ack()
        } else {
            msg.Nak()
            return err
        }

        callback(ctx)
    }

    if err := sub.Unsubscribe(); err != nil {
        return err
    }
}

upd: if you still want to test SubscribeMultiple, you can create a Runner that will have only one func Run and take it as dependency for JetStreamConnection. Again, you can create a mock for Runner and test with it

huangapple
  • 本文由 发表于 2022年8月4日 15:06:07
  • 转载请务必保留本文链接:https://go.coder-hub.com/73231636.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定