FluentBit S3 upload with container name as key in S3

Question

My log file name in S3 looks like kube.var.log.containers.development-api-connect-green-58db8964cb-wrzg5_default_api-connect-fa7cafd99a1bbb8bca002c8ab5e3b2aefc774566bb7e9eb054054112f43f1e87.log/. I want to extract only the container name from the tag so that the S3 layout is well structured, like this:

s3://<bucket-name>/eks/<container-name>/YYYY/MM/DD/<object-name>

I tried extracting the container name by setting s3_key_format_tag_delimiters to _, but $TAG[2] still has the container ID attached, which I do not want. I can't split on - either, since a container name can look like a, a-b, a-b-c, and so on.
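
To make the problem concrete, here is a minimal Python sketch (not Fluent Bit code) that applies both approaches to the tag above. The regular expression and the helper name container_name are assumptions made for illustration: the pattern relies on the container ID being the trailing 64 hex characters before .log, which holds for the sample tag.

```python
import re

# Hypothetical pattern for illustration:
# <pod>_<namespace>_<container>-<64 hex chars>.log
CONTAINER_LOG = re.compile(
    r"^(?P<pod>[^_]+)_(?P<namespace>[^_]+)_"
    r"(?P<container>.+)-(?P<container_id>[0-9a-f]{64})\.log$"
)

TAG_PREFIX = "kube.var.log.containers."

tag = (
    "kube.var.log.containers."
    "development-api-connect-green-58db8964cb-wrzg5_default_"
    "api-connect-fa7cafd99a1bbb8bca002c8ab5e3b2aefc774566bb7e9eb054054112f43f1e87.log"
)

# Splitting on "_" alone leaves the ID and ".log" glued to the container name.
naive = tag.split("_")[2]


def container_name(tag: str) -> str:
    """Return the container name encoded in a kube.* tail tag."""
    if not tag.startswith(TAG_PREFIX):
        raise ValueError(f"unexpected tag prefix: {tag}")
    match = CONTAINER_LOG.match(tag[len(TAG_PREFIX):])
    if match is None:
        raise ValueError(f"unexpected tag format: {tag}")
    return match.group("container")


print(naive)                # container name with "-<id>.log" still attached
print(container_name(tag))  # container name only
```

Anchoring on the fixed-length ID rather than on - is what lets names such as a, a-b, or a-b-c come out intact.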

Using the pod name in the S3 path is not feasible, because the pod name changes for a service whenever new pods come up.

Is there any way to achieve this in fluentBit?

My current configuration looks like this:

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
  namespace: logging
  labels:
    k8s-app: fluent-bit
data:
  # Configuration files: server, input, filters and output
  # ======================================================
  fluent-bit.conf: |
    [SERVICE]
        Flush         1
        Log_Level     info
        Daemon        off
        Parsers_File  parsers.conf
        HTTP_Server   On
        HTTP_Listen   0.0.0.0
        HTTP_Port     2020

    @INCLUDE input-kubernetes.conf
    @INCLUDE filter-kubernetes.conf
    @INCLUDE output-elasticsearch.conf
    @INCLUDE output-s3.conf

  input-kubernetes.conf: |
    [INPUT]
        Name              tail
        Tag               kube.*
        Path              /var/log/containers/*_default_*.log
        Parser            docker
        DB                /var/log/flb_kube.db
        Mem_Buf_Limit     5MB
        Skip_Long_Lines   On
        Refresh_Interval  10

  filter-kubernetes.conf: |
    [FILTER]
        Name                kubernetes
        Match               kube.*
        Kube_URL            https://kubernetes.default.svc:443
        Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
        Kube_Tag_Prefix     kube.var.log.containers.
        Merge_Log           Off
        Merge_Log_Key       log_processed
        K8S-Logging.Parser  On
        K8S-Logging.Exclude Off

  output-elasticsearch.conf: |
    [OUTPUT]
        Name            es
        Match           *
        Host            ${FLUENT_ELASTICSEARCH_HOST}
        Port            ${FLUENT_ELASTICSEARCH_PORT}
        Logstash_Format On
        Replace_Dots    On
        Retry_Limit     False
        tls On
        tls.verify Off
  output-s3.conf: |
    [OUTPUT]
        Name            s3
        Match           *
        bucket                       dev-eks-logs
        region                       us-east-1
        total_file_size              250M
        s3_key_format                /eks/%Y/%m/%d/
        s3_key_format_tag_delimiters .
  parsers.conf: |
    [PARSER]
        Name   apache
        Format regex
        Regex  ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
        Time_Key time
        Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name   apache2
        Format regex
        Regex  ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
        Time_Key time
        Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name   apache_error
        Format regex
        Regex  ^\[[^ ]* (?<time>[^\]]*)\] \[(?<level>[^\]]*)\](?: \[pid (?<pid>[^\]]*)\])?( \[client (?<client>[^\]]*)\])? (?<message>.*)$

    [PARSER]
        Name   nginx
        Format regex
        Regex ^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
        Time_Key time
        Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name   json
        Format json
        Time_Key time
        Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name        docker
        Format      json
        Time_Key    time
        Time_Format %Y-%m-%dT%H:%M:%S.%L
        Time_Keep   On

    [PARSER]
        # http://rubular.com/r/tjUt3Awgg4
        Name cri
        Format regex
        Regex ^(?<time>[^ ]+) (?<stream>stdout|stderr) (?<logtag>[^ ]*) (?<message>.*)$
        Time_Key    time
        Time_Format %Y-%m-%dT%H:%M:%S.%L%z

    [PARSER]
        Name        syslog
        Format      regex
        Regex       ^\<(?<pri>[0-9]+)\>(?<time>[^ ]* {1,2}[^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$
        Time_Key    time
        Time_Format %b %d %H:%M:%S

Answer 1

Score: 1

Use the configuration below; it has been tested and works for me. Since you mentioned you are also using Elasticsearch, I am sharing the code for dynamic indices as well.

Note: Set ENVIRONMENT and CLUSTER_NAME using a ConfigMap.

    [FILTER]
        Name    record_modifier
        Match *
        Record cluster_name ${CLUSTER_NAME}
    [FILTER]
        Name    record_modifier
        Match *
        Record app_name "labels_not_specified"
    [FILTER]
        Name    lua
        Match   kube.*
        script  es_index.lua
        call    append_es_index
    [FILTER]
        name    lua
        alias   set_std_keys
        match   kube.*
        script  s3_path.lua
        call   set_std_keys
    [FILTER]
        name rewrite_tag
        match kube.*
        rule $log ^.*$ s3.${ENVIRONMENT}.$cluster_name.$namespace_name.$app_name.$container_name true
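
The last filter re-emits each record under a tag assembled from record keys. As a rough Python model of that template (not Fluent Bit's implementation), the sketch below builds the same dotted tag. The function build_tag and the sample values are hypothetical. The second call illustrates one thing to watch for: a value that itself contains a dot adds an extra tag part, which would shift any later position-based lookup.

```python
def build_tag(record: dict, environment: str) -> str:
    """Model of the rule's template:
    s3.<env>.<cluster>.<namespace>.<app>.<container>
    """
    keys = ("cluster_name", "namespace_name", "app_name", "container_name")
    return ".".join(["s3", environment, *(str(record[k]) for k in keys)])


# Hypothetical sample record, shaped like the output of s3_path.lua.
record = {
    "cluster_name": "dev-eks",
    "namespace_name": "default",
    "app_name": "api-connect",
    "container_name": "api-connect",
}

tag = build_tag(record, "development")
print(tag)

# A dotted value produces one more part than expected.
dotted = build_tag({**record, "app_name": "api.connect"}, "development")
print(len(tag.split(".")), len(dotted.split(".")))
```

This model raises KeyError for a missing key; it makes no claim about what Fluent Bit does in that case.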

Make sure the INPUT, FILTER and OUTPUT blocks are placed correctly.

    [OUTPUT]
        Name            es
        Match           kube.*
        Host            xxxx
        Port            yyyy
        HTTP_User       dummy_username
        HTTP_Passwd     dummy_password
        Retry_Limit     False
        Logstash_Format On
        Logstash_Prefix_Key $es_index
        Logstash_DateFormat %Y-%m-%d
        Suppress_Type_Name On
        Tls   On
        Type  _doc
        Retry_Limit 5
        Buffer_Size 50M
        #Trace_Error On

    [OUTPUT]
        name s3
        match s3.*
        region ap-south-1
        bucket dummy-centralized-logging
        upload_timeout 2m
        use_put_object On
        content_type application/json
        compression gzip
        preserve_data_ordering On
        total_file_size              250M
        s3_key_format /$TAG[1]/$TAG[2]/$TAG[3]/$TAG[4]/$TAG[5]/%Y-%m-%d-%H/$UUID-%M-%S.log
        s3_key_format_tag_delimiters .
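
To see how the tag parts land in the object key, here is a simplified Python model of the key expansion. It is an illustration under assumptions, not the plugin's code: render_key is a hypothetical helper that expands only $TAG[n] and strftime codes and leaves $UUID as a literal, and the tag and timestamp are made-up sample values.

```python
import re
from datetime import datetime, timezone


def render_key(key_format: str, tag: str, delimiters: str, when: datetime) -> str:
    """Expand strftime codes, then $TAG[n]; $UUID is left untouched."""
    # Every character in `delimiters` splits the tag into parts.
    parts = [p for p in re.split("[" + re.escape(delimiters) + "]", tag) if p]
    dated = when.strftime(key_format)
    return re.sub(r"\$TAG\[(\d+)\]", lambda m: parts[int(m.group(1))], dated)


# Hypothetical tag in the s3.<env>.<cluster>.<namespace>.<app>.<container> layout.
tag = "s3.development.dev-eks.default.api-connect.api-connect"
when = datetime(2023, 2, 14, 3, 49, 4, tzinfo=timezone.utc)

answer_key = render_key(
    "/$TAG[1]/$TAG[2]/$TAG[3]/$TAG[4]/$TAG[5]/%Y-%m-%d-%H/$UUID-%M-%S.log",
    tag, ".", when,
)
print(answer_key)

# In this model, a format built on $TAG[5] alone gives the layout from the question.
question_prefix = render_key("/eks/$TAG[5]/%Y/%m/%d/", tag, ".", when)
print(question_prefix)
```

Because $TAG[0] is the literal s3 prefix, the useful parts start at index 1 and the container name sits at index 5.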

Mount these two files, which handle the dynamic parsing for the S3 path and the ES indices:


  s3_path.lua: |
    function set_std_keys(tag, timestamp, record)

        -- Pull up cluster (fall back to "k8s" when it is not set)
        if (record["cluster_name"] == nil) then
            record["cluster_name"] = "k8s"
        end

        if (record["kubernetes"] ~= nil) then
            local kube = record["kubernetes"]

            -- Pull up namespace
            if (kube["namespace_name"] ~= nil and string.len(kube["namespace_name"]) > 0) then
                record["namespace_name"] = kube["namespace_name"]
            else
                record["namespace_name"] = "default"
            end

            -- Pull up container name
            if (kube["container_name"] ~= nil and string.len(kube["container_name"]) > 0) then
                record["container_name"] = kube["container_name"]
            end

            -- Pull up app name (Deployment, StatefulSet, DaemonSet, Job, CronJob etc)
            -- When no label matches, app_name keeps the value set by record_modifier.
            if (kube["labels"] ~= nil) then
                local labels = kube["labels"]

                if (labels["app"] ~= nil and string.len(labels["app"]) > 0) then
                    record["app_name"] = labels["app"]
                elseif (labels["app.kubernetes.io/instance"] ~= nil and string.len(labels["app.kubernetes.io/instance"]) > 0) then
                    record["app_name"] = labels["app.kubernetes.io/instance"]
                elseif (labels["k8s-app"] ~= nil and string.len(labels["k8s-app"]) > 0) then
                    record["app_name"] = labels["k8s-app"]
                elseif (labels["name"] ~= nil and string.len(labels["name"]) > 0) then
                    record["app_name"] = labels["name"]
                end
            end
        end

        -- 2 = record modified, timestamp kept as is
        return 2, timestamp, record
    end
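
The label fallback in set_std_keys can be checked away from a cluster with a small Python model. This is only a sketch of the same precedence (app, then app.kubernetes.io/instance, then k8s-app, then name); pick_app_name is a hypothetical helper, and the default mirrors the value set by the record_modifier filter.

```python
from typing import Optional

# Same order as the if/elseif chain in s3_path.lua.
LABEL_PRECEDENCE = ("app", "app.kubernetes.io/instance", "k8s-app", "name")


def pick_app_name(labels: Optional[dict], default: str = "labels_not_specified") -> str:
    """Return the first non-empty label in precedence order, else the default."""
    if not labels:
        return default
    for key in LABEL_PRECEDENCE:
        value = labels.get(key)
        if value:  # skips missing keys and empty strings
            return value
    return default


print(pick_app_name({"app": "api-connect", "name": "ignored"}))
print(pick_app_name({"k8s-app": "fluent-bit"}))
print(pick_app_name(None))
```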

  es_index.lua: |
    function append_es_index(tag, timestamp, record)
      local new_record = record
      local es_index

      if (record["cluster_name"] ~= nil) then
          es_index = record["cluster_name"]
      else
          es_index = "k8s"
      end

      if (record["kubernetes"] ~= nil) then
          local kube = record["kubernetes"]
          if (kube["namespace_name"] ~= nil and string.len(kube["namespace_name"]) > 0) then
              es_index = es_index .. "." .. kube["namespace_name"]
          else
              es_index = es_index .. "." .. "default"
          end

          if (kube["labels"] ~= nil) then
              local labels = kube["labels"]

              if (labels["app"] ~= nil and string.len(labels["app"]) > 0) then
                  es_index = es_index .. "." .. labels["app"]
              elseif (labels["app.kubernetes.io/instance"] ~= nil and string.len(labels["app.kubernetes.io/instance"]) > 0) then
                  es_index = es_index .. "." .. labels["app.kubernetes.io/instance"]
              elseif (labels["k8s-app"] ~= nil and string.len(labels["k8s-app"]) > 0) then
                  es_index = es_index .. "." .. labels["k8s-app"]
              elseif (labels["name"] ~= nil and string.len(labels["name"]) > 0) then
                  es_index = es_index .. "." .. labels["name"]
              end
          else
              es_index = es_index .. "." .. record["app_name"]
          end
      end

      new_record["es_index"] = es_index

      return 1, timestamp, new_record
    end

Hope this works. Enjoy!

huangapple
  • Posted on 2023-02-14 03:49:04
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75440567.html