Cadence throwing WorkflowRejectedExecutionError when executing workflow with many child workflows/activities

Question

I am evaluating the use of Cadence for performing long-running bulk actions. I have the following (Kotlin) code:

class UpdateNameBulkWorkflowImpl : UpdateNameBulkWorkflow {

    private val changeNamePromises = mutableListOf<Promise<ChangeNameResult>>()

    override fun updateNames(newName: String, entityIds: Collection<String>) {
        entityIds.forEach { entityId ->
            val childWorkflow = Workflow.newChildWorkflowStub(
                    UpdateNameBulkWorkflow.UpdateNameSingleWorkflow::class.java
            )
            val promise = Async.function(childWorkflow::setName, newName, entityId)

            changeNamePromises.add(promise)
        }

        val allDone = Promise.allOf(changeNamePromises)
        allDone.get()
    }

    class UpdateNameSingleWorkflowImpl : UpdateNameBulkWorkflow.UpdateNameSingleWorkflow {
        override fun setName(newName: String, entityId: String): SetNameResult {
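            // activities is an activity stub; calling setName schedules the activity that performs the rename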
            return Async.function(activities::setName, newName, entityId).get()
        }
    }
}

This works fine for smaller numbers of entities, but I quickly run into the following exception:

java.lang.RuntimeException: Failure processing decision task. WorkflowID=b5327d20-6ea6-4aba-b863-2165cb21e038, RunID=c85e2278-e483-4c81-8def-f0cc0bd309fd
	at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:283) ~[cadence-client-2.7.4.jar:na]
	at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:229) ~[cadence-client-2.7.4.jar:na]
	at com.uber.cadence.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:76) ~[cadence-client-2.7.4.jar:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: com.uber.cadence.internal.sync.WorkflowRejectedExecutionError: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@7f17a605[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@7fa9f240[Wrapped task = com.uber.cadence.internal.sync.WorkflowThreadImpl$RunnableWrapper@1a27000b]] rejected from java.util.concurrent.ThreadPoolExecutor@22188bd0[Running, pool size = 600, active threads = 600, queued tasks = 0, completed tasks = 2400]
	at com.uber.cadence.internal.sync.WorkflowThreadImpl.start(WorkflowThreadImpl.java:281) ~[cadence-client-2.7.4.jar:na]
	at com.uber.cadence.internal.sync.AsyncInternal.execute(AsyncInternal.java:300) ~[cadence-client-2.7.4.jar:na]
	at com.uber.cadence.internal.sync.AsyncInternal.function(AsyncInternal.java:111) ~[cadence-client-2.7.4.jar:na]
...

It appears that I am quickly exhausting the thread pool and Cadence is unable to schedule new tasks.

I have worked around this by changing the definition of updateNames to:

    override fun updateNames(newName: String, entityIds: Collection<String>) {

        entityIds.chunked(200).forEach { sublist ->
            val promises = sublist.map { entityId ->
                val childWorkflow = Workflow.newChildWorkflowStub(
                        UpdateNameBulkWorkflow.UpdateNameSingleWorkflow::class.java
                )
                Async.function(childWorkflow::setName, newName, entityId)
            }

            val allDone = Promise.allOf(promises)
            allDone.get()
        }
    }

This basically processes the items in chunks of 200, waiting for each chunk to complete before moving on to the next one. I have concerns about how well this will perform (a single error in a chunk will stall processing of all records in the following chunks while it is retried). I'm also concerned about how well Cadence will be able to recover the progress of this function in the event of a crash.

My question is: Is there an idiomatic Cadence way of doing this that doesn't cause this immediate resource exhaustion? Am I using the wrong technology or is this just a naive approach?

Answer 1

Score: 1

A Cadence workflow has a relatively small limit on the size of a single workflow run. Cadence scales out with the number of parallel workflow runs, so executing a very large number of tasks in a single workflow is an anti-pattern.

The idiomatic solutions are:

  • Run a chunk of limited size and then call continue-as-new. This way the size of a single run is bounded (see the sketch after this list).
  • Use hierarchical workflows. A single parent with 1,000 children, each executing 1,000 activities, allows executing 1 million activities while keeping each workflow's history size bounded.
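
For the first option, here is a minimal Kotlin sketch of the continue-as-new pattern, reusing the UpdateNameBulkWorkflow interface from the question; the BATCH_SIZE constant and the way the remaining IDs are passed into the next run are illustrative assumptions, and Workflow.newContinueAsNewStub is the Cadence Java client helper for starting the next run of the same workflow:

import com.uber.cadence.workflow.Async
import com.uber.cadence.workflow.Promise
import com.uber.cadence.workflow.Workflow

class UpdateNameBulkWorkflowImpl : UpdateNameBulkWorkflow {

    companion object {
        private const val BATCH_SIZE = 200 // assumed per-run limit, tune to your history-size budget
    }

    override fun updateNames(newName: String, entityIds: Collection<String>) {
        // Execute only a bounded slice of the work in this run.
        val batch = entityIds.take(BATCH_SIZE)
        val remaining = entityIds.drop(BATCH_SIZE)

        val promises = batch.map { entityId ->
            val childWorkflow = Workflow.newChildWorkflowStub(
                    UpdateNameBulkWorkflow.UpdateNameSingleWorkflow::class.java
            )
            Async.function(childWorkflow::setName, newName, entityId)
        }
        Promise.allOf(promises).get()

        // Hand the rest of the work to a fresh run so this run's history stays bounded.
        if (remaining.isNotEmpty()) {
            val nextRun = Workflow.newContinueAsNewStub(UpdateNameBulkWorkflow::class.java)
            nextRun.updateNames(newName, remaining)
        }
    }
}

Each run then schedules at most BATCH_SIZE child workflows, and recovery after a crash only needs to replay the current run. Note that the full list of remaining IDs is carried as input into every run in this sketch; for very large inputs it is more common to have an activity fetch the next page of IDs instead of threading the whole collection through continue-as-new.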
