Copy/Move files from S3 location to EBS mounted on EC2 via Airflow DAG


Question

I have an Airflow DAG. Via the DAG, I want to copy files from an S3 location to an EBS volume mounted on an EC2 instance, to a specific path such as /usr/local/myfiles.

Answer 1

Score: 2

You can use the UniversalTransferOperator for this. It is equivalent to transferring files from the S3 bucket to local storage. Get started with this documentation.

The following should work:

import pathlib
from datetime import datetime

from airflow import DAG

from universal_transfer_operator.datasets.file.base import File
from universal_transfer_operator.universal_transfer_operator import UniversalTransferOperator

CWD = pathlib.Path(__file__).parent
DATA_DIR = str(CWD) + "/../../data/"

with DAG(
    "example_universal_transfer_operator",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
) as dag:
    transfer_non_native_s3_to_ebs = UniversalTransferOperator(
        task_id="transfer_non_native_s3_to_ebs",
        # source: the S3 object, read via the "aws_default" Airflow connection
        source_dataset=File(path="s3://path/to/file", conn_id="aws_default"),
        # destination: a path on the local filesystem of the worker running the task
        destination_dataset=File(path=f"{DATA_DIR}/path/to/destination_file"),
    )
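
Note that the destination path above is relative to DATA_DIR and is written to the local filesystem of whichever worker executes the task, so the file only ends up on the EBS volume if that worker runs on the EC2 instance that mounts it. It also assumes the package that provides universal_transfer_operator is installed and that the aws_default connection can read the bucket. As a minimal sketch aimed at the exact destination from the question (the DAG id, bucket, and file names below are placeholders, not part of the original answer):

from datetime import datetime

from airflow import DAG

from universal_transfer_operator.datasets.file.base import File
from universal_transfer_operator.universal_transfer_operator import UniversalTransferOperator

with DAG(
    "s3_to_ebs_myfiles",  # hypothetical DAG id
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
) as dag:
    UniversalTransferOperator(
        task_id="copy_s3_file_to_myfiles",
        # placeholder bucket/key, read through the "aws_default" Airflow connection
        source_dataset=File(path="s3://your-bucket/path/to/file.csv", conn_id="aws_default"),
        # written to the EBS volume only if this task runs on the EC2 host that mounts it
        destination_dataset=File(path="/usr/local/myfiles/file.csv"),
    )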

Answer 2

Score: 0

This is just one solution; I'm sure there are other ways. You can use an SSHOperator that will run an aws cli command on your EC2 instance.

For example:

from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.ssh.operators.ssh import SSHOperator

your_task = SSHOperator(
    task_id="copy_from_s3",
    command="aws s3 cp s3://... /usr/local/myfiles",
    ssh_hook=SSHHook(ssh_conn_id="your_conn"),
    dag=dag,
)
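
If you need to pull an entire prefix rather than a single object, a minimal sketch of the same approach using aws s3 sync could look like the following (the bucket/prefix is a placeholder, the your_conn SSH connection must already exist, and the EC2 host needs the AWS CLI plus S3 read permissions, e.g. via an instance profile):

from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.ssh.operators.ssh import SSHOperator

sync_task = SSHOperator(
    task_id="sync_from_s3",
    # aws s3 sync copies every object under the prefix into the EBS-backed directory
    command="aws s3 sync s3://your-bucket/your-prefix /usr/local/myfiles",
    ssh_hook=SSHHook(ssh_conn_id="your_conn"),
    dag=dag,
)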

Posted by huangapple on 2023-07-04 22:36:55. Source: https://go.coder-hub.com/76613704.html