Passing User Parameters when invoking Lambda in CodePipeline
Question
I have a simple Lambda function that accepts a bucket name as a parameter and returns the name of the bucket when the function is executed:
import boto3

def lambda_handler(event, context):
    s3 = boto3.resource('s3')
    bucket_name = event['bucket_key']
    return bucket_name
If I pass this event JSON and execute it:
{
    "bucket_key": "my_bucket"
}
I get my_bucket as the response and it runs successfully.
Now, what I want to do is add this function as an action (Invoke Lambda) in one of the stages of my pipeline.
However, when I run the pipeline, the function goes into a loop, and if I look at CloudWatch I can see this error message:
[ERROR] KeyError: 'bucket_key'
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 6, in lambda_handler
    bucket_name = event['bucket_key']
Is there a specific format I need to use for the user parameters in CodePipeline when invoking Lambda? It doesn't make sense to me that I'm passing exactly the same JSON event, yet it only works in the Lambda console.
Answer 1
Score: 1
The code is missing the logic to parse the event that CodePipeline sends.
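When CodePipeline invokes a Lambda function, the user parameters do not arrive as the top-level event. The whole invocation is wrapped in a CodePipeline.job object, roughly shaped like the sketch below (the id and FunctionName values are placeholders, and note that UserParameters is delivered as a JSON string, not as parsed JSON):

{
    "CodePipeline.job": {
        "id": "11111111-abcd-1111-abcd-111111abcdef",
        "data": {
            "actionConfiguration": {
                "configuration": {
                    "FunctionName": "my-function",
                    "UserParameters": "{\"bucket_key\": \"my_bucket\"}"
                }
            }
        }
    }
}

The handler below extracts UserParameters from that structure and reports the job result back to CodePipeline: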
import boto3
import json

pipeline = boto3.client('codepipeline')
s3 = boto3.resource('s3')

def lambda_handler(event, context):
    print(event)  # to see the event sent by CodePipeline
    user_parameters = event.get('CodePipeline.job', {}).get('data', {}).get('actionConfiguration', {}).get('configuration', {}).get('UserParameters', '{}')
    json_object = json.loads(user_parameters)  # UserParameters arrives as a JSON string, so parse it
    print(json_object)  # to see what the parameters look like after parsing
    bucket_name = json_object.get('bucket_key')  # getting the bucket name
    index = 'index.html'
    # Do some logic
    try:
        # If the bucket and the file exist, the try block runs to the end
        s3.Object(bucket_name, index).load()
        print(f"{bucket_name} bucket and {index} exist! Taking down the frontend now ...")
        # Back up index.html as index-original.html
        s3.Object(bucket_name, 'index-original.html').copy_from(CopySource=bucket_name + '/index.html')
        # Overwrite index.html with the downtime page
        s3.Object(bucket_name, 'index.html').copy_from(CopySource=bucket_name + '/index-downtime.html')
        # Stash the original page as index-downtime.html
        s3.Object(bucket_name, 'index-downtime.html').copy_from(CopySource=bucket_name + '/index-original.html')
        # Delete the temporary index-original.html
        s3.Object(bucket_name, 'index-original.html').delete()
    except Exception:
        print(f"{bucket_name} bucket or {index} does not exist!")
    response = pipeline.put_job_success_result(
        jobId=event['CodePipeline.job']['id']  # required whenever CodePipeline invokes the Lambda
    )
    return response
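One refinement worth considering (a sketch, not part of the original answer): if the function raises, or if no job result is ever reported, CodePipeline keeps the action in progress and retries it, which is the looping behaviour described in the question. Replacing the except block with an explicit failure report makes the stage fail fast instead:

    except Exception as err:
        print(f"{bucket_name} bucket or {index} does not exist!")
        # report failure so the pipeline stage stops instead of retrying
        return pipeline.put_job_failure_result(
            jobId=event['CodePipeline.job']['id'],
            failureDetails={'type': 'JobFailed', 'message': str(err)}
        )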
Don't forget to allow the put_job_success_result call in the Lambda execution role.
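A minimal policy statement for that might look like the sketch below (it also covers the optional failure call above; as far as I know these job-result actions don't support resource-level scoping, hence the "*"):

{
    "Effect": "Allow",
    "Action": [
        "codepipeline:PutJobSuccessResult",
        "codepipeline:PutJobFailureResult"
    ],
    "Resource": "*"
}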