Attach connection to Glue job orchestrated by AWS Airflow

Question

I am trying to trigger a Glue job from Airflow. Triggering works, but the job times out because the Redshift connection is missing. When I add the connection manually in the Job details tab, the Glue job works. Is there a way to attach a Redshift connection so that it appears in the Job details tab of the triggered Glue job? If I add it manually, it gets removed on the next Airflow run.

I would like to avoid using psycopg2 or other libraries in Glue if possible.

from airflow.operators.empty import EmptyOperator
from airflow import DAG
from datetime import timedelta, datetime
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

project='<project>'
env='<env>'
region_name='<region>'
glue_script='<script_name>'
job_name='<name>'

default_args = {
    'owner': 'airflow',
    'start_date' : datetime(2022, 11, 12)
}

# Creating DAG Object
dag = DAG(dag_id=f'{project}-c-p-dag-{env}',
          default_args=default_args,
          schedule_interval='@once',
          catchup=False
          )

# EmptyOperator is a placeholder task that does nothing
start = EmptyOperator(
    task_id='start', dag=dag
)

submit_glue_job = GlueJobOperator(
    task_id="submit_glue_job",
    job_name=f"{project}-{job_name}-{env}-{region_name}",
    iam_role_name="<role>",
    s3_bucket=f"s3://{project}-g-a-{env}-{region_name}",
    script_location=f"s3://{project}-g-a-{env}-{region_name}/scripts/{glue_script}",
    create_job_kwargs={"GlueVersion": "3.0", "NumberOfWorkers": 2, "WorkerType": "G.1X"},
    region_name=region_name,
    script_args={
        '--AWS_REGION': region_name,
        '--REDSHIFT_SECRET': '<secret>',
        '--REDSHIFT_DATABASE': '<db>',
        '--RS_DB_TABLE': '<table>',
        '--DYNAMODB_TABLE': '<dynamotable>',
    },
    dag=dag
)

start >> submit_glue_job

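Answer 1

Score: 2

Adding Connections under create_job_kwargs worked. The value is a dict whose own Connections key holds the list of Glue connection names: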

submit_glue_job = GlueJobOperator(
    task_id="submit_glue_job",
    job_name=f"{project}-{job_name}-{env}-{region_name}",
    iam_role_name="<role>",
    s3_bucket=f"s3://{project}-g-a-{env}-{region_name}",
    script_location=f"s3://{project}-g-a-{env}-{region_name}/scripts/{glue_script}",
    create_job_kwargs={"GlueVersion": "3.0", "NumberOfWorkers": 2, "WorkerType": "G.1X", "Connections": {"Connections": ["<glue_connection_name>"]}},
    region_name=region_name,
    script_args={'--AWS_REGION': region_name, '--REDSHIFT_SECRET': '<secret>', '--REDSHIFT_DATABASE': '<db>', '--RS_DB_TABLE': '<table>', '--DYNAMODB_TABLE': '<dynamotable>'},
    dag=dag
)
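
The nested shape is easy to get wrong: in the Glue job definition, Connections is a dict whose own Connections key holds a list of connection names. Below is a minimal sketch of a helper that builds the dict. The name build_create_job_kwargs is hypothetical (it is not part of Airflow or boto3), and the defaults simply mirror the values used in the answer.

```python
from typing import Any, Dict, List


def build_create_job_kwargs(
    connection_names: List[str],
    glue_version: str = "3.0",
    number_of_workers: int = 2,
    worker_type: str = "G.1X",
) -> Dict[str, Any]:
    """Build a dict suitable for GlueJobOperator's create_job_kwargs.

    Hypothetical helper for illustration only; the defaults mirror the
    values used in the answer.
    """
    if not connection_names:
        raise ValueError("at least one Glue connection name is required")
    return {
        "GlueVersion": glue_version,
        "NumberOfWorkers": number_of_workers,
        "WorkerType": worker_type,
        # Outer key is the job property, inner key holds the list of names.
        "Connections": {"Connections": list(connection_names)},
    }


kwargs = build_create_job_kwargs(["<glue_connection_name>"])
print(kwargs["Connections"])
```

The result can then be passed to the operator as create_job_kwargs=kwargs.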
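
To check that the connection really ended up on the job definition without opening the Job details tab, the job can be read back with the get_job call of the boto3 Glue client. This is a sketch that assumes boto3 is installed and AWS credentials are configured. The helper names connection_names and fetch_connection_names are hypothetical, and the sample response is hand-written for illustration.

```python
from typing import Any, Dict, List


def connection_names(get_job_response: Dict[str, Any]) -> List[str]:
    """Extract connection names from a Glue get_job response dict."""
    job = get_job_response.get("Job", {})
    # A job without connections may omit the Connections key entirely.
    return list(job.get("Connections", {}).get("Connections", []))


def fetch_connection_names(job_name: str, region_name: str) -> List[str]:
    """Call AWS Glue and return the connections attached to job_name."""
    import boto3  # imported lazily so the parser above works without boto3

    glue = boto3.client("glue", region_name=region_name)
    return connection_names(glue.get_job(JobName=job_name))


# Offline example with a hand-written response of the same shape.
sample = {"Job": {"Name": "demo", "Connections": {"Connections": ["my-redshift"]}}}
print(connection_names(sample))
```

An empty list after an Airflow run would suggest that the connection was not part of the configuration the operator submitted.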

huangapple
  • This article was published on May 25, 2023, 02:53:05
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76326609.html