How to call a REST endpoint using an Airflow DAG

Question

I'm new to Apache Airflow. I want to call a REST endpoint from a DAG.
The REST endpoint, for example, is

@PostMapping(path = "/api/employees", consumes = "application/json")

Now I want to call this REST endpoint from an Airflow DAG and schedule it. I'm using SimpleHttpOperator to call the endpoint:

t1 = SimpleHttpOperator(
    task_id='post_op',
    endpoint='http://localhost:8084/api/employees',
    data=json.dumps({"department": "Digital", "id": 102, "name": "Rakesh", "salary": 80000}),
    headers={"Content-Type": "application/json"},
    dag=dag,
)

When I trigger the DAG, the task fails:

[2019-12-30 09:09:06,330] {{taskinstance.py:862}} INFO - Executing <Task(SimpleHttpOperator): 
post_op> on 2019-12-30T08:57:00.674386+00:00
[2019-12-30 09:09:06,331] {{base_task_runner.py:133}} INFO - Running: ['airflow', 'run', 
'example_http_operator', 'post_op', '2019-12-30T08:57:00.674386+00:00', '--job_id', '6', '--pool', 
'default_pool', '--raw', '-sd', 'DAGS_FOLDER/ExampleHttpOperator.py', '--cfg_path', 
'/tmp/tmpf9t6kzxb']
[2019-12-30 09:09:07,446] {{base_task_runner.py:115}} INFO - Job 6: Subtask post_op [2019-12-30 
09:09:07,445] {{__init__.py:51}} INFO - Using executor SequentialExecutor
[2019-12-30 09:09:07,446] {{base_task_runner.py:115}} INFO - Job 6: Subtask post_op [2019-12-30 
09:09:07,446] {{dagbag.py:92}} INFO - Filling up the DagBag from 
/usr/local/airflow/dags/ExampleHttpOperator.py
[2019-12-30 09:09:07,473] {{base_task_runner.py:115}} INFO - Job 6: Subtask post_op [2019-12-30 
09:09:07,472] {{cli.py:545}} INFO - Running <TaskInstance: example_http_operator.post_op 2019-12- 
30T08:57:00.674386+00:00 [running]> on host 855dbc2ce3a3
[2019-12-30 09:09:07,480] {{http_operator.py:87}} INFO - Calling HTTP method
[2019-12-30 09:09:07,483] {{logging_mixin.py:112}} INFO - [2019-12-30 09:09:07,483] 
{{base_hook.py:84}} INFO - Using connection to: id: http_default. Host: https://www.google.com/, 
Port: None, Schema: None, Login: None, Password: None, extra: {}
[2019-12-30 09:09:07,484] {{logging_mixin.py:112}} INFO - [2019-12-30 09:09:07,484] 
{{http_hook.py:131}} INFO - Sending 'POST' to url: 
https://www.google.com/http://localhost:8084/api/employees
[2019-12-30 09:09:07,501] {{logging_mixin.py:112}} INFO - [2019-12-30 09:09:07,501] 
{{http_hook.py:181}} WARNING - HTTPSConnectionPool(host='www.google.com', port=443): Max retries 
exceeded with url: /http://localhost:8084/api/employees (Caused by SSLError(SSLError("bad handshake: 
SysCallError(-1, 'Unexpected EOF')"))) Tenacity will retry to execute the operation
[2019-12-30 09:09:07,501] {{taskinstance.py:1058}} ERROR - 
HTTPSConnectionPool(host='www.google.com', port=443): Max retries exceeded with url: 
/http://localhost:8084/api/employees (Caused by SSLError(SSLError("bad handshake: SysCallError(-1, 
'Unexpected EOF')")))
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 485, in wrap_socket
cnx.do_handshake()
File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1934, in do_handshake
self._raise_ssl_error(self._ssl, result)
File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1664, in _raise_ssl_error
raise SysCallError(-1, "Unexpected EOF")
OpenSSL.SSL.SysCallError: (-1, 'Unexpected EOF')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 672, in urlopen
chunked=chunked,
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 376, in _make_request
self._validate_conn(conn)
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 994, in _validate_conn
conn.connect()
File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 394, in connect
ssl_context=context,
File "/usr/local/lib/python3.7/site-packages/urllib3/util/ssl_.py", line 370, in ssl_wrap_socket
return context.wrap_socket(sock, server_hostname=server_hostname)
File "/usr/local/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 491, in wrap_socket
raise ssl.SSLError("bad handshake: %r" % e)
ssl.SSLError: ("bad handshake: SysCallError(-1, 'Unexpected EOF')",)

Airflow is running in Docker, using the puckel/docker-airflow image.
Why is it calling the http_default connection host, https://www.google.com/?

Answer 1

Score: 3


You need to consider both the Operator you are using and the underlying Hook which it uses to connect.
The Hook fetches connection information from an Airflow Connection, which is just a container used to store credentials and other connection information. You can configure Connections in the Airflow UI (Admin -> Connections).

So in this case, you need to first configure your HTTP Connection.

From the http_hook documentation:

http_conn_id (str) – connection that has the base API url i.e https://www.google.com/

So for the HttpHook, you should configure the Connection by setting its host to the base URL of your endpoint: http://localhost:8084/.

Since your operator uses the default http_conn_id, the hook will use the Airflow Connection called "http_default" in the Airflow UI.
If you don't want to change the default one, you can create another Airflow Connection using the Airflow UI and pass its id as the http_conn_id argument to your operator.

See the source code to get a better idea of how the Connection object is used.
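
If you prefer to create the Connection in code rather than through the UI, here is a minimal sketch (assuming Airflow 1.10.x, as shipped in the puckel/docker-airflow image; the conn_id employees_api is just an example name, any id works as long as it matches what you pass to the operator):

from airflow import settings
from airflow.models import Connection

# Example connection pointing at the employee service; the host is the base URL.
conn = Connection(
    conn_id='employees_api',
    conn_type='http',
    host='http://localhost:8084',
)

session = settings.Session()
session.add(conn)
session.commit()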

Lastly, according to the http_operator documentation:

endpoint (str) – The relative part of the full url. (templated)

You should pass only the relative part of your URL to the operator; it will get the rest from the underlying http_hook.

In this case, the value of endpoint for your Operator should be api/employees (not the full URL).
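
Putting the two points together, a sketch of what the task could look like (this assumes the employees_api Connection from the example above exists; with the default http_conn_id you would instead edit the http_default Connection):

import json
from datetime import datetime

from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator

dag = DAG('example_http_operator', start_date=datetime(2019, 12, 1), schedule_interval=None)

t1 = SimpleHttpOperator(
    task_id='post_op',
    http_conn_id='employees_api',  # Connection whose host is http://localhost:8084
    endpoint='api/employees',      # relative part of the URL only
    method='POST',
    data=json.dumps({"department": "Digital", "id": 102, "name": "Rakesh", "salary": 80000}),
    headers={"Content-Type": "application/json"},
    dag=dag,
)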

The Airflow project documentation is unfortunately not very clear in this case. Please consider contributing an improvement; they are always welcome.

Answer 2

Score: 1


I think you need to set the connection string as an ENV variable in your Dockerfile or docker run command:

ENV AIRFLOW__CORE__SQL_ALCHEMY_CONN my_conn_string

see this and this

> Connections
>
> The connection information to external systems is stored in the
> Airflow metadata database and managed in the UI (Menu -> Admin ->
> Connections) A conn_id is defined there and hostname / login /
> password / schema information attached to it. Airflow pipelines can
> simply refer to the centrally managed conn_id without having to hard
> code any of this information anywhere.
>
> Many connections with the same conn_id can be defined and when that is
> the case, and when the hooks uses the get_connection method from
> BaseHook, Airflow will choose one connection randomly, allowing for
> some basic load balancing and fault tolerance when used in conjunction
> with retries.
>
> Airflow also has the ability to reference connections via environment
> variables from the operating system. The environment variable needs to
> be prefixed with AIRFLOW_CONN_ to be considered a connection. When
> referencing the connection in the Airflow pipeline, the conn_id should
> be the name of the variable without the prefix. For example, if the
> conn_id is named POSTGRES_MASTER the environment variable should be
> named AIRFLOW_CONN_POSTGRES_MASTER. Airflow assumes the value returned
> from the environment variable to be in a URI format
> (e.g. postgres://user:password@localhost:5432/master).

see this

Therefore, you are currently using the default:

Using connection to: id: http_default. Host: https://www.google.com/
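
As an illustration of the environment-variable approach (a hedged sketch; the name employees_api and the URL are only examples), a connection can be injected when starting the container with a variable like AIRFLOW_CONN_EMPLOYEES_API=http://localhost:8084/ and then resolved by its conn_id:

# The part of the variable name after AIRFLOW_CONN_ (lower-cased) is the conn_id.
from airflow.hooks.base_hook import BaseHook

conn = BaseHook.get_connection('employees_api')
print(conn.host, conn.port)  # localhost 8084, parsed from the URI

Setting AIRFLOW_CONN_HTTP_DEFAULT in the same way would override the http_default connection that SimpleHttpOperator falls back to.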
