将BigQuery查询任务的结果传递给下一个任务,同时使用模板宏。

huangapple go评论67阅读模式
英文:

Passing results of BigQuery query task to the next task while using template macro

问题

这似乎是一个奇怪的问题,所以我确定我错过了一些东西。
不知何故,除非我使用函数执行提供和使用信息的任务并从PythonOperator中调用它们,否则我似乎无法传递值使用XCOM。
到目前为止,这很好。

但是现在我需要在SQL查询中使用执行日期。由于它嵌入在函数内,无法由Jinja解析。
我明白为什么{{ ds }} 宏在操作器之外不可用,我只是不知道如何在这种情况下解决这个问题?

我目前正在做的示例:

def get_some_values(**context):
    hook = BigQueryHook(use_legacy_sql=False)
    conn = hook.get_conn()
    cursor = conn.cursor()
    cursor.execute(
        "SELECT value1, value2, value3 FROM some_dataset.some_table__{{ ds }}"
    )
    results = cursor.fetchone()
    
    # 将结果存储在XCom中
    if results is not None:
        for i, result in enumerate(results):
            context['ti'].xcom_push(f'value{i+1}', result)

def send_slack_message(**context):
    # 从XCom中检索结果
    value1 = context['ti'].xcom_pull(key='value1')
    value2 = context['ti'].xcom_pull(key='value2')
    value3 = context['ti'].xcom_pull(key='value3') 

    slack_msg = """values returned: {}, {}, {} """.format(value1, value2, value3)
    
    send_slack_message = SlackWebhookOperator(
        task_id='slack_test',
        http_conn_id=SLACK_CONN_ID,
        webhook_token=slack_webhook_token,
        channel = '#some_channel',
        message=slack_msg,
        username='airflow',
        dag=dag,
    )
    send_slack_message.execute(context=context)

dag = DAG(
    'test',
    default_args=default_args,
    schedule_interval='0 12 * * *',
    catchup=False,
)
get_values_to_output = PythonOperator(
    task_id='get_values_to_output',
    python_callable=get_some_values,
    provide_context=True,
    dag=dag
)

send_slack_message = PythonOperator(
    task_id='send_slack_message',
    python_callable=send_slack_message,
    provide_context=True,
    dag=dag
)

在这种情况下,查询失败,因为它只想从some_table__{{ ds }} 表中选择。
如何在这里获取执行日期?或者如何在不使用函数的情况下将查询的值传递到下一个任务?
('current date' 不够好,因为我想能够执行回溯运行)

英文:

This seems a peculiar struggle, so I'm sure I'm missing something.
Somehow I can't seem to pass values using XCOM, unless I'm using functions to execute the tasks that provide and use the information and call them from PythonOperator.
This works, so far so good.

But now I need to use the execution date in the sql query. Since it's embedded within a function it isn't parsed by Jinja.
I get why the {{ ds }} macro is not available outside of the operators, I'm just struggling how to solve this in this case?

Example of what I'm doing currently:

def get_some_values(**context):
	hook = BigQueryHook(use_legacy_sql=False)
    conn = hook.get_conn()
    cursor = conn.cursor()
    cursor.execute(
        "SELECT value1, value2, value3 FROM some_dataset.some_table__{{ ds }}"
    )
    results = cursor.fetchone()
    
    # Store the results in XCom
    if results is not None:
        for i, result in enumerate(results):
            context['ti'].xcom_push(f'value{i+1}', result)

def send_slack_message(**context):
    # Retrieve the results from XCom
    value1 = context['ti'].xcom_pull(key='value1')
    value2 = context['ti'].xcom_pull(key='value2')
    value3 = context['ti'].xcom_pull(key='value3') 

    slack_msg = """values returned: {}, {}, {} """.format(value1, value2, value3)
    
    send_slack_message = SlackWebhookOperator(
        task_id='slack_test',
        http_conn_id=SLACK_CONN_ID,
        webhook_token=slack_webhook_token,
        channel = '#some_channel',
        message=slack_msg,
        username='airflow',
        dag=dag,
    )
    send_slack_message.execute(context=context)

dag = DAG(
    'test',
    default_args=default_args,
    schedule_interval='0 12 * * *',
    catchup=False,
)
get_values_to_output = PythonOperator(
    task_id='get_values_to_output',
    python_callable=get_some_values,
    provide_context=True,
    dag=dag
    )

send_slack_message = PythonOperator(
    task_id='send_slack_message',
    python_callable=send_slack_message,
    provide_context=True,
    dag=dag
)

In this case the query is failing cause it just wants to select from the some_table__{{ ds }} table.
how do I get the execution date in here? OR how do I pass values from a query to the next task without using a function?
('current date' is not good enough since I want to be able to do back runs)

答案1

得分: 2

python_callable 不是一个模板字段,我们无法渲染代码中的每一行,而是Airflow提供了上下文变量给您的函数,您可以访问它们来格式化您的字符串:

cursor.execute(
    f"SELECT value1, value2, value3 FROM some_dataset.some_table__{context['ds']}"
)
英文:

The python_callable is not a template field, where we cannot render every line in the code, but instead, Airflow provide the context variables to you function, and you can access them to format your string:

    cursor.execute(
        f"SELECT value1, value2, value3 FROM some_dataset.some_table__{context['ds']}"
    )

答案2

得分: 0

你可以使用Astro-SDK来获取有关SDK的更多信息,请查看文档。具体来说,你可以使用SDK的run_raw_sql()装饰器。run_raw_sql()是一个允许你运行SQL查询的装饰器(文档)。下面的代码对你应该有所帮助:

from astro import sql as aql

def send_slack_message(context):
   result = context['ti'].xcom_pull(task_ids='run_sql')
   slack_msg = """values returned: {}, {}, {} """.format(result[0],  result[1], result[2])

   send_slack_message = SlackWebhookOperator(
       task_id='slack_test',
       http_conn_id=SLACK_CONN_ID,
       webhook_token=slack_webhook_token,
       channel = '#some_channel',
       message=slack_msg,
       username='airflow',
       dag=dag,
   )
send_slack_message.execute(context=context)

@aql.run_raw_sql(task_id="run_sql")
def run_query(conn_id="bigquery_conn"):
    return """SELECT value1, value2, value3 FROM some_dataset.some_table__{{ ds }}"""

with DAG(
    'test',
    default_args=default_args,
    schedule_interval='0 12 * * *',
    catchup=False,
)
    send_slack_message = PythonOperator(
        task_id='send_slack_message',
        python_callable=send_slack_message,
        provide_context=True,
        dag=dag
    )
    raw_sql_query() >> send_slack_message

另外,你也可以直接在DAG中使用SlackWebhookOperator

英文:

Hey @Chrisvdberge you can use Astro-SDK for more info on the SDK check docs. Specifically, you can use SDK's run_raw_sql() decorator. run_raw_sql() is a decorator which allows you to run SQL queries(doc). Something like below should work for you.

from astro import sql as aql

def send_slack_message(context):
   result = context['ti'].xcom_pull(task_ids='run_sql')
   slack_msg = """values returned: {}, {}, {} """.format(result[0],  result[1], result[2])

   send_slack_message = SlackWebhookOperator(
       task_id='slack_test',
       http_conn_id=SLACK_CONN_ID,
       webhook_token=slack_webhook_token,
       channel = '#some_channel',
       message=slack_msg,
       username='airflow',
       dag=dag,
   )
send_slack_message.execute(context=context)

@aql.run_raw_sql(task_id="run_sql")
def run_query(conn_id="bigquery_conn"):
    return """SELECT value1, value2, value3 FROM some_dataset.some_table__{{ ds }}"""

with DAG(
    'test',
    default_args=default_args,
    schedule_interval='0 12 * * *',
    catchup=False,
)
    send_slack_message = PythonOperator(
        task_id='send_slack_message',
        python_callable=send_slack_message,
        provide_context=True,
        dag=dag
    )
    raw_sql_query() >> send_slack_message

Also, you can probably use SlackWebhookOperator directly inside a dag.

huangapple
  • 本文由 发表于 2023年2月16日 18:25:54
  • 转载请务必保留本文链接:https://go.coder-hub.com/75470878.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定