How to connect HDFS in Airflow?

Question
How to perform HDFS operations in Airflow?
Make sure you install the following Python package:
> pip install apache-airflow-providers-apache-hdfs
# Code snippet
# Import packages
from airflow import DAG, settings
from airflow.models import Connection
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.operators.bash import BashOperator

# Define new DAG
dag_execute_hdfs_commands = DAG(
    dag_id='connect_hdfs',
    schedule_interval='@once',
    start_date=days_ago(1),
    dagrun_timeout=timedelta(minutes=60),
    description='executing hdfs commands',
)

# Establish connection to HDFS
conn = Connection(
    conn_id='webhdfs_default1',
    conn_type='HDFS',
    host='localhost',
    login='usr_id',
    password='password',
    port=9000,
)

session = settings.Session()
# The following lines add the new connection to your Airflow metadata DB.
# Make sure that once the DAG runs successfully you comment them out,
# because we do not want to add the same connection "webhdfs_default1" every time we perform HDFS operations.
session.add(conn)   # On your next run, comment this out
session.commit()    # Commit so the connection is actually persisted
session.close()

if __name__ == '__main__':
    dag_execute_hdfs_commands.cli()
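If you would rather not comment the line out by hand, one alternative (not part of the original post; the duplicate check below is just a sketch) is to only add the connection when no connection with that conn_id exists yet:

# Alternative to commenting out session.add(conn): only add the connection
# if no connection with the same conn_id is already in the metadata DB.
session = settings.Session()
existing = (
    session.query(Connection)
    .filter(Connection.conn_id == conn.conn_id)
    .first()
)
if existing is None:
    session.add(conn)
    session.commit()
session.close()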
Once the above DAG runs successfully, you can perform HDFS operations thereafter.
For example, if you wish to list the files in an HDFS directory, try the following code:
# File listing operation
start_task = BashOperator(
    task_id="start_task",
    bash_command="hdfs dfs -ls /",
    dag=dag_execute_hdfs_commands,
)

start_task
Answer 1
Score: 1
You cannot use the connection webhdfs_default with BashOperator; it works with the WebHDFSHook hook, which creates a client to query the WebHDFS server. Currently there are two implemented methods (a usage sketch follows this list):
- check_for_path: to check if a file exists in HDFS
- load_file: to upload a file to HDFS
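For illustration, a minimal sketch of both methods inside a PythonOperator callable, reusing the webhdfs_default1 connection and the DAG from the question; the local and HDFS paths are placeholders:

# Sketch: the two built-in WebHDFSHook methods inside a task callable.
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

def upload_if_missing():
    hook = WebHDFSHook(webhdfs_conn_id='webhdfs_default1')
    # check_for_path returns True if the given HDFS path already exists
    if not hook.check_for_path('/data/input/sample.csv'):
        # load_file uploads a local file to the given HDFS destination
        hook.load_file(source='/tmp/sample.csv', destination='/data/input/sample.csv')

upload_task = PythonOperator(
    task_id='upload_if_missing',
    python_callable=upload_if_missing,
    dag=dag_execute_hdfs_commands,
)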
You can access the client to do other operations:
webHDFS_hook = WebHDFSHook(webhdfs_conn_id="<your conn id>")
client = webHDFS_hook.get_conn()
client.<operation>
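For example, continuing from the snippet above (these calls come from the hdfs package's client API; the paths are placeholders):

# A few of the operations the client exposes
print(client.list('/data'))               # list the contents of a directory
client.makedirs('/data/new_dir')          # create a directory
with client.read('/data/input/sample.csv') as reader:
    content = reader.read()               # read a file's contents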
The client is an instance of hdfs.InsecureClient if the conf core.security is not kerberos, and hdfs.ext.kerberos.KerberosClient if it is. Here is the documentation of the hdfs cli clients; you can check what the available operations are and use them.
There are a lot of available operations like download, delete, list, read, make_dir, ..., which you can call in a new Airflow operator.
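As a rough illustration of that last point, here is a minimal custom operator built on top of the hook's client. The class name, its arguments, and the choice of the list operation are my own sketch, not an existing provider operator:

from airflow.models.baseoperator import BaseOperator
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

class HdfsListOperator(BaseOperator):
    """Hypothetical operator that lists the entries under an HDFS path."""

    def __init__(self, hdfs_path, webhdfs_conn_id='webhdfs_default', **kwargs):
        super().__init__(**kwargs)
        self.hdfs_path = hdfs_path
        self.webhdfs_conn_id = webhdfs_conn_id

    def execute(self, context):
        client = WebHDFSHook(webhdfs_conn_id=self.webhdfs_conn_id).get_conn()
        # client.list returns the names of the entries under the given path
        entries = client.list(self.hdfs_path)
        self.log.info("Contents of %s: %s", self.hdfs_path, entries)
        return entries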