Airflow – 为Big Query结果集的每一行进行HTTP请求

huangapple go评论60阅读模式
英文:

Airflow - Make http request for each row of Big Query Result Set

问题

我想使用Airflow DAG执行GCP Big Query,并针对结果集的每一行向一个端点发出请求。随着这个http请求的返回,我希望将其存储在GCS中,作为外部表。

我没有找到将GCP Big Query的结果集传递给其他运算符并对其进行迭代的方法。我唯一找到的方法是使用Python运算符,但我认为有更好的方法来完成这个任务。

英文:

I would like to use Airflow DAG to execute a GCP Big Query, and for each row of the result set, I will make a request to an endpoint. With the return of this http request, I would like to store it in GCS, as external table.

I didn't find a way to pass the result set of GCP Big query to other operator and iterate over it. The only way that I find, is to use a Python Operator, but I supposed that there is a better way to do that.

答案1

得分: 1

解决方案 1

我认为对于这种需求和用例,最简单的解决方案是在PythonOperator中执行不同的操作,就像你提到的那样。

PythonOperator中,你可以使用Python BigQuery客户端来执行BigQuery作业。

你可以在一个DictList中检索结果,然后在你的端点中针对列表中的每个元素(或者可能更优化的方式)启动API请求,然后再次使用Python客户端将结果存储在GCS中作为外部表。

解决方案 2

你还可以混合常规运算符与PythonOperator,并使用xcom从前一个运算符中检索结果,例如:

import airflow
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

def call_api(**kwargs):
    ti = kwargs['ti']
    query_results = ti.xcom_pull(task_ids='your_query')

    # 根据查询结果进行 API 调用

with airflow.DAG(
        "dag_id",
        default_args={},
        schedule_interval=None) as dag:
    your_query_task = BigQueryInsertJobOperator(
        task_id='your_query',
        configuration={
            "query": {
                "query": 'your_query',
                "useLegacySql": False,
            }
        },
        location='EU'
    )
    start_dag = DummyOperator(task_id='OK', dag=dag)

    api_call_task = PythonOperator(
        task_id="save_file_bq",
        op_kwargs={
            'dataset': 'dataset',
            'table': 'table'
        },
        python_callable=call_api
    )

    # DAG 的其余逻辑用于将结果上传到 GCS 作为外部表...

    start_dag >> api_call_task

至于DAG中的其余逻辑,你可以决定哪种方法对你来说最合适:

  • PythonOperator中将结果上传到GCS
  • 或者再次使用xcom在这种情况下使用常规运算符,但在这种情况下,你应该扩展内置运算符
英文:

Solution 1

I think for this this kind of need and use case, the most easy solution is to do the different operations in a PythonOperator as you mentioned.

In the PythonOperator, you can use Python BigQuery client to execute BigQuery job.

You can retrieve the result in a List of Dict, then launch the api requests in your endpoint, per element in the list (or maybe a more optimized way) and again use Python client to store the result in GCS as external table.

Solution 2

You can also mix usual operators with PythonOperator and use xcom to retrieve the result from the previous operator, example :

import airflow
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator


def call_api(**kwargs):
    ti = kwargs['ti']
    query_results = ti.xcom_pull(task_ids='your_query')

    # api call by query result


with airflow.DAG(
        "dag_id",
        default_args={},
        schedule_interval=None) as dag:
    your_query_task = BigQueryInsertJobOperator(
        task_id='your_query',
        configuration={
            "query": {
                "query": 'your_query',
                "useLegacySql": False,
            }
        },
        location='EU'
    )
    start_dag = DummyOperator(task_id='OK', dag=dag)

    api_call_task = PythonOperator(
        task_id="save_file_bq",
        op_kwargs={
            'dataset': 'dataset',
            'table': 'table'
        },
        python_callable=call_api
    )

    # The rest of DAG for upload the result in GCS as external table....

    start_dag >> api_call_task

For the rest of logic in the DAG, you can decide what is the best approach for you :

  • Upload the result in GCS in the PythonOperator
  • Or use an usual operator with xcom again but in this case, you should extend the built in operator

huangapple
  • 本文由 发表于 2023年5月21日 01:04:03
  • 转载请务必保留本文链接:https://go.coder-hub.com/76296404.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定