How to write a Temporal worker Interceptor class in the Temporal Python SDK
Question
I am trying to implement an interceptor class for our Temporal workflows written in Python. Because the documentation is incomplete, I have tried several approaches, but none has worked so far.
Below is sample code for the Temporal activities, workflows, interceptor class, worker, and workflow executor.
activities.py

```python
from temporalio import activity


@activity.defn
async def activity1():
    print("Hi from activity 1")


@activity.defn
async def activity2():
    print("Hi from activity 2")
```
workflow.py

```python
from datetime import timedelta

from temporalio import workflow

# Activities are imported through the sandbox pass-through.
with workflow.unsafe.imports_passed_through():
    from activities import activity1, activity2


@workflow.defn
class MyWorkflow1:
    @workflow.run
    async def run(self):
        output = await workflow.execute_activity(activity1, start_to_close_timeout=timedelta(5))
        return True


@workflow.defn
class MyWorkflow2:
    @workflow.run
    async def run(self):
        output = await workflow.execute_activity(activity2, start_to_close_timeout=timedelta(5))
        return True
```
my_interceptor.py

```python
from temporalio.worker import Interceptor


class MyInterceptor(Interceptor):  # NEED HELP IN THIS CLASS
    def __init__(self, next):
        super().__init__(next)

    async def intercept_activity(self, input):
        print("I am printing before activity execution start")
        return await self.next.intercept_activity(input)

    async def workflow_interceptor_class(self, input):
        print("Doing something before a workflow execution start")
        return None
```
run_workflows.py

```python
import asyncio

from temporalio.client import Client

from workflow import MyWorkflow1, MyWorkflow2


async def main():
    client = await Client.connect("localhost:7233")
    await client.execute_workflow(
        MyWorkflow1.run,
        id="123",
        task_queue="MY_QUEUE"
    )
    await client.execute_workflow(
        MyWorkflow2.run,
        id="123",
        task_queue="MY_QUEUE"
    )
    return True


if __name__ == "__main__":
    asyncio.run(main())
```
run_worker.py

```python
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from activities import activity1, activity2
from workflow import MyWorkflow1, MyWorkflow2
from my_interceptor import MyInterceptor  # <--- NEED HELP IN IMPLEMENTING THIS CLASS


async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="MY_QUEUE",
        workflows=[MyWorkflow1, MyWorkflow2],
        activities=[activity1, activity2],
        interceptors=[MyInterceptor]  # <--- NEED HELP IN IMPLEMENTING THIS CLASS
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```
If I use the Interceptor class I wrote, I get exceptions. For example, `workflow_interceptor_class` requires a parameter "input", but that parameter is not available when the worker is initialized. My activities also fail with messages such as "Completing activity as failed".
Basically, the way I have implemented the interceptor class is wrong.
What do I want to achieve?
- Print the name of the workflow before starting the execution of that workflow.
- Keep some information (the arguments passed at execution time) in a variable for future use.
Answer 1
Score: 2
First, you may want to look at the following examples of how to implement Activity and Workflow interceptors in the Temporal Python SDK:
- A sample from Temporal's Python samples repository;
- The interceptors test file from the Temporal Python SDK's test suite;
- The OpenTelemetry interceptor from the `contrib` directory in the Temporal Python SDK repo (note that this last one is significantly more complicated than the other two).
Based on the code snippets you provided, I think you may have missed a very important detail: the implementation of `Interceptor` that you provide to `Worker` will not actually handle the interception of Workflow/Activity calls by itself.
Instead, the `Interceptor` object you provide will be used to build a chain of concrete interceptors. The responsibilities of your `Interceptor` class are thus the following:
- `intercept_activity()`: Instantiate, chain and return an object that extends `ActivityInboundInterceptor`; simply return `next` if you do not want to intercept activities;
- `workflow_interceptor_class()`: Return a class that extends `WorkflowInboundInterceptor`; return `None` if you do not want to intercept Workflows.
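The factory-plus-chain idea described above can be modelled without the library at all. The following is only a sketch of the pattern: every name in it (`Inbound`, `RootInbound`, `LoggingInbound`, `Factory`) is hypothetical and is not part of `temporalio`. It shows that the object you hand over never handles a call itself; it only wraps the next link it is given.

```python
import asyncio


class Inbound:
    """Hypothetical stand-in for an inbound interceptor: one link in a chain."""

    def __init__(self, next_link):
        self.next = next_link

    async def execute(self, name):
        # Default behaviour: delegate to the next link unchanged.
        return await self.next.execute(name)


class RootInbound:
    """Hypothetical end of the chain: the link that actually runs the work."""

    async def execute(self, name):
        return f"ran {name}"


class LoggingInbound(Inbound):
    """Concrete link that records something before delegating."""

    def __init__(self, next_link, log):
        super().__init__(next_link)
        self.log = log

    async def execute(self, name):
        self.log.append(f"before {name}")
        return await super().execute(name)


class Factory:
    """Hypothetical stand-in for the object handed to the worker.

    It does not intercept calls; it only wraps `next_link` in a new link.
    """

    def __init__(self):
        self.log = []

    def intercept(self, next_link):
        return LoggingInbound(next_link, self.log)


factory = Factory()
chain = factory.intercept(RootInbound())  # in real code the worker does this
result = asyncio.run(chain.execute("activity1"))
print(factory.log, result)
```

Seen this way, the methods in the question were written as if they were `execute` on a link, whereas they sit where `Factory.intercept` sits.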
Consequently, the complete implementation of an Interceptor will generally contain two or three classes, which could for example be named like so:
- `MyInterceptor`, which extends `Interceptor`
- `MyActivityInboundInterceptor`, which extends `ActivityInboundInterceptor`
- `MyWorkflowInboundInterceptor`, which extends `WorkflowInboundInterceptor`
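A simple implementation of `MyInterceptor` would look something like this:

```python
from typing import Optional, Type

from temporalio.worker import (
    ActivityInboundInterceptor,
    Interceptor,
    WorkflowInboundInterceptor,
    WorkflowInterceptorClassInput,
)


class MyInterceptor(Interceptor):
    # Wrap the next activity interceptor in our own link of the chain.
    def intercept_activity(
        self, next: ActivityInboundInterceptor
    ) -> ActivityInboundInterceptor:
        return MyActivityInboundInterceptor(next)

    # Return the class itself (not an instance).
    def workflow_interceptor_class(
        self, input: WorkflowInterceptorClassInput
    ) -> Optional[Type[WorkflowInboundInterceptor]]:
        return MyWorkflowInboundInterceptor
```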
That being said, I think the rest should fall into place much more easily; just add a comment if you need more details on something.
> By the way, you may think that all these gymnastics are unintuitive, complex and
> useless, but let's not stop at this and just accept that there are
> strong reasons for this.