什么是将MySQL表数据实时传输到ElasticSearch的最佳方式?

huangapple go评论76阅读模式
英文:

What's the best way to feed MySQL table data to ElasticSearch in real-time?

问题

我需要将部署在AWS上的RDS的MySQL表数据实时或近实时(可能有几分钟的延迟)导入ElasticSearch,并在此过程中连接一些表。

我已经调查过的第一个选项是Flink。但经过一些研究,我找不到一种流式传输表数据更改的方法,因为这些表不是只追加的。

然后我发现有些人在谈论CDC(Change Data Capture),基本上是将MySQL binlog更改流式传输到一个Lambda函数,然后解析它,然后将其发布到ElasticSearch,但这听起来太复杂且容易出错。

是否有任何行业已经尝试并验证过的方法来将非只追加的表同步到ElasticSearch?

英文:

I need to feed MySQL (deployed with RDS on AWS) table data into ElasticSearch in real-time or near-real-time (maybe several minutes of delay), joining a couple tables in the process.

The first option I have investigated is Flink. But after some research I couldn't find a way to stream table data change because the tables are not append-only.

Then I found that some people are talking about CDC(Change Data Capture), basically streaming MySQL binlog changes to a lambda and parse it then post to ElasticSearch, but this just sounds too complicated and error prone.

Is there any industry tried-and-true ways to sync non-append-only tables to ElasticSearch?

答案1

得分: 1

你可以使用Logstash脚本从MySQL中提取数据到Elasticsearch。

示例Logstash代码:

input {
  jdbc {
    jdbc_driver_library => "<pathToYourDataBaseDriver>/mysql-connector-java-5.1.39.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/ecomdb"
    jdbc_user => <db用户名>
    jdbc_password => <db密码>
    tracking_column¹ => "regdate"
    use_column_value²=>true
    statement => "SELECT * FROM ecomdb.customer where regdate >:sql_last_value;"
    schedule³ => " * * * * * *"
  }
}
output {
  elasticsearch {
    document_id=>" %{id}"
    document_type => "doc"
    index => "test"
    hosts => ["http://localhost:9200"]
  }
  stdout{
    codec => rubydebug
  }
}

¹跟踪列
²使用列值
³计划
⁴文档ID

英文:

You can use the logstash script to fetch data from mysql to elasticsearch.

Sample Logstash Code

    input {
  jdbc {
    jdbc_driver_library =&gt; &quot;&lt;pathToYourDataBaseDriver&gt;\mysql-connector-java-5.1.39.jar&quot;
    jdbc_driver_class =&gt; &quot;com.mysql.jdbc.Driver&quot;
    jdbc_connection_string =&gt; &quot;jdbc:mysql://localhost:3306/ecomdb&quot;
    jdbc_user =&gt; &lt;db username&gt;
    jdbc_password =&gt; &lt;db password&gt;
    tracking_column&#185; =&gt; &quot;regdate&quot;
    use_column_value&#178;=&gt;true
    statement =&gt; &quot;SELECT * FROM ecomdb.customer where regdate &gt;:sql_last_value;&quot;
    schedule&#179; =&gt; &quot; * * * * * *&quot;
  }
}
output {
  elasticsearch {
    document_id⁴=&gt; &quot;%{id}&quot;
    document_type =&gt; &quot;doc&quot;
    index =&gt; &quot;test&quot;
    hosts =&gt; [&quot;http://localhost:9200&quot;]
  }
  stdout{
  codec =&gt; rubydebug
  }
}

huangapple
  • 本文由 发表于 2020年1月6日 20:21:46
  • 转载请务必保留本文链接:https://go.coder-hub.com/59612043.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定