基于查询的MySQL到ES的数据同步

MySQL
188
0
0
2024-08-02

个别场景下,开发提需求,需要把某个MySQL里面指定数据同步到ES中,希望能有一个通用的脚本,用于特殊场景下的补数据或者临时性的数据同步。

注意: python es包的版本如果和es服务端的版本不一致的话,可能遇到报错。把python es的包版本换成和server端一致的版本即可。

下面的这个脚本,是用python+django+celery来实现上述功能的。核心代码如下:

方法1 逐条记录同步

代码运行次数:0

Cloud Studio 代码运行

# -*- coding: utf-8 -*-
# 根据MySQL表的update_time列同步增量数据到ES中,注意必须带上esId这个字段,这个值是作为ES的_id的


import os
import sys
import time
import mysql.connector
from elasticsearch import Elasticsearch


def sync(task_name, mysql_host, mysql_port, mysql_user, mysql_pass, sql_condition, es_addr, es_index_name):
    try:
        mydb = mysql.connector.connect(
            host=mysql_host,
            port=mysql_port,
            user=mysql_user,
            passwd=mysql_pass,
			ssl_disabled=True,
        )
        # 注意这里把结果输出为dict格式
        mycursor = mydb.cursor(dictionary=True)
    except Exception as e:
		# 发送钉钉告警
        sys.exit(10)

    mycursor.execute(sql_condition)
    res = mycursor.fetchall()

    es = Elasticsearch(es_addr, timeout=60, max_retries=10, retry_on_timeout=True)

    for i in res:
        try:
            es.index(index=es_index_name, body=dict(i), id=i['esId'])
        except Exception as e:
            # 发送钉钉告警
			pass
        finally:
            continue


if __name__ == '__main__':
    t1 = time.time()
    sync("DBA-测试",
         "192.168.31.181", 3306, "dts", "dts",
         "select id as esId,k,c,pad,ccc from sbtest.sbtest3",
         ['http://192.168.31.181:8989'],
         'dba-test-new-2',
         )
    t2 = time.time()
    print(t2-t1)

方法2 批量同步方式【推荐用这种】

代码运行次数:0

# -*- coding: utf-8 -*-
import time
import os
import sys

from elasticsearch import Elasticsearch,helpers
import mysql.connector  

def sync(task_name, mysql_host, mysql_port, mysql_user, mysql_pass, sql_condition, es_addr, es_index_name):
    try:
        mydb = mysql.connector.connect(
            host=mysql_host,
            port=mysql_port,
            user=mysql_user,
            passwd=mysql_pass,
            ssl_disabled=True,
        )
        mycursor = mydb.cursor(dictionary=True)
    except Exception as e:
        sys.exit(10)

    try:
        mycursor.execute(sql_condition)
        res = mycursor.fetchall()

        es = Elasticsearch(es_addr, request_timeout=60, max_retries=10, retry_on_timeout=True)
        
        # 准备bulk操作的数据  
        actions = (  
            {  
                "_index": es_index_name,  
                "_id": i['esId'],  
                "_source": dict(i)  
            } for i in res  
        )
 
        # 使用helpers.bulk进行批量写入  
        success, _ = helpers.bulk(es, actions)  
        print(f"Indexed {success} documents.")  

    except Exception as e:  
        print(f"Error indexing documents to Elasticsearch: {e}")  
    finally:  
        mycursor.close()  
        mydb.close()

if __name__ == '__main__':
    t1 = time.time()
    sync("DBA-测试",
         "192.168.31.181", 3306, "dts", "dts",
         "select id as esId,k,c,pad,ccc from sbtest.sbtest3",
         ['http://192.168.31.181:8989'],
         'dba-test-new-2',
         )
    t2 = time.time()
    print(t2-t1)

耗时

MySQL端记录数: 94326
行记录demo:
[sbtest]> select  * from sbtest3 limit 10,1 \G
*************************** 1. row ***************************
 id: 5685
  k: 50479
  c: 91674238320-66576604523-84892597271-42298112537-31748098687-87592861234-27236853894-78260103493-10155978333-85784381566
pad: 50437784151-86772162187-31166376983-54827989967-72340867827
ccc: NULL
1 row in set (0.00 sec)



逐条提交: 耗时 389秒
批量提交: 耗时 12秒

设置索引mapping,否则可能出现时间列格式等其它问题

代码运行次数:0

Cloud Studio 代码运行

PUT dba-test
{
  "mappings": {
    "properties": {
      "orderNo": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "esId": {
        "type": "keyword"
      },
      "name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "sex": {
        "type": "keyword"
      },
      "update_time": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"
      },
      "remark": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      }
    }
  }
}

celery 任务配置类似如下:

代码运行次数:0

Cloud Studio 代码运行

{
    "task_name": "dba-test",
    "mysql_host": "192.168.1.11",
    "mysql_port": 3306,
    "mysql_user": "dts",
    "mysql_pass": "dts",
    "sql_condition": "select a as esId,a as orderNo,name,sex,update_time,remark from sbtest.t1 where update_time>DATE_FORMAT(DATE_SUB(NOW(), INTERVAL 5 MINUTE), '%Y-%m-%d %H:%i:%s')",
    "es_addr": [
        "192.168.1.110:9200",
        "192.168.1.111:9200",
        "192.168.1.112:9200"
    ],
    "es_index_name": "dba-test"
}

根据实际需要,celery里面可以使用周期性任务或者一次性任务。

此外,这里的sql_condition 也支持复杂条件,例如直接进行2表关联取值(注意esId不要重复就行):

"sql_condition": "select b.a as esId,a.update_time,a.name,a.sex,b.addr,b.job from sbtest.t1 a inner join sbtest.t2 b on a.name=b.name where a.update_time>DATE_FORMAT(DATE_SUB(NOW(), INTERVAL 5 MINUTE), '%Y-%m-%d %H:%i:%s')",

生产上,还需要接钉钉告警,如果数据同步失败,会及时通知,类似如下:

告警示例