Cloudwatch Subscription Filter does not ingest to Kinesis Data Stream
Question
I am trying to forward CloudWatch logs to a Kinesis Data Stream using a subscription filter. Even with broad permissions, for some reason I am not able to see any records in Kinesis. I can see logs in the CloudWatch log group, but they are not being forwarded to the Kinesis Data Stream. The Lambda is a basic one with just a couple of console.log statements.
Here is the Terraform code for the stack.
resource "aws_iam_role" "sample_lambda_role" {
  name               = "sample_lambda_role"
  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
EOF
}

resource "aws_iam_policy" "sample_lambda_policy" {
  name        = "sample_lambda_policy"
  path        = "/"
  description = "AWS IAM Policy for managing aws lambda role"
  policy      = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*",
      "Effect": "Allow"
    }
  ]
}
EOF
}

resource "aws_iam_role_policy_attachment" "sample_lambda_attach_iam_policy_to_iam_role" {
  role       = aws_iam_role.sample_lambda_role.name
  policy_arn = aws_iam_policy.sample_lambda_policy.arn
}

data "archive_file" "lambda_app_zip" {
  type        = "zip"
  source_dir  = "${path.module}/sample-lambda"
  #source_file = "index.js" #if one file
  output_path = "${path.module}/sample-lambda.zip"
}

resource "aws_lambda_function" "sample_lambda" {
  filename         = "${path.module}/sample-lambda.zip"
  function_name    = "sample-lambda"
  role             = aws_iam_role.sample_lambda_role.arn
  handler          = "index.handler"
  source_code_hash = data.archive_file.lambda_app_zip.output_base64sha256
  runtime          = "nodejs14.x"

  depends_on = [
    aws_iam_role_policy_attachment.sample_lambda_attach_iam_policy_to_iam_role
  ]
}

resource "aws_cloudwatch_log_group" "sample_lambda_function_log_group" {
  name              = "/aws/lambda/${aws_lambda_function.sample_lambda.function_name}"
  retention_in_days = 1

  lifecycle {
    prevent_destroy = false
  }
}

resource "aws_kinesis_stream" "log_stream" {
  name             = "terraform-kinesis-test"
  shard_count      = 1
  retention_period = 24

  shard_level_metrics = [
    "IncomingBytes",
    "OutgoingBytes",
  ]
}

resource "aws_iam_role" "cloudwatch_ingestion_role" {
  name               = "cloudwatch_ingestion_role"
  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": [
          "logs.amazonaws.com"
        ]
      },
      "Effect": "Allow",
      "Sid": "",
      "Condition": {
        "StringLike": { "aws:SourceArn": "arn:aws:logs:*:*:*" }
      }
    }
  ]
}
EOF
}

resource "aws_iam_policy" "cloudwatch_ingestion_policy" {
  name        = "cloudwatch_ingestion_policy"
  path        = "/"
  description = "AWS IAM Policy for cloudwatch logs ingestion"
  policy      = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "kinesis:*"
      ],
      "Resource": "arn:aws:kinesis:*:*:stream/*",
      "Effect": "Allow"
    }
  ]
}
EOF
}

resource "aws_iam_role_policy_attachment" "cloudwatch_ingestion_attach_iam_policy_to_iam_role" {
  role       = aws_iam_role.cloudwatch_ingestion_role.name
  policy_arn = aws_iam_policy.cloudwatch_ingestion_policy.arn
}

resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter" {
  name            = "sample_lambda_function_logfilter"
  role_arn        = aws_iam_role.cloudwatch_ingestion_role.arn
  log_group_name  = aws_cloudwatch_log_group.sample_lambda_function_log_group.name
  //filter_pattern = "logtype test"
  filter_pattern  = "" //WILL THIS WORK?
  destination_arn = aws_kinesis_stream.log_stream.arn
  distribution    = "ByLogStream"
}
Answer 1
Score: 4
I noticed discussions around the filter_pattern value in the comment section, so I conducted experiments with and without spaces in the subscription filter pattern value and observed that both filter patterns behave the same.
Here is the Terraform script I used:
resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_with_space" {
  name            = "sample_lambda_function_logfilter_with_space"
  role_arn        = aws_iam_role.cloudwatch_ingestion_role.arn
  log_group_name  = aws_cloudwatch_log_group.sample_lambda_function_log_group.name
  filter_pattern  = " "
  destination_arn = aws_kinesis_stream.log_stream.arn
  distribution    = "ByLogStream"
}

resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_without_space" {
  name            = "sample_lambda_function_logfilter_without_space"
  role_arn        = aws_iam_role.cloudwatch_ingestion_role.arn
  log_group_name  = aws_cloudwatch_log_group.sample_lambda_function_log_group.name
  filter_pattern  = ""
  destination_arn = aws_kinesis_stream.log_stream.arn
  distribution    = "ByLogStream"
}
Terraform plan output:
  # aws_cloudwatch_log_subscription_filter.sample_lambda_function_logfilter_with_space will be created
  + resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_with_space" {
      + destination_arn = "arn:aws:kinesis:us-west-2:XXXXXXXXXXXX:stream/terraform-kinesis-test"
      + distribution    = "ByLogStream"
      + filter_pattern  = " "
      + id              = (known after apply)
      + log_group_name  = "/aws/lambda/sample-lambda"
      + name            = "sample_lambda_function_logfilter_with_space"
      + role_arn        = "arn:aws:iam::XXXXXXXXXXXX:role/cloudwatch_ingestion_role"
    }

  # aws_cloudwatch_log_subscription_filter.sample_lambda_function_logfilter_without_space will be created
  + resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_without_space" {
      + destination_arn = "arn:aws:kinesis:us-west-2:XXXXXXXXXXXX:stream/terraform-kinesis-test"
      + distribution    = "ByLogStream"
      + id              = (known after apply)
      + log_group_name  = "/aws/lambda/sample-lambda"
      + name            = "sample_lambda_function_logfilter_without_space"
      + role_arn        = "arn:aws:iam::XXXXXXXXXXXX:role/cloudwatch_ingestion_role"
    }
Note that if we pass an empty filter pattern, Terraform does not show the filter pattern value during the planning stage. However, Kinesis treats this subscription filter the same as the other one.
In the AWS Console, as depicted in the image, both subscription filters render the same pattern value:
Hence, we can rule out any confusion regarding the subscription filter pattern value in the Terraform script above.
So, what could be the issue now? My main suspicion is that the way you're filtering records on the Kinesis data stream is wrong. I suspect you might have used the Latest ShardIteratorType (the starting-position drop-down) when fetching records in the Data Viewer.
The Latest ShardIteratorType shows only records published after the most recent record in the shard. Considering your sequence of actions, it's likely that you executed your Lambda function first and then attempted to filter records within the Data Viewer. Due to the time lag between these actions, when you try to filter the records using the Latest starting position, Kinesis generates a data pointer after the recently published CloudWatch logs, which is why you couldn't see any records in Kinesis.
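The timing effect described above can be illustrated with a toy model. This is plain Python, not the AWS SDK: a shard is modeled as an append-only list, and an iterator is just an index into it. The class and record names are made up for illustration.

```python
# Toy model of Kinesis shard-iterator semantics (illustration only, not the AWS API).
class ToyShard:
    def __init__(self):
        self.records = []

    def put(self, record):
        self.records.append(record)

    def get_iterator(self, iterator_type):
        # LATEST points just past the newest record at creation time;
        # TRIM_HORIZON points at the oldest retained record.
        return len(self.records) if iterator_type == "LATEST" else 0

    def get_records(self, iterator):
        return self.records[iterator:]

shard = ToyShard()
shard.put("log-from-first-lambda-run")    # published before the iterator exists

latest = shard.get_iterator("LATEST")
print(shard.get_records(latest))          # [] -- the earlier log is invisible

shard.put("log-from-second-lambda-run")   # run the Lambda again afterwards
print(shard.get_records(latest))          # ['log-from-second-lambda-run']

print(shard.get_records(shard.get_iterator("TRIM_HORIZON")))  # both records
```

This is why creating the LATEST iterator *before* invoking the Lambda (as in the CLI steps below) makes the records visible.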
AWS CLI Commands
To fetch the records using the Latest ShardIteratorType, follow these steps:
First, execute the following command. This will create a data pointer after the most recent records present in your shard:
aws kinesis get-shard-iterator \
--stream-name terraform-kinesis-test \
--shard-id shardId-000000000000 \
--shard-iterator-type LATEST
Command Output:
{
"ShardIterator": "AAAAAAAAAAGiKQ..."
}
Now, you can execute your Lambda function to produce some CloudWatch logs. The logs will then be sent to Kinesis through the newly created subscription filter.
Next, execute the following command to fetch records from the Kinesis data stream using the shard-iterator value that you retrieved earlier.
aws kinesis get-records \
--limit 10 \
--shard-iterator "AAAAAAAAAAGiKQ..."
Command Output:
{
    "Records": [
        {
            "SequenceNumber": "49643477757265957414492357197584820922864438932158808066",
            "ApproximateArrivalTimestamp": "2023-08-10T23:43:56.704000+00:00",
            "Data": "H4sIAAAAAAAA/...",
            "PartitionKey": "f656f4eedc671f9bd3cea60ef85e599c"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAFOa...",
    "MillisBehindLatest": 0
}
The Data field that you see under the Records section is base64-encoded and gzip-compressed and contains a CloudWatch log, so use the following command to retrieve the actual value.
echo -n "<BASE64ENCODED_GZIP_COMPRESSED_DATA>" | base64 -d | zcat
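The same decoding can also be done programmatically. Here is a minimal Python sketch; the helper name `decode_subscription_record` and the sample payload are made up for illustration, though the payload shape mirrors the CloudWatch Logs subscription format (messageType, logGroup, logEvents, etc.).

```python
import base64
import gzip
import json

def decode_subscription_record(data_b64: str) -> dict:
    """Base64-decode, gunzip, and JSON-parse the Data field of a
    CloudWatch-to-Kinesis subscription record."""
    return json.loads(gzip.decompress(base64.b64decode(data_b64)))

# Round-trip demo with a made-up payload shaped like a CloudWatch Logs
# subscription message (real records carry more fields).
sample = {
    "messageType": "DATA_MESSAGE",
    "logGroup": "/aws/lambda/sample-lambda",
    "logEvents": [{"id": "1", "timestamp": 1691711036704, "message": "hello"}],
}
encoded = base64.b64encode(gzip.compress(json.dumps(sample).encode())).decode()

decoded = decode_subscription_record(encoded)
print(decoded["logEvents"][0]["message"])  # hello
```

Feeding the real Data value from the get-records output into the same helper recovers the original CloudWatch log events.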
The steps outlined above will help you retrieve records using the Latest ShardIteratorType via the AWS CLI. Nevertheless, if your intention is to view the records directly within the AWS Console's Data Viewer section, you can use one of the alternative ShardIteratorTypes.
For instance, when using the TRIM_HORIZON starting position, the data will appear as shown in the image below:
To learn more about the ShardIteratorTypes, refer to this link.
Comments