Querying a local MongoDB database from an Airflow DAG
Question
A question from an Airflow beginner:
I can't query my local MongoDB database from a task in an Airflow DAG.
The DAG is imported correctly in Airflow, but when I trigger it the task fails with a server timeout:
[enter image description here](https://i.stack.imgur.com/U7850.png)
Here is the connection I set up in Airflow:
[enter image description here](https://i.stack.imgur.com/XGrXI.png)
Here is the Python code that defines the DAG:
```python
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from airflow.providers.mongo.hooks.mongo import MongoHook
from pymongo.mongo_client import MongoClient


def connecter():
    hook = MongoHook(mongo_conn_id='mongo_default')
    client = hook.get_conn()
    my_db = client['Immo']
    return my_db


mydag = DAG(
    dag_id='my_dag',
    default_args={
        'owner': 'airflow',
        'start_date': days_ago(0),
    }
)


def get_one(c: str):
    a = connecter().find({"id_transaction": c})[0]['_id_transaction']
    print(a)
    return a


task_1_0 = PythonOperator(
    task_id='test_connection_my_bd',
    python_callable=db.get_one,
    dag=mydag,
    op_kwargs={'c': 107332}
)
```
I run Airflow inside containers, using the docker-compose file provided by the Airflow community:
https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml
When I run my function get_one(c: str) outside the Airflow DAG, using the function connecter_local, it works fine:
```python
def connecter_local():
    uri = "mongodb://localhost:27017"
    client = MongoClient(uri)
    my_db = client['Immo']
    return my_db


def get_one_local(c: str):
    a = connecter_local().find({"id_transaction": c})[0]['_id_transaction']
    print(a)
    return a


print(get_one_local('1'))
```
Thanks in advance.
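# Answer 1
**Score**: 0
Your Airflow is running in Docker, while MongoDB is running on your machine (outside the Airflow containers). When you configure host=127.0.0.1, that address refers to the container itself (its internal network), not to the host machine, so the task never reaches MongoDB and times out. Change the host in the Airflow connection to your computer's name or IP address.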
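To check which host value actually works before editing the Airflow connection, a small reachability test run from inside the Airflow worker container can help. The following is only a sketch using the Python standard library. The helper names `mongo_uri` and `can_reach` are made up for illustration, and the host name `host.docker.internal` mentioned afterwards is an assumption about your Docker setup: it usually resolves to the host on Docker Desktop, while on Linux it typically has to be mapped explicitly in the compose file.

```python
import socket


def mongo_uri(host: str, port: int = 27017) -> str:
    """Build a MongoDB URI for the given host and port (hypothetical helper)."""
    return f"mongodb://{host}:{port}"


def can_reach(host: str, port: int = 27017, timeout: float = 3.0) -> bool:
    """Return True if a plain TCP connection to host:port succeeds.

    This only checks network reachability; it does not speak the
    MongoDB protocol or verify credentials.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False
```

Running `can_reach("127.0.0.1")` in a Python shell inside the container should return `False` in the situation described above, whereas `can_reach("host.docker.internal")` or `can_reach("<your host IP>")` should return `True` once the right host is found. That value then goes into the host field of the `mongo_default` connection. If every candidate fails, it may also be worth checking that MongoDB on the host is listening on an interface other than localhost.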