Copy/Move files from S3 location to EBS mounted on EC2 via airflow DAG

Question

I have an Airflow DAG. I want to copy files from an S3 location to an EBS volume mounted on EC2, to a specific location, say /usr/local/myfiles, via the DAG.
Answer 1

Score: 2
You can use the UniversalTransferOperator for this. This is equivalent to transferring files from the S3 bucket to the local filesystem. Get started using this documentation.

The following should work:
import pathlib
from datetime import datetime

from airflow import DAG
from universal_transfer_operator.datasets.file.base import File
from universal_transfer_operator.universal_transfer_operator import UniversalTransferOperator

CWD = pathlib.Path(__file__).parent
DATA_DIR = str(CWD) + "/../../data/"

with DAG(
    "example_universal_transfer_operator",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
) as dag:
    transfer_non_native_s3_to_ebs = UniversalTransferOperator(
        task_id="transfer_non_native_s3_to_ebs",
        # Source: the file in S3, read through the "aws_default" Airflow connection
        source_dataset=File(path="s3://path/to/file", conn_id="aws_default"),
        # Destination: a local path on the worker's filesystem (here relative to the DAG folder)
        destination_dataset=File(path=f"{DATA_DIR}/path/to/destination_file"),
    )
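Adapted to the paths in the question, a task definition (placed inside the same with DAG(...) block) might look like the sketch below. The bucket name and key are placeholders, and note that the file only lands on the EBS volume if the Airflow worker running the task is the EC2 instance that has /usr/local/myfiles mounted:

    transfer_s3_to_myfiles = UniversalTransferOperator(
        task_id="transfer_s3_to_myfiles",
        # Placeholder bucket/key, read via the "aws_default" connection
        source_dataset=File(path="s3://my-bucket/path/to/myfile", conn_id="aws_default"),
        # Target path on the EBS volume mounted on the EC2 instance
        destination_dataset=File(path="/usr/local/myfiles/myfile"),
    )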
Answer 2

Score: 0
Just one solution, I'm sure there are other ways.

You can have an SSHOperator that will run an aws cli command on your EC2 instance.

For example:
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.ssh.operators.ssh import SSHOperator

your_task = SSHOperator(
    task_id="copy_from_s3",
    command="aws s3 cp s3://... /usr/local/myfiles ",
    ssh_hook=SSHHook(ssh_conn_id="your_conn"),
    dag=dag,
)
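One of those other ways, assuming the Airflow worker itself runs on that EC2 instance (so a local write lands on the EBS volume), is a plain PythonOperator using the Amazon provider's S3Hook instead of SSH. This is only a minimal sketch; the bucket name, key, and target path are placeholders:

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def _copy_from_s3():
    # The hook resolves credentials from the "aws_default" Airflow connection
    # (or falls back to the default boto3 credential chain, e.g. an instance profile).
    s3_client = S3Hook(aws_conn_id="aws_default").get_conn()
    s3_client.download_file("my-bucket", "path/to/myfile", "/usr/local/myfiles/myfile")

copy_with_s3_hook = PythonOperator(
    task_id="copy_with_s3_hook",
    python_callable=_copy_from_s3,
    dag=dag,
)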