MWAA - Trigger a specific DAG task in one environment from a task in another environment

Question

I’m trying to trigger a task in one MWAA environment from a task running in another MWAA environment. Triggering the whole DAG is simple, and I can do it with this code:

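import boto3
import requests
from airflow.decorators import task


@task()
def invoke_dag_task():
    # Request a CLI token for the target MWAA environment.
    client = boto3.client('mwaa')
    token = client.create_cli_token(Name='my-environment')
    url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
    # Airflow CLI command to run in the target environment.
    body = 'dags trigger test'
    headers = {
        'Authorization': 'Bearer ' + token['CliToken'],
        'Content-Type': 'text/plain'
    }
    requests.post(url, data=body, headers=headers)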

The code above works perfectly. To execute only a specific task, I’m using this:

@task()
def invoke_dag_task():
    client = boto3.client('mwaa')
    token = client.create_cli_token(Name='my-environment')
    url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
    body = 'tasks test test task_1 2023-05-17T16:48:03Z'
    headers = {
        'Authorization': 'Bearer ' + token['CliToken'],
        'Content-Type': 'text/plain'
    }
    requests.post(url, data=body, headers=headers)

I don’t get any error, but the task is not triggered. Do you know why?

Thanks!

Answer 1

Score: 1

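Bear in mind that MWAA does not support every Airflow CLI command. The command you are sending above ("tasks test") looks like it is supported, but it will not trigger a task: in Airflow, "tasks test" runs a task instance without checking dependencies or recording its state in the database, so you should not expect it to show up as a triggered run in the target environment.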
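
One likely reason no error shows up is that requests.post does not raise on a failed HTTP status by itself, and the code in the question never reads the response. The following is a minimal sketch for inspecting what the CLI endpoint returned, not a definitive implementation. The helper names decode_cli_response and run_cli_command are hypothetical, and it assumes the response body is JSON with base64-encoded "stdout" and "stderr" fields (the "stderr" field is an assumption here).

```python
import base64
from typing import Tuple


def decode_cli_response(payload: dict) -> Tuple[str, str]:
    """Return (stdout, stderr) decoded from an MWAA CLI response body.

    Assumes both fields are base64-encoded; a missing field is treated as empty.
    """
    def _decode(key: str) -> str:
        return base64.b64decode(payload.get(key) or "").decode("utf8")

    return _decode("stdout"), _decode("stderr")


def run_cli_command(environment: str, command: str) -> Tuple[str, str]:
    """Send one Airflow CLI command to an MWAA environment and return its output."""
    # Imported lazily so decode_cli_response can be used without AWS dependencies.
    import boto3
    import requests

    token = boto3.client("mwaa").create_cli_token(Name=environment)
    url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
    headers = {
        "Authorization": "Bearer " + token["CliToken"],
        "Content-Type": "text/plain",
    }
    response = requests.post(url, data=command, headers=headers, timeout=60)
    response.raise_for_status()  # surface HTTP errors instead of ignoring them
    return decode_cli_response(response.json())


# Example call (needs AWS credentials and network access to the environment):
# stdout, stderr = run_cli_command("my-environment", "dags trigger test")
# print(stdout)
# print(stderr)
```

Printing both streams from inside the calling task should make it visible whether the command was rejected, ran locally as a test, or queued a run.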

You can use other commands. I put together an example in a blog post, which looks like this:

import base64

import boto3
import requests

print('Loading function')


def lambda_handler(event, context):
    # The event carries the MWAA environment name and the Airflow CLI command to run.
    print("Connecting to MWAA environment: " + event['environment'])
    client = boto3.client('mwaa')
    response = client.create_cli_token(Name=str(event['environment']))

    print("Using this command: " + event['command'])

    # Send the command to the environment's CLI endpoint, authenticated with the CLI token.
    auth_token = response.get('CliToken')
    hed = {'Content-Type': 'text/plain', 'Authorization': 'Bearer ' + auth_token}
    data = str(event['command'])
    url = 'https://{web_server}/aws_mwaa/cli'.format(web_server=response.get('WebServerHostname'))
    r = requests.post(url, data=data, headers=hed)

    # The CLI output comes back base64-encoded in the 'stdout' field.
    output = base64.b64decode(r.json()['stdout']).decode('utf8')
    print(output)

    return event['environment']  # Echo back the environment name

You then invoke it with a payload like this:

{
  "environment": "{your environment}",
  "command": "{airflow command}"
}
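
For completeness, here is a rough sketch of how that payload could be sent to the function from Python with boto3. It is an illustration under assumptions rather than part of the original answer: the helper names build_payload and invoke_mwaa_cli_lambda are hypothetical, and the function name you pass in is whatever you deployed the handler as.

```python
import json


def build_payload(environment: str, command: str) -> bytes:
    """Serialize the event the handler reads: 'environment' and 'command' keys."""
    return json.dumps({"environment": environment, "command": command}).encode("utf8")


def invoke_mwaa_cli_lambda(function_name: str, environment: str, command: str) -> str:
    """Invoke the Lambda synchronously and return its raw result as text."""
    # Imported lazily so build_payload has no AWS dependency.
    import boto3

    response = boto3.client("lambda").invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",  # wait for the handler to finish
        Payload=build_payload(environment, command),
    )
    return response["Payload"].read().decode("utf8")


# Example call (function name is a placeholder; needs AWS credentials):
# print(invoke_mwaa_cli_lambda("my-mwaa-cli-function", "my-environment", "dags trigger test"))
```

The decoded CLI output itself is printed by the handler, so it would appear in the function's logs rather than in the returned value.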

huangapple
  • Posted on May 18, 2023, 00:56:14
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76274497.html