MWAA - Trigger a specific DAG task in one environment using a task from another environment
Question
I'm trying to trigger a task in one MWAA environment from a task running in another MWAA environment. If I need to execute the whole DAG, it's simple, and I can do it using this code:
import boto3
import requests
from airflow.decorators import task


@task()
def invoke_dag_task():
    client = boto3.client('mwaa')
    # Request a short-lived CLI token for the target environment
    token = client.create_cli_token(Name='my-environment')
    url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
    body = 'dags trigger test'
    headers = {
        'Authorization': 'Bearer ' + token['CliToken'],
        'Content-Type': 'text/plain'
    }
    requests.post(url, data=body, headers=headers)
The code above works perfectly. To execute only a specific task, I'm using this:
@task()
def invoke_dag_task():
    client = boto3.client('mwaa')
    token = client.create_cli_token(Name='my-environment')
    url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
    # tasks test <dag_id> <task_id> <execution_date>
    body = 'tasks test test task_1 2023-05-17T16:48:03Z'
    headers = {
        'Authorization': 'Bearer ' + token['CliToken'],
        'Content-Type': 'text/plain'
    }
    requests.post(url, data=body, headers=headers)
I don't get any error, but the task is not triggered. Do you know why?
Thanks!
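One thing that may explain the missing error: both snippets discard the return value of requests.post, so whatever the Airflow CLI printed is never seen. The MWAA CLI endpoint returns the command output as base64-encoded text in its JSON response. Below is a minimal sketch of decoding it, assuming the response carries stdout and stderr fields encoded the same way; the helper name decode_cli_output and the sample payload are made up for illustration and are not real MWAA output.

```python
import base64
from typing import Tuple


def decode_cli_output(payload: dict) -> Tuple[str, str]:
    """Return (stdout, stderr) decoded from an MWAA CLI JSON response.

    Assumes both fields hold base64-encoded UTF-8 text; a missing
    field is treated as empty output.
    """
    stdout = base64.b64decode(payload.get("stdout", "")).decode("utf8")
    stderr = base64.b64decode(payload.get("stderr", "")).decode("utf8")
    return stdout, stderr


# Offline demonstration with a hand-built payload (not real MWAA output).
sample = {
    "stdout": base64.b64encode(b"hello from airflow\n").decode("ascii"),
    "stderr": base64.b64encode(b"").decode("ascii"),
}
out, err = decode_cli_output(sample)
print("stdout:", out)
print("stderr:", err)
```

Inside the task, this could be called as decode_cli_output(requests.post(url, data=body, headers=headers).json()), and the two strings logged to see what the remote CLI actually reported.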
Answer 1
Score: 1
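Bear in mind that MWAA does not support all Airflow CLI commands. The commands you used above appear to be supported, but tasks test will not trigger a task: per the Airflow CLI documentation, it runs a task instance without checking dependencies or recording its state in the database.
You can use other commands. I put together an example in a blog post, which looks like this: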
import base64

import boto3
import requests

print('Loading function')


def lambda_handler(event, context):
    print("Connecting to MWAA environment: " + event['environment'])
    client = boto3.client('mwaa')
    # Request a short-lived CLI token for the target environment
    response = client.create_cli_token(Name=str(event['environment']))
    print("Using this command: " + event['command'])
    auth_token = response.get('CliToken')
    hed = {'Content-Type': 'text/plain', 'Authorization': 'Bearer ' + auth_token}
    # The Airflow CLI command comes from the event payload
    data = str(event['command'])
    url = 'https://{web_server}/aws_mwaa/cli'.format(web_server=response.get('WebServerHostname'))
    r = requests.post(url, data=data, headers=hed)
    # The command output is returned base64-encoded in the "stdout" field
    output = base64.b64decode(r.json()['stdout']).decode('utf8')
    print(output)
    return event['environment']  # Echo back the first key value
You then invoke it with a payload like this:
{
    "environment": "{your environment}",
    "command": "{airflow command}"
}
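To try this payload-driven flow without AWS access, one option is to pass the boto3 client and the HTTP function in as parameters. The sketch below is a variation on the handler above, not the blog post's version: run_airflow_command, FakeClient, FakeResponse, and fake_post are made-up names, and the fake hostname, token, and output merely stand in for real MWAA responses.

```python
import base64


def run_airflow_command(payload, client, post):
    """Send payload['command'] to the CLI endpoint of payload['environment']."""
    token = client.create_cli_token(Name=payload["environment"])
    url = "https://{}/aws_mwaa/cli".format(token["WebServerHostname"])
    headers = {
        "Content-Type": "text/plain",
        "Authorization": "Bearer " + token["CliToken"],
    }
    response = post(url, data=payload["command"], headers=headers)
    # The command output is expected base64-encoded in the "stdout" field
    return base64.b64decode(response.json()["stdout"]).decode("utf8")


class FakeClient:
    """Stand-in for boto3.client('mwaa'); returns a dummy token."""

    def create_cli_token(self, Name):
        return {"WebServerHostname": "example.invalid", "CliToken": "dummy-token"}


class FakeResponse:
    """Stand-in for a requests.Response with a canned JSON body."""

    def json(self):
        return {"stdout": base64.b64encode(b"triggered\n").decode("ascii")}


sent = {}  # records what the fake HTTP call received


def fake_post(url, data, headers):
    sent.update(url=url, data=data, headers=headers)
    return FakeResponse()


result = run_airflow_command(
    {"environment": "my-environment", "command": "dags trigger test"},
    FakeClient(),
    fake_post,
)
print(result)
```

Against a real environment, the same function would be called with boto3.client('mwaa') and requests.post in place of the fakes.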