英文:
What's the best way to feed MySQL table data to ElasticSearch in real-time?
问题
我需要将部署在AWS上的RDS的MySQL表数据实时或近实时(可能有几分钟的延迟)导入ElasticSearch,并在此过程中连接一些表。
我已经调查过的第一个选项是Flink。但经过一些研究,我找不到一种流式传输表数据更改的方法,因为这些表不是只追加的。
然后我发现有些人在谈论CDC(Change Data Capture),基本上是将MySQL binlog更改流式传输到一个Lambda函数,然后解析它,然后将其发布到ElasticSearch,但这听起来太复杂且容易出错。
是否有任何行业已经尝试并验证过的方法来将非只追加的表同步到ElasticSearch?
英文:
I need to feed MySQL (deployed with RDS on AWS) table data into ElasticSearch in real-time or near-real-time (maybe several minutes of delay), joining a couple tables in the process.
The first option I have investigated is Flink. But after some research I couldn't find a way to stream table data change because the tables are not append-only.
Then I found that some people are talking about CDC(Change Data Capture), basically streaming MySQL binlog changes to a lambda and parse it then post to ElasticSearch, but this just sounds too complicated and error prone.
Is there any industry tried-and-true ways to sync non-append-only tables to ElasticSearch?
答案1
得分: 1
你可以使用Logstash脚本从MySQL中提取数据到Elasticsearch。
示例Logstash代码:
input {
jdbc {
jdbc_driver_library => "<pathToYourDataBaseDriver>/mysql-connector-java-5.1.39.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/ecomdb"
jdbc_user => <db用户名>
jdbc_password => <db密码>
tracking_column¹ => "regdate"
use_column_value²=>true
statement => "SELECT * FROM ecomdb.customer where regdate >:sql_last_value;"
schedule³ => " * * * * * *"
}
}
output {
elasticsearch {
document_id⁴=>" %{id}"
document_type => "doc"
index => "test"
hosts => ["http://localhost:9200"]
}
stdout{
codec => rubydebug
}
}
¹跟踪列
²使用列值
³计划
⁴文档ID
英文:
You can use the logstash script to fetch data from mysql to elasticsearch.
Sample Logstash Code
input {
jdbc {
jdbc_driver_library => "<pathToYourDataBaseDriver>\mysql-connector-java-5.1.39.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/ecomdb"
jdbc_user => <db username>
jdbc_password => <db password>
tracking_column¹ => "regdate"
use_column_value²=>true
statement => "SELECT * FROM ecomdb.customer where regdate >:sql_last_value;"
schedule³ => " * * * * * *"
}
}
output {
elasticsearch {
document_id⁴=> "%{id}"
document_type => "doc"
index => "test"
hosts => ["http://localhost:9200"]
}
stdout{
codec => rubydebug
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论