Connect to and query a local MongoDB database from an Airflow DAG

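Question

A question from an Airflow beginner:

I can't query my local MongoDB database from a task in an Airflow DAG.

The DAG is imported correctly in Airflow, but when I start it the task fails with a server timeout:
[enter image description here](https://i.stack.imgur.com/U7850.png)

Here is the connection I set up in Airflow:
[enter image description here](https://i.stack.imgur.com/XGrXI.png)

Here is the Python code I use: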
```python
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from airflow.providers.mongo.hooks.mongo import MongoHook
from pymongo.mongo_client import MongoClient

def connecter():
    # Open a client through the Airflow connection 'mongo_default'
    hook = MongoHook(mongo_conn_id='mongo_default')
    client = hook.get_conn()
    my_db = client['Immo']
    return my_db

mydag = DAG(
    dag_id='my_dag',
    default_args={
        'owner': 'airflow',
        'start_date': days_ago(0),
    }
)

def get_one(c: str):
    # NOTE: connecter() returns a Database; in PyMongo, find() is a Collection
    # method, so a collection must be selected first (its name is not given in the post).
    a = connecter().find({"id_transaction": c})[0]['_id_transaction']
    print(a)
    return a

task_1_0 = PythonOperator(
    task_id='test_connection_my_bd',
    # The post had `db.get_one`, but no `db` module is imported here
    python_callable=get_one,
    dag=mydag,
    op_kwargs={'c': 107332}
)
```

I run Airflow inside containers, using the docker-compose file provided by the Airflow community:
https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml

When I run get_one(c: str) outside the Airflow DAG, using the connecter_local function instead, it works fine:

```python
def connecter_local():
    # Connect directly to MongoDB on the local machine
    uri = "mongodb://localhost:27017"
    client = MongoClient(uri)
    my_db = client['Immo']
    return my_db

def get_one_local(c: str):
    a = connecter_local().find({"id_transaction": c})[0]['_id_transaction']
    print(a)
    return a

print(get_one_local('1'))
```

Thanks in advance.




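# Answer 1
**Score**: 0

Your Airflow runs inside Docker containers, while MongoDB runs on your machine (outside the Airflow containers). When you configure host=127.0.0.1, that address is resolved inside the container's own network, so it points at the container itself and not at the host machine. Change the host in the Airflow connection to your computer's hostname or IP address.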
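To check which host value actually works, a quick TCP probe run from inside the Airflow container can help. The snippet below is only a minimal sketch using the standard library: the helper names `mongo_uri` and `can_reach` are made up for illustration, and `host.docker.internal` is an assumption (Docker Desktop provides that name; on Linux it usually has to be mapped to `host-gateway` through an `extra_hosts` entry in the compose file).

```python
import socket


def mongo_uri(host: str, port: int = 27017) -> str:
    """Build a plain MongoDB URI for the given host and port (hypothetical helper)."""
    return f"mongodb://{host}:{port}"


def can_reach(host: str, port: int = 27017, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened (hypothetical helper)."""
    try:
        # create_connection resolves the name and opens a TCP socket
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures
        return False


# Example, to be run inside the Airflow worker container:
#   can_reach("127.0.0.1")             probes the container itself
#   can_reach("host.docker.internal")  probes the Docker host (assumed name)
```

If the probe succeeds for the host's name or IP but fails for 127.0.0.1, that value is the one to put in the Airflow connection. If neither succeeds, MongoDB may be listening only on the host's loopback interface, in which case its bind address would also need to be changed.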




huangapple
  • Published on May 29, 2023 at 23:16:25
  • Please keep this link when reposting: https://go.coder-hub.com/76358480.html