1. ホーム
  2. データベース
  3. マイサク

MysqlからElasticsearchにデータを同期させる方法を説明します。

2022-01-07 14:36:21

I. 同期化の原理

Mysqlベースのビンログサブスクリプション。binlogログは、Mysqlがデータの変更をリアルタイムで記録するために使用される

ESへのMysqlのデータ同期には、完全同期と増分同期の2種類がある

フルシンクとは、最初のESインデックスが構築された後、MysqlからESにすべてのデータを一度にインポートすることです。

増分同期とは、Mysqlに新しいデータが生成され、この新しいデータには、Mysqlに挿入された新しいデータ、古いデータの更新、削除されたデータの3つのケースがあり、これらのデータの変更と追加をすべてESに同期させることである

II. logstash-input-jdbc

logstash公式プラグインは、logstashに統合され、logstashをダウンロードすることができ、設定ファイルを介してmysqlとelasticsearchのデータ同期を達成するために

長所

  • mysqlデータの完全同期、増分同期、スケジュール同期が可能
  • バージョンアップの繰り返しが早く、比較的安定している
  • ES固有のプラグインlogstashの一部として簡単に使用できます。

デメリット

  • 同期削除ができないため、MySQLのデータが削除された後もElasticsearchにデータが残っている。
  • 同期の最小時間差は1分で、データの同期は1分に1回なので、リアルタイムに同期することはできない

III. go-mysql-elasticsearch

go-mysql-elasticsearchは、中国の作者が開発したプラグインです。

プロフェッショナル

  • mysqlのデータを完全かつ増分的に同期することが可能

デメリット

  • Elasticsearchとの完全なデータ同期を実現できない。
  • まだ開発中であり、比較的不安定な段階

IV. elasticsearch-jdbc

elasticsearch-jdbc 現在、elasticsearch-jdbc の最新バージョンは 2.3.4 で、ElasticSearch のサポートバージョンは 2.3.4, 非実装です。

プロフェッショナル

  • mysqlのデータを完全かつ増分的に同期することが可能

デメリット

  • ElasticSearchの最新バージョンは、現在2.3.4です。
  • 同期削除操作を達成することができない、ElasticsearchのデータでMySQLのデータ削除はまだ存在している

V. logstash-input-jdbc による同期化

インストールの最初のステップです。

logstash 5.x以降では、logstash-input-jdbcプラグインが統合されています。logstashをインストールした後、logstash-input-jdbcプラグインをコマンドでインストールします。

cd /logstash-6.4.2/bin
. /logstash-plugin install logstash-input-jdbc

2つ目の設定ステップ。

logstash-6.4.2/configフォルダにjdbc.confを新規に作成し、以下のように設定します。

logstash-6.4.2/configディレクトリにjdbc.sqlファイルを新規に作成します。

select * from t_employee

ステップ3 実行

cd logstash-6.4.2
# Check that the configuration file syntax is correct
bin/logstash -f config/jdbc.conf --config.test_and_exit
# Start
bin/logstash -f config/jdbc.conf --config.reload.automatic

--config.reload.automatic: 設定ファイルの内容を自動的に再読み込みします。

インデックス作成後に同期したデータをkibanaで表示する

PUT octopus
GET octopus/_search

VI. go-mysql-elasticsearchは同期を実装しています。

ステップ1:mysqlのbinlogのロギング

go-mysql-elasticsearchはelasticsearchとmysql binlogを同期させ、データの追加、削除、変更を行います。

binlogは主にデータベースのマスター・スレーブ間レプリケーションとデータリカバリに使用されます。binlogはデータの追加、削除、変更を記録します。マスター・スレーブ間レプリケーションでは、マスターがbinlogログをスレーブに同期し、スレーブがbinlog内のイベントを再生することでマスター・スレーブの同期を達成します。

mysqlのbinlogのロギングには、以下の3つのモードがあります。

ROW: records every row of data that is modified, but the log volume is too large
STATEMENT: records every SQL statement that modifies data, reducing logging, but SQL statements using functions or triggers are prone to master-slave inconsistencies
MIXED: Combines the advantages of ROW and STATEMENT, so you can choose to log using ROW or STATEMENT depending on the specific SQL statement used to perform the data operation.

mysqlのbinlog経由でESクラスタにデータを同期させるには、ROWモードしか使えません。ROWモードだけがmysqlで何が変更されたかを知っているからです。

UPDATE操作を例に、ROWモードのbinlogの内容の例を挙げると、以下のようになります。

SET TIMESTAMP=1527917394/*! */;
    BEGIN
    /*! */;
    # at 3751
    #180602 13:29:54 server id 1 end_log_pos 3819 CRC32 0x8dabdf01 Table_map: `webservice`. `building` mapped to number 74
    # at 3819
    #180602 13:29:54 server id 1 end_log_pos 3949 CRC32 0x59a8ed85 Update_rows: table id 74 flags: STMT_END_F
    
    BINLOG '
    UisSWxMBAAAARAAAAAAOsOAAAAAEoAAAAAAAEACndlYnNlcnZpY2UACGJ1aWxkaW5nAAYIDwEPEREG
    wACAAQAAAAHfq40=
    UisSWx8BAAAAAAggAAAG0PAAAAAEoAAAAAAAEAAgAG///A1gcAAAAAAAAAALYnVpbGRpbmctMTAADwB3
    UkRNbjNLYlV5d1k3ajVbD64WWw+uFsDWBwAAAAAAAAAAtidWlsZGluZy0xMAEPAHdSRE1uM0tiVXl3
    WTdqNVsPrhZbD64Whe2oWQ==
    '/*! */;
    ### UPDATE `webservice`. `building`.
    ### WHERE
    ### @1=2006 /* LONGINT meta=0 nullable=0 is_null=0 */
    ### @2='building-10' /* VARSTRING(192) meta=192 nullable=0 is_null=0 */
    ### @3=0 /* TINYINT meta=0 nullable=0 is_null=0 */
    ### @4='wRDMn3KbUywY7j5' /* VARSTRING(384) meta=384 nullable=0 is_null=0 */
    ### @5=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
    ### @6=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
    ### SET
    ### @1=2006 /* LONGINT meta=0 nullable=0 is_null=0 */
    ### @2='building-10' /* VARSTRING(192) meta=192 nullable=0 is_null=0 */
    ### @3=1 /* TINYINT meta=0 nullable=0 is_null=0 */
    ### @4='wRDMn3KbUywY7j5' /* VARSTRING(384) meta=384 nullable=0 is_null=0 */
    ### @5=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
    ### @6=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
    # at 3949
    #180602 13:29:54 server id 1 end_log_pos 3980 CRC32 0x58226b8f Xid = 182
    COMMIT/*! */;

STATEMENTモードでのbinlogの内容の例です。

SET TIMESTAMP=1527919329/*! */;
    update building set Status=1 where Id=2000
    /*! */;
    # at 688
    #180602 14:02:09 server id 1 end_log_pos 719 CRC32 0x4c550a7d Xid = 200
    COMMIT/*! */;

ROWモードとSTATEMENTモードでのUPDATE操作のログ内容から、ROWモードは変更対象の行の更新前後の全フィールドの値を完全に記録しているのに対し、STATEMENTモードはUPDATE操作のSQL文のみを単独で記録していることが分かります。mysqlのデータをリアルタイムでESに同期化するためには、ROWモードのbinlogを選択し、binlogデータの内容を取得・解析し、ESドキュメントapiを実行し、ESクラスタにデータを同期化すればよいのです。

ビンログモードの表示、変更

# View the binlog pattern
mysql> show variables like "%binlog_format%";
 
# Modify binlog mode
mysql> set global binlog_format='ROW';
 
# Check if binlog is on
mysql> show variables like 'log_bin';
 
# turn on bīnlog
Modify my.cnf file log-bin = mysql-bin

ステップ2 インストール

# Install go
sudo apt-get install go
 
# Install godep
go get github.com/tools/godep
 
# Get the go-mysql-elasticsearch plugin
go get github.com/siddontang/go-mysql-elasticsearch
 
# Install the go-mysql-elasticsearch plugin
cd go/src/github.com/siddontang/go-mysql-elasticsearch
make

ステップ3 構成

go/src/github.com/siddontang/go-mysql-elasticsearch/etc/river.toml

# MySQL address, user and password
# user must have replication privilege in MySQL.
my_addr = "127.0.0.1:3306" # Basic mysql settings to be synchronized
my_user = "root"
my_pass = "root"
 
# Elasticsearch address
es_addr = "127.0.0.1:9200" # Local elasticsearch configuration
 
# Path to store data, like master.info, and dump MySQL data 
data_dir = ". /var" # url for data storage
# The following configuration is saved as default
# Inner Http status address
stat_addr = "127.0.0.1:12800"
 
# pseudo server id like a slave 
server_id = 1001
 
# mysql or mariadb
flavor = "mysql"
# mysqldump execution path
mysqldump = "mysqldump"
 
# MySQL data source
[[source]]
schema = "test" //elasticsearch and mysql sync with the corresponding database name
 
# Only the following tables will be synced into Elasticsearch.
# To sync several tables in the database test. For some projects that use a split table mechanism, we can use wildcards to match, for example, t_[0-9]{4} to # match tables t_0000 to t_9999.
tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"]  
 
# Below is for special rule mapping
# For a table, we need to specify the type of index to sync its data to in ES. If not, we default to using the schema # name as the ES index and type
[[rule]]
schema = "test" // database name
table = "t" //table name
index = "test" //the corresponding index name
type = "t" //corresponding type name
 
# Synchronize all tables that satisfy the format t_[0-9]{4} under the ES with index test and type t. Of course, these tables need to ensure that
# schema is the same
[[rule]]
schema = "test"
table = "t_[0-9]{4}"
index = "test"
type = "t"
 
# For table tfilter, we will only synchronize the id and name columns, nothing else will be synchronized
filter = ["id", "name"]
# column id of table tfield, we map to es_id, and tags to es_tags
# This is now commonly used in MySQL for types like varchar # and we may store data like "a,b,c" and then want to sync it to ES as into a list like [a, b, c].
 
[rule.field]
# Map column `id` to ES field `es_id`
id="es_id"
# Map column `tags` to ES field `es_tags` with array type 
tags="es_tags,list"
# Map column `keywords` to ES with array type
keywords=",list"

ステップ4 実行 

cd go/src/github.com/siddontang/go-mysql-elasticsearch
bin/go-mysql-elasticsearch -config=. /etc/river.toml

VII. elasticsearch-jdbc による同期化の実装

ツールのダウンロード

解凍:unzip elasticsearch-jdbc-2.3.2.0-dist.zip

環境変数の設定

[root@autofelix /]# vi /etc/profile
export JDBC_IMPORTER_HOME=/elasticsearch-jdbc-2.3.2.0

環境変数の有効化

[root@autofelix /]# source /etc/profile

コンフィギュレーション参照

ステップ1:ルートディレクトリの下に、以下のように新しいフォルダodbc_esを作成します。

[root@autofelix /]# ll /odbc_es/
drwxr-xr-x 2 root root 4096 Jun 16 03:11 logs
-rwxrwxrwx 1 root root 542 Jun 16 04:03 mysql_import_es.sh

ステップ2:次の内容で新しいスクリプトmysql_import_es.shを作成します。

[root@autofelix odbc_es]# cat mysql_import_es.sh
'#! /bin/sh
bin=$JDBC_IMPORTER_HOME/bin
lib=$JDBC_IMPORTER_HOME/lib
echo '{
"type" : "jdbc",
"jdbc": {
"elasticsearch.autodiscover":true,
"elasticsearch.cluster":"my-application", # Cluster names, see: /usr/local/elasticsearch/config/elasticsearch.yml
"url":"jdbc:mysql://10.8.5.101:3306/test", #mysql database address
"user":"root", #mysql username
"password":"123456", #mysql password
"sql":"select * from cc",
"elasticsearch" : {
  "host" : "10.8.5.101",
  "port" : 9300
},
"index" : "myindex", # new index
"type" : "mytype" # new type
}
}'| java \
  -cp "${lib}/*" \
  -Dlog4j.configurationFile=${bin}/log4j2.xml \
  org.xbib.tools.Runner \
  org.xbib.tools.JDBCImporter

ステップ3:mysql_import_es.shに実行可能なパーミッションを追加します。

[root@autofelix odbc_es]# chmod a+x mysql_import_es.sh

ステップ4:スクリプトmysql_import_es.shを実行します。

[root@autofelix odbc_es]# . /mysql_import_es.sh

今回の記事は、MysqlからElasticsearchにデータを同期させる方法についてです。MysqからElasticsearchへのデータ同期については、スクリプトハウスの過去記事を検索するか、以下の記事を引き続き閲覧してください。