英文:
How to execute the same workflow with cadence RegisterDelayedCallback in unit tests?
问题
使用RegisterDelayedCallback
运行相同节奏的工作流程时,是否可以运行单元测试?
我有以下代码,它运行工作流程两次,第一次执行保存回调令牌,第二次执行检索保存的令牌以异步完成活动。
workflow.go
package workflow
import (
"context"
"encoding/base64"
"fmt"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/workflow"
)
type WorkflowImpl struct {
worker.Worker
client.Client
}
func (w WorkflowImpl) TActivity(ctx context.Context, action string) error {
fmt.Println("TActivity started", action)
if action != "Approved" {
activityInfo := activity.GetInfo(ctx)
callbackToken := base64.StdEncoding.EncodeToString(activityInfo.TaskToken)
fmt.Println("save callbackToken", callbackToken)
// 保存回调令牌。
return activity.ErrResultPending
}
fmt.Println("Approved")
// 做一些已批准的事情。
// 获取保存的回调令牌。
// 使用保存的回调令牌调用 w.CompleteActivity()。
return nil
}
func (w WorkflowImpl) TWorkflow(ctx workflow.Context, action string) (result string, err error) {
fmt.Println("TWorkflow started", action)
waitChannel := workflow.NewChannel(ctx)
workflow.Go(ctx, func(ctx workflow.Context) {
if err := workflow.ExecuteActivity(ctx, w.TActivity, action).Get(ctx, nil); err != nil {
// 什么都不做,保持工作流程开放。
return
}
waitChannel.Send(ctx, "OK")
})
var signal string
waitChannel.Receive(ctx, &signal)
return signal, nil
}
workflow_test.go
package workflow_test
import (
"time"
"go.uber.org/cadence/worker"
)
func (s *UnitTestSuite) Test_TWorkflow() {
env := s.NewTestWorkflowEnvironment()
worker := workflow.WorkflowImpl{
Worker: ...
Client: ...
}
s.worker = &worker
env.RegisterActivity(s.worker.TActivity)
// 延迟第二个 TWorkflow。
env.RegisterDelayedCallback(func() {
env.ExecuteWorkflow(s.worker.TWorkflow, "Approved")
}, time.Second*2)
env.ExecuteWorkflow(s.worker.TWorkflow, "Noop")
s.True(env.IsWorkflowCompleted())
s.NoError(env.GetWorkflowError())
}
上述代码不完整,它没有保存回调令牌和调用 CompleteActivity。为了测试顺序,我只期望看到工作流程开始和活动开始两次的日志,但我没有看到。在第一个工作流程开始后,没有任何活动的日志,测试就会挂起,直到超时。
缺少什么或者是否可能像这样执行两次相同的工作流程?
英文:
Is it possible to run unit tests with RegisterDelayedCallback
that executes the same cadence workflow?
I have the following code that runs a workflow twice, the first execution saves a callback token and the second execution retrieves the saved token to complete the activity asynchronously.
workflow.go
package workflow
import (
"context"
"encoding/base64"
"fmt"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/workflow"
)
type WorkflowImpl struct {
worker.Worker
client.Client
}
func (w WorkflowImpl) TActivity(ctx context.Context, action string) error {
fmt.Println("TActivity started", action)
if action != "Approved" {
activityInfo := activity.GetInfo(ctx)
callbackToken := base64.StdEncoding.EncodeToString(activityInfo.TaskToken)
fmt.Println("save callbackToken", callbackToken)
// Saves callbackToken.
return activity.ErrResultPending
}
fmt.Println("Approved")
// Do some approved things.
// Get saved callback token.
// Call w.CompleteActivity() with the saved callback token.
return nil
}
func (w WorkflowImpl) TWorkflow(ctx workflow.Context, action string) (result string, err error) {
fmt.Println("TWorkflow started", action)
waitChannel := workflow.NewChannel(ctx)
workflow.Go(ctx, func(ctx workflow.Context) {
if err := workflow.ExecuteActivity(ctx, w.TActivity, action).Get(ctx, nil); err != nil {
// Do nothing, keep workflow open.
return
}
waitChannel.Send(ctx, "OK")
})
var signal string
waitChannel.Receive(ctx, &signal)
return signal, nil
}
workflow_test.go
package workflow_test
import (
"time"
"go.uber.org/cadence/worker"
)
func (s *UnitTestSuite) Test_TWorkflow() {
env := s.NewTestWorkflowEnvironment()
worker := workflow.WorkflowImpl{
Worker: ...
Client: ...
}
s.worker = &worker
env.RegisterActivity(s.worker.TActivity)
// Delay second TWorkflow.
env.RegisterDelayedCallback(func() {
env.ExecuteWorkflow(s.worker.TWorkflow, "Approved")
}, time.Second*2)
env.ExecuteWorkflow(s.worker.TWorkflow, "Noop")
s.True(env.IsWorkflowCompleted())
s.NoError(env.GetWorkflowError())
}
The above code is not complete, it doesn't save the callback token and call CompleteActivity. For the purpose of testing the sequence I just expect to see the logs of the workflow started and the activity started twice, but I am not seeing that. After the first workflow started, and without logs for any activity, the test hanged until timeout.
What is missing or is it possible to execute the same workflow twice like this?
答案1
得分: 2
env.RegisterDelayedCallback(func() {
go env.ExecuteWorkflow(s.worker.TWorkflow, "Approved")
}, time.Second*2)
这里存在一个死锁问题。当回调函数运行时,env
被锁定(参见源代码)。而回调函数想要在同一个env
上执行一个工作流,这需要获取env
上的相同锁(参见源代码)。
让我们尝试通过在新的goroutine中运行回调函数来解决死锁问题:
env.RegisterDelayedCallback(func() {
go env.ExecuteWorkflow(s.worker.TWorkflow, "Approved")
}, time.Second*2)
现在我们得到了一个panic:
panic: Current TestWorkflowEnvironment is used to execute s.worker.TWorkflow. Please create a new TestWorkflowEnvironment for s.worker.TWorkflow.
目前,TestWorkflowEnvironment
无法运行两个不是父子关系的工作流。请参阅跟踪任务的问题Make TestWorkflowEnvironment support test multiple workflows。
正如panic消息所建议的,您必须为执行另一个工作流创建一个新的TestWorkflowEnvironment
(但我不确定它是否适用于您的用例)。
英文:
env.RegisterDelayedCallback(func() {
env.ExecuteWorkflow(s.worker.TWorkflow, "Approved")
}, time.Second*2)
There is a deadlock here. The env
is locked when a callback is running (see the source code). And the callback wants to execute a workflow on the same env
, which needs to acquire the same lock on the env
(see the source code).
Let's try to break the deadlock by running the callback in a new goroutine:
env.RegisterDelayedCallback(func() {
go env.ExecuteWorkflow(s.worker.TWorkflow, "Approved")
}, time.Second*2)
Now we get a panic:
panic: Current TestWorkflowEnvironment is used to execute s.worker.TWorkflow. Please create a new TestWorkflowEnvironment for s.worker.TWorkflow.
Currently, the TestWorkflowEnvironment
can not run 2 workflows that are not parent-child. See the issue that tracks the task to Make TestWorkflowEnvironment support test multiple workflows.
As the panic message suggested, you have to create a new TestWorkflowEnvironment
to execute another workflow (but I'm not sure does it work for your use case).
答案2
得分: 1
你可以创建一个仅用于测试的工作流,该工作流会调用你要测试的两个工作流作为子工作流。
英文:
You can create a test-only workflow that invokes both workflows you are testing as child workflows.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论