CloudWatch 订阅过滤器未将数据摄取到 Kinesis 数据流中。

huangapple go评论91阅读模式
英文:

Cloudwatch Subscription Filter does not ingest to Kinesis Data Stream

问题

I am trying to forward Cloudwatch logs to Kinesis Data Stream using the Subscription filter. Even with broad permissions, for some reason, I am not able to see any records in Kinesis. I could see logs in the Cloudwatch log group but are not being forwarded to Kinesis Data Stream. The lambda is a basic one with just a couple of console.log statements.

Here is the terraform code for the stack.

我正在尝试使用订阅过滤器将Cloudwatch日志转发到Kinesis Data Stream。 即使有广泛的权限,由于某种原因,我仍然无法在Kinesis中看到任何记录。 我可以在Cloudwatch日志组中看到日志,但它们没有被转发到Kinesis Data Stream。 Lambda函数只是一个基本的函数,只包含了一些console.log语句。

以下是该堆栈的Terraform代码。

英文:

I am trying to forward Cloudwatch logs to Kinesis Data Stream using the Subscription filter.
Even with broad permissions, for some reason, I am not able to see any records in Kinesis.
I could see logs in the Cloudwatch log group but are not being forwarded to Kinesis Data Stream. The lambda is a basic one with just a couple of console.log statements.

Here is the terraform code for the stack.

  1. resource "aws_iam_role" "sample_lambda_role" {
  2. name = "sample_lambda_role"
  3. assume_role_policy = <<EOF
  4. {
  5. "Version": "2012-10-17",
  6. "Statement": [
  7. {
  8. "Action": "sts:AssumeRole",
  9. "Principal": {
  10. "Service": "lambda.amazonaws.com"
  11. },
  12. "Effect": "Allow",
  13. "Sid": ""
  14. }
  15. ]
  16. }
  17. EOF
  18. }
  19. resource "aws_iam_policy" "sample_lambda_policy" {
  20. name = "sample_lambda_policy"
  21. path = "/"
  22. description = "AWS IAM Policy for managing aws lambda role"
  23. policy = <<EOF
  24. {
  25. "Version": "2012-10-17",
  26. "Statement": [
  27. {
  28. "Action": [
  29. "logs:CreateLogGroup",
  30. "logs:CreateLogStream",
  31. "logs:PutLogEvents"
  32. ],
  33. "Resource": "arn:aws:logs:*:*:*",
  34. "Effect": "Allow"
  35. }
  36. ]
  37. }
  38. EOF
  39. }
  40. resource "aws_iam_role_policy_attachment" "sample_lambda_attach_iam_policy_to_iam_role" {
  41. role = aws_iam_role.sample_lambda_role.name
  42. policy_arn = aws_iam_policy.sample_lambda_policy.arn
  43. }
  44. data "archive_file" "lambda_app_zip" {
  45. type = "zip"
  46. source_dir = "${path.module}/sample-lambda"
  47. #source_file = "index.js" #if one file
  48. output_path = "${path.module}/sample-lambda.zip"
  49. }
  50. resource "aws_lambda_function" "sample_lambda" {
  51. filename = "${path.module}/sample-lambda.zip"
  52. function_name = "sample-lambda"
  53. role = aws_iam_role.sample_lambda_role.arn
  54. handler = "index.handler"
  55. source_code_hash = data.archive_file.lambda_app_zip.output_base64sha256
  56. runtime = "nodejs14.x"
  57. depends_on = [
  58. aws_iam_role_policy_attachment.sample_lambda_attach_iam_policy_to_iam_role
  59. ]
  60. }
  61. resource "aws_cloudwatch_log_group" "sample_lambda_function_log_group" {
  62. name = "/aws/lambda/${aws_lambda_function.sample_lambda.function_name}"
  63. retention_in_days = 1
  64. lifecycle {
  65. prevent_destroy = false
  66. }
  67. }
  68. resource "aws_kinesis_stream" "log_stream" {
  69. name = "terraform-kinesis-test"
  70. shard_count = 1
  71. retention_period = 24
  72. shard_level_metrics = [
  73. "IncomingBytes",
  74. "OutgoingBytes",
  75. ]
  76. }
  77. resource "aws_iam_role" "cloudwatch_ingestion_role" {
  78. name = "cloudwatch_ingestion_role"
  79. assume_role_policy = <<EOF
  80. {
  81. "Version": "2012-10-17",
  82. "Statement": [
  83. {
  84. "Action": "sts:AssumeRole",
  85. "Principal": {
  86. "Service": [
  87. "logs.amazonaws.com"
  88. ]
  89. },
  90. "Effect": "Allow",
  91. "Sid": "",
  92. "Condition": {
  93. "StringLike": { "aws:SourceArn": "arn:aws:logs:*:*:*" }
  94. }
  95. }
  96. ]
  97. }
  98. EOF
  99. }
  100. resource "aws_iam_policy" "cloudwatch_ingestion_policy" {
  101. name = "cloudwatch_ingestion_policy"
  102. path = "/"
  103. description = "AWS IAM Policy for cloudwatch logs ingestion"
  104. policy = <<EOF
  105. {
  106. "Version": "2012-10-17",
  107. "Statement": [
  108. {
  109. "Action": [
  110. "kinesis:*"
  111. ],
  112. "Resource": "arn:aws:kinesis:*:*:stream/*",
  113. "Effect": "Allow"
  114. }
  115. ]
  116. }
  117. EOF
  118. }
  119. resource "aws_iam_role_policy_attachment" "cloudwatch_ingestion_attach_iam_policy_to_iam_role" {
  120. role = aws_iam_role.cloudwatch_ingestion_role.name
  121. policy_arn = aws_iam_policy.cloudwatch_ingestion_policy.arn
  122. }
  123. resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter" {
  124. name = "sample_lambda_function_logfilter"
  125. role_arn = aws_iam_role.cloudwatch_ingestion_role.arn
  126. log_group_name = aws_cloudwatch_log_group.sample_lambda_function_log_group.name
  127. //filter_pattern = "logtype test"
  128. filter_pattern = "" //WILL THIS WORK?
  129. destination_arn = aws_kinesis_stream.log_stream.arn
  130. distribution = "ByLogStream"
  131. }

答案1

得分: 4

我注意到评论部分关于filter_pattern值的讨论,因此我进行了有空格和没有空格的订阅过滤模式值的实验,并观察到两种过滤模式的行为相同。

以下是我使用的Terraform脚本:

  1. resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_with_space" {
  2. name = "sample_lambda_function_logfilter_with_space"
  3. role_arn = aws_iam_role.cloudwatch_ingestion_role.arn
  4. log_group_name = aws_cloudwatch_log_group.sample_lambda_function_log_group.name
  5. filter_pattern = " "
  6. destination_arn = aws_kinesis_stream.log_stream.arn
  7. distribution = "ByLogStream"
  8. }
  9. resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_without_space" {
  10. name = "sample_lambda_function_logfilter_without_space"
  11. role_arn = aws_iam_role.cloudwatch_ingestion_role.arn
  12. log_group_name = aws_cloudwatch_log_group.sample_lambda_function_log_group.name
  13. filter_pattern = ""
  14. destination_arn = aws_kinesis_stream.log_stream.arn
  15. distribution = "ByLogStream"
  16. }

Terraform计划输出:

  1. # aws_cloudwatch_log_subscription_filter.sample_lambda_function_logfilter_with_space will be created
  2. + resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_with_space" {
  3. + destination_arn = "arn:aws:kinesis:us-west-2:XXXXXXXXXXXX:stream/terraform-kinesis-test"
  4. + distribution = "ByLogStream"
  5. + filter_pattern = " "
  6. + id = (known after apply)
  7. + log_group_name = "/aws/lambda/sample-lambda"
  8. + name = "sample_lambda_function_logfilter_with_space"
  9. + role_arn = "arn:aws:iam::XXXXXXXXXXXX:role/cloudwatch_ingestion_role"
  10. }
  11. # aws_cloudwatch_log_subscription_filter.sample_lambda_function_logfilter_without_space will be created
  12. + resource "aws_cloudwatch_log_subscription_filter" "sample_lambda_function_logfilter_without_space" {
  13. + destination_arn = "arn:aws:kinesis:us-west-2:XXXXXXXXXXXX:stream/terraform-kinesis-test"
  14. + distribution = "ByLogStream"
  15. + id = (known after apply)
  16. + log_group_name = "/aws/lambda/sample-lambda"
  17. + name = "sample_lambda_function_logfilter_without_space"
  18. + role_arn = "arn:aws:iam::XXXXXXXXXXXX:role/cloudwatch_ingestion_role"
  19. }

请注意,如果我们传递一个没有空格值的过滤模式,Terraform在计划阶段不会分配过滤模式值。然而,Kinesis将此订阅过滤器与其他过滤器视为相同。

在AWS控制台中,如图所示,两个订阅过滤器呈现相同的模式值:

CloudWatch 订阅过滤器未将数据摄取到 Kinesis 数据流中。

因此,我们可以排除关于提到的Terraform脚本中订阅过滤模式值的任何混淆。


那么,现在可能的问题是什么呢?我主要怀疑您在Kinesis数据流上筛选记录的方式可能有问题。我怀疑您在数据查看器中获取记录时使用了Latest ShardIteratorType(起始位置下拉菜单)。

Latest ShardIteratorType 显示最新记录之后的记录。考虑到您的操作顺序,您可能首先执行了Lambda函数,然后尝试在数据查看器中筛选记录。由于这些操作之间存在时间差,当您尝试使用最新的起始位置来筛选记录时,Kinesis会在最近发布的CloudWatch日志之后生成一个数据指针,这就是您无法看到Kinesis上的任何记录的原因。


AWS CLI命令

要使用Latest ShardIteratorType 获取记录,请按照以下步骤操作:

首先,执行以下命令。这将在您的分片中的最新记录之后创建一个数据指针:

  1. aws kinesis get-shard-iterator \
  2. --stream-name terraform-kinesis-test \
  3. --shard-id shardId-000000000000 \
  4. --shard-iterator-type LATEST

命令输出:

  1. {
  2. "ShardIterator": "AAAAAAAAAAGiKQ..."
  3. }

现在,您可以执行您的Lambda函数以生成一些CloudWatch日志。然后,通过新创建的订阅过滤器将日志发送到Kinesis。

接下来,执行以下命令,使用您之前检索到的分片迭代器值从Kinesis数据流中获取记录。

  1. aws kinesis get-records \
  2. --limit 10 \
  3. --shard-iterator "AAAAAAAAAAGiKQ..."

命令输出:

  1. {
  2. "Records": [
  3. {
  4. "SequenceNumber": "49643477757265957414492357197584820922864438932158808066",
  5. "ApproximateArrivalTimestamp": "2023-08-10T23:43:56.704000+00:00",
  6. "Data": "H4sIAAAAAAAA/...",
  7. "PartitionKey": "f656f4eedc671f9bd3cea60ef85e599c"
  8. },
  9. ],
  10. "NextShardIterator": "AAAAAAAAAAFOa...",
  11. "MillisBehindLatest": 0
  12. }

您在记录部分下看到的Data字段是经过Base64编码和GZIP压缩的,其中包含CloudWatch日志,因此使用以下命令检索实际值。

  1. echo -n "<BASE64ENCODED_GZIP_COMPRESSED_DATA>" | base64 -d | zcat

上述大纲的步骤将帮助您使用AWS CLI通过Latest ShardIteratorType类型检索记录。然而,如果您的意图是直接在AWS控制台的数据查看器部分查看记录,您可以使用备选的ShardIteratorTypes

例如,当使用TRIM_HORIZON起始位置时,数据将如下图所示:

CloudWatch 订阅过滤器未将数据摄取到 Kinesis 数据流中。

要了解有关ShardIteratorTypes的更多信息,请参考此链接

英文:

I noticed discussions around the filter_pattern value in the comment section, so I conducted experiments with and without spaces in the subscription filter pattern value and observed that both filter patterns behave same.

Here is the Terraform script I used:

  1. resource &quot;aws_cloudwatch_log_subscription_filter&quot; &quot;sample_lambda_function_logfilter_with_space&quot; {
  2. name = &quot;sample_lambda_function_logfilter_with_space&quot;
  3. role_arn = aws_iam_role.cloudwatch_ingestion_role.arn
  4. log_group_name = aws_cloudwatch_log_group.sample_lambda_function_log_group.name
  5. filter_pattern = &quot; &quot;
  6. destination_arn = aws_kinesis_stream.log_stream.arn
  7. distribution = &quot;ByLogStream&quot;
  8. }
  9. resource &quot;aws_cloudwatch_log_subscription_filter&quot; &quot;sample_lambda_function_logfilter_without_space&quot; {
  10. name = &quot;sample_lambda_function_logfilter_without_space&quot;
  11. role_arn = aws_iam_role.cloudwatch_ingestion_role.arn
  12. log_group_name = aws_cloudwatch_log_group.sample_lambda_function_log_group.name
  13. filter_pattern = &quot;&quot;
  14. destination_arn = aws_kinesis_stream.log_stream.arn
  15. distribution = &quot;ByLogStream&quot;
  16. }

Terraform plan output:

  1. # aws_cloudwatch_log_subscription_filter.sample_lambda_function_logfilter_with_space will be created
  2. + resource &quot;aws_cloudwatch_log_subscription_filter&quot; &quot;sample_lambda_function_logfilter_with_space&quot; {
  3. + destination_arn = &quot;arn:aws:kinesis:us-west-2:XXXXXXXXXXXX:stream/terraform-kinesis-test&quot;
  4. + distribution = &quot;ByLogStream&quot;
  5. + filter_pattern = &quot; &quot;
  6. + id = (known after apply)
  7. + log_group_name = &quot;/aws/lambda/sample-lambda&quot;
  8. + name = &quot;sample_lambda_function_logfilter_with_space&quot;
  9. + role_arn = &quot;arn:aws:iam::XXXXXXXXXXXX:role/cloudwatch_ingestion_role&quot;
  10. }
  11. # aws_cloudwatch_log_subscription_filter.sample_lambda_function_logfilter_without_space will be created
  12. + resource &quot;aws_cloudwatch_log_subscription_filter&quot; &quot;sample_lambda_function_logfilter_without_space&quot; {
  13. + destination_arn = &quot;arn:aws:kinesis:us-west-2:XXXXXXXXXXXX:stream/terraform-kinesis-test&quot;
  14. + distribution = &quot;ByLogStream&quot;
  15. + id = (known after apply)
  16. + log_group_name = &quot;/aws/lambda/sample-lambda&quot;
  17. + name = &quot;sample_lambda_function_logfilter_without_space&quot;
  18. + role_arn = &quot;arn:aws:iam::XXXXXXXXXXXX:role/cloudwatch_ingestion_role&quot;
  19. }

Note that if we pass a filter pattern without a space value, Terraform does not assign the filter pattern value during the planning stage. However, Kinesis treats this subscription filter the same as the other one.

In the AWS Console, as depicted in the image, both subscription filter renders the same pattern value:

CloudWatch 订阅过滤器未将数据摄取到 Kinesis 数据流中。

Hence, we can rule out any confusion regarding the subscription filter pattern value in the mentioned Terraform script.


So, what could be the issue now? My main suspicion is that the way you're filtering records on the Kinesis data stream is wrong. I suspect you might have used the Latest ShardIteratorType (starting position drop-down) when fetching records in the Data Viewer.

The Latest ShardIteratorType shows records just after the most recent record in the shard. Considering your sequence of actions, it's likely that you executed your lambda function at first and then attempted to filter records within the Data Viewer. Due to the time lag between these actions, when you try to filter the records using the Latest starting position, Kinesis generates a data pointer after the recently published CloudWatch logs, and that was the reason why you couldn't see any records on the Kinesis.


AWS CLI Commands

To fetch the records using the Latest ShardIteratorType, follow these steps:

First, execute the following command. This will create a data pointer after the most recent records present in your shard:

  1. aws kinesis get-shard-iterator \
  2. --stream-name terraform-kinesis-test \
  3. --shard-id shardId-000000000000 \
  4. --shard-iterator-type LATEST

Command Output:

  1. {
  2. &quot;ShardIterator&quot;: &quot;AAAAAAAAAAGiKQ...&quot;
  3. }

Now, you can execute your lambda function to produce some CloudWatch logs. The logs then will be sent to Kinesis through the newly created subscription filter.

Next, execute the following command to fetch records from the Kinesis data stream using the shard-iterator value that you retrieved earlier.

  1. aws kinesis get-records \
  2. --limit 10 \
  3. --shard-iterator &quot;AAAAAAAAAAGiKQ...&quot;

Command Output:

  1. {
  2. &quot;Records&quot;: [
  3. {
  4. &quot;SequenceNumber&quot;: &quot;49643477757265957414492357197584820922864438932158808066&quot;,
  5. &quot;ApproximateArrivalTimestamp&quot;: &quot;2023-08-10T23:43:56.704000+00:00&quot;,
  6. &quot;Data&quot;: &quot;H4sIAAAAAAAA/...&quot;,
  7. &quot;PartitionKey&quot;: &quot;f656f4eedc671f9bd3cea60ef85e599c&quot;
  8. },
  9. ],
  10. &quot;NextShardIterator&quot;: &quot;AAAAAAAAAAFOa...&quot;,
  11. &quot;MillisBehindLatest&quot;: 0
  12. }

The Data field that you see under the records section is base64 encoded and GZIP compressed, which has a CloudWatch log, so use the following command to retrieve the actual value.

  1. echo -n &quot;&lt;BASE64ENCODED_GZIP_COMPRESSED_DATA&gt;&quot; | base64 -d | zcat

The steps outlined above will help you in retrieving records using the Latest ShardIteratorType type via the AWS CLI. Nevertheless, if your intention is to directly view the records within the AWS Console's Data Viewer section, you can use the alternative ShardIteratorTypes.

For instance, when utilizing the TRIM_HORIZON starting position, the data will appear as shown in the image below:

CloudWatch 订阅过滤器未将数据摄取到 Kinesis 数据流中。

To know more about the ShardIteratorTypes, refer this link.

huangapple
  • 本文由 发表于 2023年8月4日 22:18:15
  • 转载请务必保留本文链接:https://go.coder-hub.com/76836767.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定