个别场景下,开发提需求,需要把某个MySQL里面指定数据同步到ES中,希望能有一个通用的脚本,用于特殊场景下的补数据或者临时性的数据同步。
注意: python es包的版本如果和es服务端的版本不一致的话,可能遇到报错。把python es的包版本换成和server端一致的版本即可。
下面的这个脚本,是用python+django+celery来实现上述功能的。核心代码如下:
方法1 逐条记录同步
代码运行次数:0
Cloud Studio 代码运行
# -*- coding: utf-8 -*-
# 根据MySQL表的update_time列同步增量数据到ES中,注意必须带上esId这个字段,这个值是作为ES的_id的
import os
import sys
import time
import mysql.connector
from elasticsearch import Elasticsearch
def sync(task_name, mysql_host, mysql_port, mysql_user, mysql_pass, sql_condition, es_addr, es_index_name):
    """Sync rows from MySQL into Elasticsearch, one document per request.

    Each row produced by ``sql_condition`` MUST contain an ``esId`` column;
    its value is used as the Elasticsearch document ``_id`` so re-runs are
    idempotent (same row overwrites the same document).

    Args:
        task_name: Human-readable task label, used only in alert/log output.
        mysql_host, mysql_port, mysql_user, mysql_pass: MySQL connection info.
        sql_condition: Full SELECT statement; must alias the key column to esId.
        es_addr: List of Elasticsearch endpoint URLs.
        es_index_name: Target index name.

    Exits:
        Exit code 10 when the MySQL connection cannot be established.
    """
    try:
        mydb = mysql.connector.connect(
            host=mysql_host,
            port=mysql_port,
            user=mysql_user,
            passwd=mysql_pass,
            ssl_disabled=True,
        )
        # dictionary=True returns each row as a dict keyed by column name,
        # which maps directly onto an ES document body.
        mycursor = mydb.cursor(dictionary=True)
    except Exception as e:
        # 发送钉钉告警 (send DingTalk alert here). Also log the cause so the
        # operator can tell why the process exited with code 10.
        print(f"[{task_name}] MySQL connection failed: {e}", file=sys.stderr)
        sys.exit(10)
    try:
        mycursor.execute(sql_condition)
        res = mycursor.fetchall()
        es = Elasticsearch(es_addr, timeout=60, max_retries=10, retry_on_timeout=True)
        failed = 0
        for row in res:
            try:
                es.index(index=es_index_name, body=dict(row), id=row['esId'])
            except Exception as e:
                # Best-effort per-row sync: keep going on individual failures,
                # but count and report them instead of silently dropping them
                # (the original `except: pass` + `finally: continue` hid every
                # indexing error).
                failed += 1
                print(f"[{task_name}] index error for esId={row.get('esId')}: {e}",
                      file=sys.stderr)
        if failed:
            # 发送钉钉告警 (send DingTalk alert here with the failure count).
            print(f"[{task_name}] {failed} document(s) failed to index", file=sys.stderr)
    finally:
        # Always release MySQL resources; the original leaked both handles.
        mycursor.close()
        mydb.close()
if __name__ == '__main__':
    # One-off manual run: sync the whole table once and report elapsed seconds.
    started = time.time()
    sync(
        "DBA-测试",
        "192.168.31.181",
        3306,
        "dts",
        "dts",
        "select id as esId,k,c,pad,ccc from sbtest.sbtest3",
        ['http://192.168.31.181:8989'],
        'dba-test-new-2',
    )
    elapsed = time.time() - started
    print(elapsed)
方法2 批量同步方式【推荐用这种】
代码运行次数:0
# -*- coding: utf-8 -*-
import time
import os
import sys
from elasticsearch import Elasticsearch,helpers
import mysql.connector
def sync(task_name, mysql_host, mysql_port, mysql_user, mysql_pass, sql_condition, es_addr, es_index_name):
    """Bulk-sync rows from MySQL into Elasticsearch using helpers.bulk.

    Each row produced by ``sql_condition`` MUST contain an ``esId`` column;
    its value becomes the Elasticsearch document ``_id`` so re-runs simply
    overwrite the same documents.

    Args:
        task_name: Human-readable task label, used only in alert/log output.
        mysql_host, mysql_port, mysql_user, mysql_pass: MySQL connection info.
        sql_condition: Full SELECT statement; must alias the key column to esId.
        es_addr: List of Elasticsearch endpoint URLs.
        es_index_name: Target index name.

    Exits:
        Exit code 10 when the MySQL connection cannot be established.
    """
    try:
        mydb = mysql.connector.connect(
            host=mysql_host,
            port=mysql_port,
            user=mysql_user,
            passwd=mysql_pass,
            ssl_disabled=True,
        )
        # dictionary=True returns rows as dicts keyed by column name.
        mycursor = mydb.cursor(dictionary=True)
    except Exception as e:
        # 发送钉钉告警 (send DingTalk alert here). Log the cause so the
        # operator can tell why the process exited with code 10 -- the
        # original exited silently.
        print(f"[{task_name}] MySQL connection failed: {e}", file=sys.stderr)
        sys.exit(10)
    try:
        mycursor.execute(sql_condition)
        res = mycursor.fetchall()
        es = Elasticsearch(es_addr, request_timeout=60, max_retries=10, retry_on_timeout=True)
        # Lazily build the bulk actions; helpers.bulk consumes the generator
        # in chunks instead of needing a second materialized list.
        actions = (
            {
                "_index": es_index_name,
                "_id": row['esId'],
                "_source": dict(row),
            }
            for row in res
        )
        success, _ = helpers.bulk(es, actions)
        print(f"Indexed {success} documents.")
    except Exception as e:
        # helpers.bulk raises BulkIndexError on document failures; report on
        # stderr (the original printed errors to stdout) so alerts/pipelines
        # can separate diagnostics from normal output.
        print(f"Error indexing documents to Elasticsearch: {e}", file=sys.stderr)
    finally:
        # Always release MySQL resources, whether the bulk succeeded or not.
        mycursor.close()
        mydb.close()
if __name__ == '__main__':
    # Manual one-shot run; prints how many seconds the bulk sync took.
    begin = time.time()
    args = (
        "DBA-测试",
        "192.168.31.181",
        3306,
        "dts",
        "dts",
        "select id as esId,k,c,pad,ccc from sbtest.sbtest3",
        ['http://192.168.31.181:8989'],
        'dba-test-new-2',
    )
    sync(*args)
    print(time.time() - begin)
耗时
MySQL端记录数: 94326
行记录demo:
[sbtest]> select * from sbtest3 limit 10,1 \G
*************************** 1. row ***************************
id: 5685
k: 50479
c: 91674238320-66576604523-84892597271-42298112537-31748098687-87592861234-27236853894-78260103493-10155978333-85784381566
pad: 50437784151-86772162187-31166376983-54827989967-72340867827
ccc: NULL
1 row in set (0.00 sec)
逐条提交: 耗时 389秒
批量提交: 耗时 12秒
设置索引mapping,否则可能出现时间列格式等其它问题
代码运行次数:0
Cloud Studio 代码运行
PUT dba-test
{
"mappings": {
"properties": {
"orderNo": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"esId": {
"type": "keyword"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"sex": {
"type": "keyword"
},
"update_time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"
},
"remark": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
}
celery 任务配置类似如下:
代码运行次数:0
Cloud Studio 代码运行
{
"task_name": "dba-test",
"mysql_host": "192.168.1.11",
"mysql_port": 3306,
"mysql_user": "dts",
"mysql_pass": "dts",
"sql_condition": "select a as esId,a as orderNo,name,sex,update_time,remark from sbtest.t1 where update_time>DATE_FORMAT(DATE_SUB(NOW(), INTERVAL 5 MINUTE), '%Y-%m-%d %H:%i:%s')",
"es_addr": [
"192.168.1.110:9200",
"192.168.1.111:9200",
"192.168.1.112:9200"
],
"es_index_name": "dba-test"
}
根据实际需要,celery里面可以使用周期性任务或者一次性任务。
此外,这里的sql_condition 也支持复杂条件,例如直接进行2表关联取值(注意esId不要重复就行):
"sql_condition": "select b.a as esId,a.update_time,a.name,a.sex,b.addr,b.job from sbtest.t1 a inner join sbtest.t2 b on a.name=b.name where a.update_time>DATE_FORMAT(DATE_SUB(NOW(), INTERVAL 5 MINUTE), '%Y-%m-%d %H:%i:%s')",
生产上,还需要接钉钉告警,如果数据同步失败,会及时通知,类似如下: