How to connect to HDFS in Airflow?


Question

How do I perform HDFS operations in Airflow?

Make sure you have the following Python package installed:

pip install apache-airflow-providers-apache-hdfs

# Code snippet

# Import packages
from datetime import timedelta

from airflow import DAG, settings
from airflow.models import Connection
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

# Define a new DAG
dag_execute_hdfs_commands = DAG(
    dag_id='connect_hdfs',
    schedule_interval='@once',
    start_date=days_ago(1),
    dagrun_timeout=timedelta(minutes=60),
    description='executes HDFS commands',
)

# Define a connection to HDFS
conn = Connection(
    conn_id='webhdfs_default1',
    conn_type='HDFS',
    host='localhost',
    login='usr_id',
    password='password',
    port=9000,  # NameNode RPC port; WebHDFS itself usually listens on 9870 (Hadoop 3) or 50070 (Hadoop 2)
)
session = settings.Session()

# The following lines add the new connection to Airflow's metadata database.
# Once the DAG has run successfully, comment them out: we do not want to add
# the same connection "webhdfs_default1" every time we perform HDFS operations.
session.add(conn)    # On your next run, comment this out
session.commit()     # Persist the connection; without commit() it is never saved
session.close()

if __name__ == '__main__':
    dag_execute_hdfs_commands.cli()
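Mutating the metadata database from inside a DAG file is easy to forget to comment out. An alternative is to register the connection once from the Airflow CLI instead. A sketch, assuming Airflow 2.x and the same placeholder credentials as above:

```shell
# Register the WebHDFS connection once, outside the DAG file.
# Host, port, and credentials are placeholders -- substitute your own.
airflow connections add 'webhdfs_default1' \
    --conn-type 'webhdfs' \
    --conn-host 'localhost' \
    --conn-login 'usr_id' \
    --conn-password 'password' \
    --conn-port '9000'
```

Because the command is run once, there is nothing in the DAG file that needs to be commented out after the first run.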

Once the above DAG has run successfully, you can perform HDFS operations.
For example, if you wish to list the files in an HDFS directory, try the following code:

# File-listing operation
start_task = BashOperator(
    task_id="start_task",
    bash_command="hdfs dfs -ls /",
    dag=dag_execute_hdfs_commands,
)

start_task

Answer 1

Score: 1

You cannot use the connection webhdfs_default with BashOperator; it works with the WebHDFSHook hook, which creates a client to query the WebHDFS server. Currently there are two implemented methods:

  • check_for_path: check whether a file exists in HDFS.
  • load_file: upload a file to HDFS.

You can access the client to perform other operations:

webHDFS_hook = WebHDFSHook(webhdfs_conn_id="<your conn id>")
client = webHDFS_hook.get_conn()
client.<operation>

The client is an instance of hdfs.InsecureClient, or of hdfs.ext.kerberos.KerberosClient when the core.security configuration is set to kerberos. The documentation of the hdfs CLI client lists the available operations, which you can check and use.

There are many available operations, such as download, delete, list, read, makedirs, ..., which you can call in a new Airflow operator.
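Under the hood, these hook methods are wrappers over the WebHDFS REST API. As an illustration of the kind of request the client sends, here is a minimal sketch that builds a LISTSTATUS request URL using only the standard library (the host, port, and helper name are hypothetical, not part of any Airflow API):

```python
from urllib.parse import urlencode, urlunsplit

def webhdfs_url(host: str, port: int, path: str, op: str, **params) -> str:
    """Build a WebHDFS REST URL, e.g. for the LISTSTATUS operation.

    `path` must be absolute; extra keyword arguments become query parameters.
    """
    query = urlencode({"op": op, **params})
    return urlunsplit(("http", f"{host}:{port}", f"/webhdfs/v1{path}", query, ""))

# The REST-level equivalent of `hdfs dfs -ls /`:
url = webhdfs_url("localhost", 9870, "/", "LISTSTATUS")
print(url)  # http://localhost:9870/webhdfs/v1/?op=LISTSTATUS
```

Fetching such a URL (with any HTTP client) returns a JSON FileStatuses document, which is what the hook's client parses for you.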


huangapple
  • Published 2023-02-08 22:18:52
  • When reposting, please retain the link to this article: https://go.coder-hub.com/75387094.html