Replace Filebeat's ingress timestamp with the application timestamp
Question
I'm using Filebeat with Kafka and want to replace the timestamp Filebeat assigns at ingress with the application timestamp. I have seen a few Logstash examples where a filter can be added, but I'm not sure how to do this with Kafka.
In the config below I tried to replace @timestamp with application_timestamp, but it did not work because of the date format. The same code did work for the message field.
filebeat.yml config below:

    fields:
      application_timestamp: "2023-06-07 07:49:51.196Z"
    processors:
      - timestamp:
          field: application_timestamp
          layouts:
            - '2006-01-02 15:04:05.999Z'
          test:
            - '2019-11-18 04:59:51.123Z'

    processors:
      - script:
          lang: javascript
          id: replace_timestamp
          source: >
            function process(event) {
              event.Put("@timestamp", event.Get("fields.application_timestamp"));
              return [event];
            }
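A possible cause of the script failure, stated here as an assumption rather than something the post confirms, is that `@timestamp` expects a time value while `event.Get` returns a string. A minimal sketch of converting the string to a `Date` first is shown below. `parseApplicationTimestamp` and `replaceTimestamp` are hypothetical names, and whether `event.Put` accepts a `Date` for `@timestamp` should be checked against your Filebeat version.

```javascript
// Sketch only: written in ES5 style, with hypothetical helper names.
// Assumes values shaped like "2023-06-07 07:49:51.196Z" (UTC, optional fraction).

// Returns a Date for a matching string, or null when the value does not match.
function parseApplicationTimestamp(value) {
  if (typeof value !== "string") {
    return null;
  }
  var m = /^(\d{4})-(\d{2})-(\d{2}) (\d{2}):(\d{2}):(\d{2})(?:\.(\d{1,3}))?Z$/.exec(value);
  if (m === null) {
    return null;
  }
  // Right-pad the fraction so ".1" means 100 ms rather than 1 ms.
  var millis = m[7] ? parseInt((m[7] + "00").slice(0, 3), 10) : 0;
  return new Date(Date.UTC(+m[1], +m[2] - 1, +m[3], +m[4], +m[5], +m[6], millis));
}

// Copies the parsed application timestamp into @timestamp; leaves the event
// untouched when the field is missing or malformed.
function replaceTimestamp(event) {
  var parsed = parseApplicationTimestamp(event.Get("fields.application_timestamp"));
  if (parsed !== null) {
    // Assumption: @timestamp accepts a Date object here, unlike a plain string.
    event.Put("@timestamp", parsed);
  }
}

// In the script processor's source, the required entry point would call it:
//   function process(event) { replaceTimestamp(event); }
```

The parsing is kept in its own function so it can be tried outside Filebeat with a stub object that provides `Get` and `Put`.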
Answer 1
Score: 0
I recommend using an ingest pipeline to achieve what you want.
In this article, you can find a good example.
Answer 2
Score: 0
Updated working filebeat.yml:

    filebeat.inputs:
      - type: log
        enabled: true
        tags:
          - test-kafka
        paths:
          - /Users/Documents/kafka_testing/logs/test.log
        json.keys_under_root: true
        json.add_error_key: true

    output.kafka:
      # Tell Filebeat to emit only the timestamp, message, and application_time
      # fields; otherwise it treats each line as JSON and publishes that to Kafka.
      codec.format:
        string: '%{[@timestamp]} %{[message]} %{[application_time]}'
      # Kafka: publish to the 'test' topic
      hosts: ["localhost:9092"]
      topic: 'test'
      partition.round_robin:
        reachable_only: false
      required_acks: 1
      compression: gzip
      max_message_bytes: 1000000

    processors:
      - dissect:
          tokenizer: "{type:test,application_time:%{application_time}}"
          target_prefix: ""
      - timestamp:
          field: application_time
          layouts:
            - '2006-01-02 15:04:05.99'
          test:
            - '2019-11-18 04:59:51.12'

Result/Output:

    2021-06-25T19:25:30.000Z {type:test,application_time:2021-06-25 19:25:30} 2021-06-25 19:25:30
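To see how the dissect tokenizer, the timestamp layout, and the codec.format string combine into the output line above, the three steps can be imitated in plain JavaScript. This is only an illustration with hypothetical helper names (`dissectApplicationTime`, `parseApplicationTime`, `renderKafkaLine`); it is not how Filebeat implements these processors, and it assumes the time is read as UTC.

```javascript
// Illustration only: plain-JavaScript stand-ins for the config's three steps.

// Stand-in for the tokenizer "{type:test,application_time:%{application_time}}".
function dissectApplicationTime(message) {
  var prefix = "{type:test,application_time:";
  var suffix = "}";
  var matches = message.indexOf(prefix) === 0 &&
    message.length >= prefix.length + suffix.length &&
    message.slice(-suffix.length) === suffix;
  return matches ? message.slice(prefix.length, message.length - suffix.length) : null;
}

// Stand-in for the layout '2006-01-02 15:04:05.99': fraction optional, UTC assumed.
function parseApplicationTime(value) {
  var m = /^(\d{4})-(\d{2})-(\d{2}) (\d{2}):(\d{2}):(\d{2})(?:\.(\d+))?$/.exec(value);
  if (m === null) {
    return null;
  }
  // Right-pad the fraction so ".1" means 100 ms rather than 1 ms.
  var millis = m[7] ? parseInt((m[7] + "00").slice(0, 3), 10) : 0;
  return new Date(Date.UTC(+m[1], +m[2] - 1, +m[3], +m[4], +m[5], +m[6], millis));
}

// Stand-in for codec.format '%{[@timestamp]} %{[message]} %{[application_time]}'.
function renderKafkaLine(message) {
  var applicationTime = dissectApplicationTime(message);
  var timestamp = applicationTime === null ? null : parseApplicationTime(applicationTime);
  if (timestamp === null) {
    return null; // this sketch simply gives up on lines it cannot parse
  }
  return timestamp.toISOString() + " " + message + " " + applicationTime;
}

console.log(renderKafkaLine("{type:test,application_time:2021-06-25 19:25:30}"));
// 2021-06-25T19:25:30.000Z {type:test,application_time:2021-06-25 19:25:30} 2021-06-25 19:25:30
```

The printed line matches the Result/Output shown in the answer: the leading timestamp is derived from application_time rather than from the moment Filebeat read the line.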
</details>