Airflow - Make http request for each row of Big Query Result Set
Question
I would like to use an Airflow DAG to execute a GCP BigQuery query and, for each row of the result set, make a request to an endpoint. I would then like to store the responses of these HTTP requests in GCS as an external table.
I haven't found a way to pass the result set of a BigQuery query to another operator and iterate over it. The only way I found is to use a PythonOperator, but I suppose there is a better way to do this.
Answer 1
Score: 1
Solution 1
I think for this kind of need and use case, the easiest solution is to do the different operations in a PythonOperator, as you mentioned.
In the PythonOperator, you can use the Python BigQuery client to execute the BigQuery job. You can retrieve the result as a List of Dicts, then launch the API requests against your endpoint, one per element in the list (or maybe in a more optimized way), and again use the Python clients to store the result in GCS as an external table.
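As a rough sketch of Solution 1 (the query, the https://example.com/api endpoint, the bucket name, the object path, and the dataset/table names below are placeholders, not something from the original answer), the whole chain can live in one callable that you hand to a PythonOperator:

    import json

    import requests
    from google.cloud import bigquery, storage


    def query_call_and_store(**kwargs):
        bq_client = bigquery.Client()

        # 1. Execute the BigQuery job and retrieve the rows as a list of dicts.
        sql = "SELECT id, payload FROM `project.dataset.table`"  # placeholder query
        rows = [dict(row) for row in bq_client.query(sql).result()]

        # 2. Call the endpoint once per row (could also be batched or parallelized).
        responses = []
        for row in rows:
            resp = requests.post("https://example.com/api", json=row, timeout=30)
            resp.raise_for_status()
            responses.append(resp.json())

        # 3. Store the responses in GCS as newline-delimited JSON.
        bucket = storage.Client().bucket("your-bucket")  # placeholder bucket
        blob = bucket.blob("api_results/results.json")
        blob.upload_from_string("\n".join(json.dumps(r) for r in responses))

        # 4. Declare the GCS file as a BigQuery external table.
        external_config = bigquery.ExternalConfig("NEWLINE_DELIMITED_JSON")
        external_config.source_uris = ["gs://your-bucket/api_results/results.json"]
        external_config.autodetect = True
        table = bigquery.Table("project.dataset.api_results_external")
        table.external_data_configuration = external_config
        bq_client.create_table(table, exists_ok=True)

In the DAG, this callable would simply be the python_callable of a PythonOperator, wired in the same way as in the Solution 2 example below.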
Solution 2
You can also mix usual operators with a PythonOperator and use xcom to retrieve the result from the previous operator, for example:
import airflow
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator


def call_api(**kwargs):
    ti = kwargs['ti']
    query_results = ti.xcom_pull(task_ids='your_query')
    # Make the API calls based on the query result


with airflow.DAG(
        "dag_id",
        default_args={},
        schedule_interval=None) as dag:

    your_query_task = BigQueryInsertJobOperator(
        task_id='your_query',
        configuration={
            "query": {
                "query": 'your_query',
                "useLegacySql": False,
            }
        },
        location='EU'
    )

    start_dag = DummyOperator(task_id='OK', dag=dag)

    api_call_task = PythonOperator(
        task_id="save_file_bq",
        op_kwargs={
            'dataset': 'dataset',
            'table': 'table'
        },
        python_callable=call_api
    )

    # The rest of the DAG to upload the result to GCS as an external table...
    # The query task must run before the API call task so its xcom is available.
    start_dag >> your_query_task >> api_call_task
For the rest of the logic in the DAG, you can decide which approach is best for you:
- Upload the result to GCS in the PythonOperator
- Or use a usual operator with xcom again, but in this case you should extend the built-in operator, as in the sketch below
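If you go the second route, one way to do it is a small custom operator that pulls the previous task's xcom result and writes it to GCS. This is only a minimal sketch, assuming a recent google provider version where GCSHook.upload accepts a data argument; the bucket and object names are placeholders:

    import json

    from airflow.models import BaseOperator
    from airflow.providers.google.cloud.hooks.gcs import GCSHook


    class ApiResultToGCSOperator(BaseOperator):
        """Pull the result pushed by an upstream task and write it to GCS as JSON."""

        def __init__(self, source_task_id, bucket_name, object_name, **kwargs):
            super().__init__(**kwargs)
            self.source_task_id = source_task_id
            self.bucket_name = bucket_name
            self.object_name = object_name

        def execute(self, context):
            # Whatever the upstream task pushed to xcom (e.g. the API responses).
            payload = context["ti"].xcom_pull(task_ids=self.source_task_id)
            # Placeholder bucket/object; upload with data= writes the string as the object content.
            GCSHook().upload(
                bucket_name=self.bucket_name,
                object_name=self.object_name,
                data=json.dumps(payload, default=str),
            )

You would then append something like ApiResultToGCSOperator(task_id='upload_to_gcs', source_task_id='save_file_bq', bucket_name='your-bucket', object_name='api_results/results.json') to the end of the chain (provided call_api returns the responses so they land in xcom), and declare the resulting file as an external table, for example with the client calls shown in the Solution 1 sketch.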