FluentBit S3 upload with container name as key in s3
Question
My log file name in S3 looks like kube.var.log.containers.development-api-connect-green-58db8964cb-wrzg5_default_api-connect-fa7cafd99a1bbb8bca002c8ab5e3b2aefc774566bb7e9eb054054112f43f1e87.log/

I want to extract only the container name from the tag so that S3 is well structured, like this:
s3://<bucket-name>/eks/<container-name>/YYYY/MM/DD/<object-name>
I tried extracting the container name with s3_key_format_tag_delimiters set to _, but $TAG[2] has the container ID attached to it, which I do not want. I can't split on - either, since a container name can be a, a-b, a-b-c, and so on.
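For concreteness, splitting the tag above on _ gives roughly:

    $TAG[0] = kube.var.log.containers.development-api-connect-green-58db8964cb-wrzg5
    $TAG[1] = default
    $TAG[2] = api-connect-fa7cafd99a1bbb8bca002c8ab5e3b2aefc774566bb7e9eb054054112f43f1e87.log

so the container name and the container ID end up fused in the same tag part.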
Using the pod name in the S3 path is not feasible, as the pod name keeps changing for a service whenever new pods come up.

Is there any way to achieve this in Fluent Bit?
My current configuration looks like this:
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
  namespace: logging
  labels:
    k8s-app: fluent-bit
data:
  # Configuration files: server, input, filters and output
  # ======================================================
  fluent-bit.conf: |
    [SERVICE]
        Flush         1
        Log_Level     info
        Daemon        off
        Parsers_File  parsers.conf
        HTTP_Server   On
        HTTP_Listen   0.0.0.0
        HTTP_Port     2020

    @INCLUDE input-kubernetes.conf
    @INCLUDE filter-kubernetes.conf
    @INCLUDE output-elasticsearch.conf
    @INCLUDE output-s3.conf

  input-kubernetes.conf: |
    [INPUT]
        Name              tail
        Tag               kube.*
        Path              /var/log/containers/*_default_*.log
        Parser            docker
        DB                /var/log/flb_kube.db
        Mem_Buf_Limit     5MB
        Skip_Long_Lines   On
        Refresh_Interval  10

  filter-kubernetes.conf: |
    [FILTER]
        Name                 kubernetes
        Match                kube.*
        Kube_URL             https://kubernetes.default.svc:443
        Kube_CA_File         /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        Kube_Token_File      /var/run/secrets/kubernetes.io/serviceaccount/token
        Kube_Tag_Prefix      kube.var.log.containers.
        Merge_Log            Off
        Merge_Log_Key        log_processed
        K8S-Logging.Parser   On
        K8S-Logging.Exclude  Off

  output-elasticsearch.conf: |
    [OUTPUT]
        Name             es
        Match            *
        Host             ${FLUENT_ELASTICSEARCH_HOST}
        Port             ${FLUENT_ELASTICSEARCH_PORT}
        Logstash_Format  On
        Replace_Dots     On
        Retry_Limit      False
        tls              On
        tls.verify       Off

  output-s3.conf: |
    [OUTPUT]
        Name                          s3
        Match                         *
        bucket                        dev-eks-logs
        region                        us-east-1
        total_file_size               250M
        s3_key_format                 /eks/%Y/%m/%d/
        s3_key_format_tag_delimiters  .

  parsers.conf: |
    [PARSER]
        Name         apache
        Format       regex
        Regex        ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
        Time_Key     time
        Time_Format  %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name         apache2
        Format       regex
        Regex        ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
        Time_Key     time
        Time_Format  %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name         apache_error
        Format       regex
        Regex        ^\[[^ ]* (?<time>[^\]]*)\] \[(?<level>[^\]]*)\](?: \[pid (?<pid>[^\]]*)\])?( \[client (?<client>[^\]]*)\])? (?<message>.*)$

    [PARSER]
        Name         nginx
        Format       regex
        Regex        ^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
        Time_Key     time
        Time_Format  %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name         json
        Format       json
        Time_Key     time
        Time_Format  %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name         docker
        Format       json
        Time_Key     time
        Time_Format  %Y-%m-%dT%H:%M:%S.%L
        Time_Keep    On

    [PARSER]
        # http://rubular.com/r/tjUt3Awgg4
        Name         cri
        Format       regex
        Regex        ^(?<time>[^ ]+) (?<stream>stdout|stderr) (?<logtag>[^ ]*) (?<message>.*)$
        Time_Key     time
        Time_Format  %Y-%m-%dT%H:%M:%S.%L%z

    [PARSER]
        Name         syslog
        Format       regex
        Regex        ^\<(?<pri>[0-9]+)\>(?<time>[^ ]* {1,2}[^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$
        Time_Key     time
        Time_Format  %b %d %H:%M:%S
Answer 1

Score: 1
Use the code below; it has been tested and is working for me. Since you mentioned you are also using ES, I am sharing code for dynamic indices as well.

Note: Set ENVIRONMENT and CLUSTER_NAME using a ConfigMap.
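A minimal sketch of how that could look (the ConfigMap name fluent-bit-env and the values dev and dev-eks are placeholders):

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: fluent-bit-env      # hypothetical name
      namespace: logging
    data:
      ENVIRONMENT: "dev"        # placeholder value
      CLUSTER_NAME: "dev-eks"   # placeholder value

Reference it from the Fluent Bit DaemonSet container spec, for example via envFrom, so ${ENVIRONMENT} and ${CLUSTER_NAME} resolve inside the config:

    envFrom:
      - configMapRef:
          name: fluent-bit-env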
[FILTER]
    Name    record_modifier
    Match   *
    Record  cluster_name ${CLUSTER_NAME}

[FILTER]
    Name    record_modifier
    Match   *
    Record  app_name "labels_not_specified"

[FILTER]
    Name    lua
    Match   kube.*
    script  es_index.lua
    call    append_es_index

[FILTER]
    name    lua
    alias   set_std_keys
    match   kube.*
    script  s3_path.lua
    call    set_std_keys

[FILTER]
    name    rewrite_tag
    match   kube.*
    rule    $log ^.*$ s3.${ENVIRONMENT}.$cluster_name.$namespace_name.$app_name.$container_name true
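With the pod from the question, and assuming ENVIRONMENT=dev, CLUSTER_NAME=dev-eks, and an app: api-connect label (all placeholder values), the rewritten tag would come out roughly as:

    s3.dev.dev-eks.default.api-connect.api-connect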
Make sure the INPUT, FILTER, and OUTPUT blocks are placed correctly.
[OUTPUT]
    Name                 es
    Match                kube.*
    Host                 xxxx
    Port                 yyyy
    HTTP_User            dummy_username
    HTTP_Passwd          dummy_password
    Logstash_Format      On
    Logstash_Prefix_Key  $es_index
    Logstash_DateFormat  %Y-%m-%d
    Suppress_Type_Name   On
    Tls                  On
    Type                 _doc
    Retry_Limit          5
    Buffer_Size          50M
    #Trace_Error         On
[OUTPUT]
    name                          s3
    match                         s3.*
    region                        ap-south-1
    bucket                        dummy-centralized-logging
    upload_timeout                2m
    use_put_object                On
    content_type                  application/json
    compression                   gzip
    preserve_data_ordering        On
    total_file_size               250M
    s3_key_format                 /$TAG[1]/$TAG[2]/$TAG[3]/$TAG[4]/$TAG[5]/%Y-%m-%d-%H/$UUID-%M-%S.log
    s3_key_format_tag_delimiters  .
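With the rewritten tag above, that s3_key_format would resolve to something like (illustrative values):

    s3://dummy-centralized-logging/dev/dev-eks/default/api-connect/api-connect/2023-06-01-10/<UUID>-30-15.log

which gives the container-name-scoped layout the question asks for.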
Mount these two files, which are responsible for dynamically building the S3 path and the ES indices:
s3_path.lua: |
    function set_std_keys(tag, timestamp, record)
        -- Default the cluster name if it was not set upstream
        if (record["cluster_name"] == nil) then
            record["cluster_name"] = "k8s"
        end

        if (record["kubernetes"] ~= nil) then
            kube = record["kubernetes"]

            -- Pull up namespace
            if (kube["namespace_name"] ~= nil and string.len(kube["namespace_name"]) > 0) then
                record["namespace_name"] = kube["namespace_name"]
            else
                record["namespace_name"] = "default"
            end

            -- Pull up container name
            if (kube["container_name"] ~= nil and string.len(kube["container_name"]) > 0) then
                record["container_name"] = kube["container_name"]
            end

            -- Pull up app name (Deployment, StatefulSet, DaemonSet, Job, CronJob, etc.)
            if (kube["labels"] ~= nil) then
                labels = kube["labels"]
                if (labels["app"] ~= nil and string.len(labels["app"]) > 0) then
                    record["app_name"] = labels["app"]
                elseif (labels["app.kubernetes.io/instance"] ~= nil and string.len(labels["app.kubernetes.io/instance"]) > 0) then
                    record["app_name"] = labels["app.kubernetes.io/instance"]
                elseif (labels["k8s-app"] ~= nil and string.len(labels["k8s-app"]) > 0) then
                    record["app_name"] = labels["k8s-app"]
                elseif (labels["name"] ~= nil and string.len(labels["name"]) > 0) then
                    record["app_name"] = labels["name"]
                end
            end
        end

        -- Return code 2: the record was modified, but the timestamp was not
        return 2, timestamp, record
    end
es_index.lua: |
    function append_es_index(tag, timestamp, record)
        new_record = record

        -- Start the index name with the cluster name, falling back to "k8s"
        if (record["cluster_name"] ~= nil) then
            es_index = record["cluster_name"]
        else
            es_index = "k8s"
        end

        if (record["kubernetes"] ~= nil) then
            kube = record["kubernetes"]

            -- Append the namespace, falling back to "default"
            if (kube["namespace_name"] ~= nil and string.len(kube["namespace_name"]) > 0) then
                es_index = es_index .. "." .. kube["namespace_name"]
            else
                es_index = es_index .. "." .. "default"
            end

            -- Append the app name, derived from the common label conventions
            if (kube["labels"] ~= nil) then
                labels = kube["labels"]
                if (labels["app"] ~= nil and string.len(labels["app"]) > 0) then
                    es_index = es_index .. "." .. labels["app"]
                elseif (labels["app.kubernetes.io/instance"] ~= nil and string.len(labels["app.kubernetes.io/instance"]) > 0) then
                    es_index = es_index .. "." .. labels["app.kubernetes.io/instance"]
                elseif (labels["k8s-app"] ~= nil and string.len(labels["k8s-app"]) > 0) then
                    es_index = es_index .. "." .. labels["k8s-app"]
                elseif (labels["name"] ~= nil and string.len(labels["name"]) > 0) then
                    es_index = es_index .. "." .. labels["name"]
                end
            else
                es_index = es_index .. "." .. record["app_name"]
            end
        end

        new_record["es_index"] = es_index

        -- Return code 1: the record (and timestamp) are re-emitted as modified
        return 1, timestamp, new_record
    end
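One way to mount the two Lua files (a sketch; the volume and ConfigMap names are placeholders) is to ship them in a ConfigMap and mount each one into the Fluent Bit config directory:

    volumeMounts:
      - name: lua-scripts
        mountPath: /fluent-bit/etc/s3_path.lua   # default config dir of the fluent/fluent-bit image
        subPath: s3_path.lua
      - name: lua-scripts
        mountPath: /fluent-bit/etc/es_index.lua
        subPath: es_index.lua
    volumes:
      - name: lua-scripts
        configMap:
          name: fluent-bit-lua                   # hypothetical name

The es_index key appended by es_index.lua is what Logstash_Prefix_Key $es_index in the ES output reads, so with Logstash_DateFormat %Y-%m-%d the resulting indices look roughly like dev-eks.default.api-connect-2023-06-01 (illustrative values).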
Hope this will work. Enjoy!!