How to write a Temporal worker Interceptor class in the Temporal Python SDK

Question

I am trying to implement an interceptor class for our Temporal workflows written in Python. Since the documentation is incomplete, I have tried several approaches, but none has worked so far.

Below is sample code for the Temporal activities, workflows, interceptor class, worker and workflow executor.

activities.py

from temporalio import activity

@activity.defn
async def activity1():
    print("Hi from activity 1")

@activity.defn
async def activity2():
    print("Hi from activity 2")

workflow.py

from datetime import timedelta
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from activities import activity1, activity2

@workflow.defn
class MyWorkflow1:
    @workflow.run
    async def run(self):
        output = await workflow.execute_activity(activity1, start_to_close_timeout=timedelta(seconds=5))
        return True

@workflow.defn
class MyWorkflow2:
    @workflow.run
    async def run(self):
        output = await workflow.execute_activity(activity2, start_to_close_timeout=timedelta(seconds=5))
        return True

my_interceptor.py

from temporalio.worker import Interceptor

class MyInterceptor(Interceptor):  # NEED HELP IN THIS CLASS
    def __init__(self, next):
        super().__init__(next)

    async def intercept_activity(self, input):
        print("I am printing before activity execution start")
        return await self.next.intercept_activity(input)

    async def workflow_interceptor_class(self, input):
        print("Doing something before a workflow execution start")
        return None

run_workflows.py

import asyncio
from temporalio.client import Client
from workflow import MyWorkflow1, MyWorkflow2

async def main():
    client = await Client.connect("localhost:7233")

    await client.execute_workflow(
        MyWorkflow1.run,
        id="123",
        task_queue="MY_QUEUE",
    )

    await client.execute_workflow(
        MyWorkflow2.run,
        id="123",
        task_queue="MY_QUEUE",
    )
    return True

if __name__=="__main__":
    asyncio.run(main())

run_worker.py

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker

from activities import activity1, activity2
from workflow import MyWorkflow1, MyWorkflow2
from my_interceptor import MyInterceptor  # <--- NEED HELP IN IMPLEMENTING THIS CLASS

async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="MY_QUEUE",
        workflows=[MyWorkflow1, MyWorkflow2],
        activities=[activity1, activity2],
        interceptors=[MyInterceptor],  # <--- NEED HELP IN IMPLEMENTING THIS CLASS
    )
    await worker.run()

if __name__=="__main__":
    asyncio.run(main())

If I use the Interceptor class I wrote, I get exceptions. For example, workflow_interceptor_class requires a parameter "input", but it is not available when the worker is initialized. My activities also fail with messages such as "Completing activity as failed".

Basically, my implementation of the interceptor class is wrong.

What do I want to achieve?

  1. Print the name of the workflow before that workflow starts executing.
  2. Keep some information (the arguments passed at execution time) in a variable for future use.

Answer 1

Score: 2

First of all, you may want to look at the following examples of how to implement Activity and Workflow interceptors in the Temporal Python SDK:

  • A sample from Temporal's Python samples repository;
  • The interceptors test file from the Temporal Python SDK's test suite;
  • The OpenTelemetry interceptor from the contrib directory in the Temporal Python SDK repo (note that this last one is significantly more complicated than the other two).

Based on the code snippets you provided, I think you may have missed a very important detail: the implementation of Interceptor that you provide to Worker will not actually handle the interception of Workflow/Activity calls by itself.

Instead, the Interceptor object you provide will be used to build a chain of concrete interceptors. The responsibilities of your Interceptor class are thus the following:

  • intercept_activity(): Instantiate, chain and return an object that extends ActivityInboundInterceptor; simply return next if you do not want to intercept activities;
  • workflow_interceptor_class(): Return a class that extends WorkflowInboundInterceptor; return None if you do not want to intercept Workflows.
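To make the "factory that builds a chain" idea concrete, here is a library-free sketch. Every name in it (Inbound, Root, Logging, Factory, build_chain) is hypothetical and is not part of the Temporal SDK; the reversal that makes the first factory listed end up outermost is also just an assumption of this sketch.

```python
import asyncio
from typing import Any, List


class Inbound:
    """Base link: by default it forwards the call to the next link."""

    def __init__(self, next: "Inbound") -> None:
        self.next = next

    async def execute(self, arg: Any) -> Any:
        return await self.next.execute(arg)


class Root(Inbound):
    """Innermost link: the one that actually does the work."""

    def __init__(self) -> None:
        pass  # nothing comes after the root

    async def execute(self, arg: Any) -> Any:
        return f"ran({arg})"


class Logging(Inbound):
    """Concrete link: does something before delegating to the next link."""

    def __init__(self, next: Inbound, log: List[str]) -> None:
        super().__init__(next)
        self.log = log

    async def execute(self, arg: Any) -> Any:
        self.log.append(f"before {arg}")
        return await self.next.execute(arg)


class Factory:
    """Plays the role of the top-level interceptor: it only builds links."""

    def __init__(self, log: List[str]) -> None:
        self.log = log

    def intercept(self, next: Inbound) -> Inbound:
        return Logging(next, self.log)


def build_chain(factories: List[Factory], root: Inbound) -> Inbound:
    # Wrap the root link, starting from the last factory.
    chain = root
    for factory in reversed(factories):
        chain = factory.intercept(chain)
    return chain
```

The factory never sees an individual call; it is asked once for a link, and that link is what runs on each call. For example, `asyncio.run(build_chain([Factory(log)], Root()).execute("x"))` goes through Logging first and then Root.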

Consequently, the complete implementation of an Interceptor will generally contain two or three classes, which could for example be named as follows:

  • MyInterceptor which extends Interceptor
  • MyActivityInboundInterceptor which extends ActivityInboundInterceptor
  • MyWorkflowInboundInterceptor which extends WorkflowInboundInterceptor

A simple implementation of MyInterceptor would look something like this:

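from typing import Optional, Type
from temporalio.worker import (
    ActivityInboundInterceptor,
    Interceptor,
    WorkflowInboundInterceptor,
    WorkflowInterceptorClassInput,
)

class MyInterceptor(Interceptor):
    def intercept_activity(
        self, next: ActivityInboundInterceptor
    ) -> ActivityInboundInterceptor:
        # Wrap the next interceptor in the chain with our own activity interceptor.
        return MyActivityInboundInterceptor(next)

    def workflow_interceptor_class(
        self, input: WorkflowInterceptorClassInput
    ) -> Optional[Type[WorkflowInboundInterceptor]]:
        # Return the class (not an instance); the SDK instantiates it itself.
        return MyWorkflowInboundInterceptor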

With that in mind, I think the rest should fall into place much more easily; just add a comment if you need more details on something.

> By the way, you may think that all these gymnastics are unintuitive, complex and
> useless, but let's not dwell on that here and just accept that there are
> strong reasons for this.

huangapple
  • Posted on June 16, 2023, 02:31:02
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76484558.html