个别场景下,开发提需求,需要把某个MySQL里面指定数据同步到ES中,希望能有一个通用的脚本,用于特殊场景下的补数据或者临时性的数据同步。
注意: python es包的版本如果和es服务端的版本不一致的话,可能遇到报错。把python es的包版本换成和server端一致的版本即可。
下面的这个脚本,是用python+django+celery来实现上述功能的。核心代码如下:
方法1 逐条记录同步
代码运行次数:0
Cloud Studio 代码运行
# -*- coding: utf-8 -*-
# Incremental sync of MySQL rows into Elasticsearch, driven by the table's
# update_time column. The SELECT must alias one column as `esId`; that value
# is used as the Elasticsearch document _id.
import os
import sys
import time

import mysql.connector
from elasticsearch import Elasticsearch
def sync(task_name, mysql_host, mysql_port, mysql_user, mysql_pass, sql_condition, es_addr, es_index_name):
    """Sync rows selected by ``sql_condition`` into Elasticsearch one document at a time.

    Each row must contain an ``esId`` column (aliased in the SELECT); it becomes
    the Elasticsearch ``_id`` so re-runs upsert instead of duplicating documents.

    :param task_name:     label used in alert/log messages
    :param mysql_host:    MySQL host
    :param mysql_port:    MySQL port
    :param mysql_user:    MySQL user
    :param mysql_pass:    MySQL password
    :param sql_condition: full SELECT statement to execute (must alias an esId column)
    :param es_addr:       list of Elasticsearch endpoint URLs
    :param es_index_name: target Elasticsearch index
    """
    try:
        mydb = mysql.connector.connect(
            host=mysql_host,
            port=mysql_port,
            user=mysql_user,
            passwd=mysql_pass,
            ssl_disabled=True,
        )
        # dictionary=True so each row comes back as a dict, ready for es.index(body=...)
        mycursor = mydb.cursor(dictionary=True)
    except Exception as e:
        # TODO: send DingTalk alert here
        print(f"[{task_name}] MySQL connection failed: {e}", file=sys.stderr)
        sys.exit(10)
    try:
        mycursor.execute(sql_condition)
        res = mycursor.fetchall()
        es = Elasticsearch(es_addr, timeout=60, max_retries=10, retry_on_timeout=True)
        for row in res:
            try:
                es.index(index=es_index_name, body=dict(row), id=row['esId'])
            except Exception as e:
                # TODO: send DingTalk alert here.
                # Record which document failed instead of swallowing the error
                # silently, and keep going so one bad row doesn't stop the sync.
                print(f"[{task_name}] index failed for esId={row.get('esId')}: {e}",
                      file=sys.stderr)
    finally:
        # Always release the MySQL resources (the original leaked them here,
        # while the bulk variant below closed them — now both agree).
        mycursor.close()
        mydb.close()
if __name__ == '__main__':
    # Ad-hoc run against the test instance; print wall-clock duration.
    started = time.time()
    sync(
        "DBA-测试",
        "192.168.31.181", 3306, "dts", "dts",
        "select id as esId,k,c,pad,ccc from sbtest.sbtest3",
        ['http://192.168.31.181:8989'],
        'dba-test-new-2',
    )
    print(time.time() - started)
方法2 批量同步方式【推荐用这种】
代码运行次数:0
# -*- coding: utf-8 -*-
# Bulk (helpers.bulk) variant of the MySQL -> Elasticsearch sync script.
import os
import sys
import time

import mysql.connector
from elasticsearch import Elasticsearch, helpers
def sync(task_name, mysql_host, mysql_port, mysql_user, mysql_pass, sql_condition, es_addr, es_index_name):
    """Bulk-sync rows selected by ``sql_condition`` into Elasticsearch via helpers.bulk.

    Each row must contain an ``esId`` column (aliased in the SELECT); it becomes
    the Elasticsearch ``_id`` so re-runs upsert instead of duplicating documents.
    Bulk indexing is roughly 30x faster than the per-record variant for ~100k rows.

    :param task_name:     label used in alert/log messages
    :param mysql_host:    MySQL host
    :param mysql_port:    MySQL port
    :param mysql_user:    MySQL user
    :param mysql_pass:    MySQL password
    :param sql_condition: full SELECT statement to execute (must alias an esId column)
    :param es_addr:       list of Elasticsearch endpoint URLs
    :param es_index_name: target Elasticsearch index
    """
    try:
        mydb = mysql.connector.connect(
            host=mysql_host,
            port=mysql_port,
            user=mysql_user,
            passwd=mysql_pass,
            ssl_disabled=True,
        )
        # dictionary=True so each row comes back as a dict, ready for _source.
        mycursor = mydb.cursor(dictionary=True)
    except Exception as e:
        # TODO: send DingTalk alert here.
        # Report the cause before exiting instead of dying silently with code 10.
        print(f"[{task_name}] MySQL connection failed: {e}", file=sys.stderr)
        sys.exit(10)
    try:
        mycursor.execute(sql_condition)
        res = mycursor.fetchall()
        es = Elasticsearch(es_addr, request_timeout=60, max_retries=10, retry_on_timeout=True)
        # Generator of bulk actions: one index op per row, _id taken from esId.
        actions = (
            {
                "_index": es_index_name,
                "_id": row['esId'],
                "_source": dict(row),
            }
            for row in res
        )
        # helpers.bulk batches the actions into _bulk requests (default chunk_size=500).
        success, _ = helpers.bulk(es, actions)
        print(f"Indexed {success} documents.")
    except Exception as e:
        # TODO: send DingTalk alert here
        print(f"Error indexing documents to Elasticsearch: {e}")
    finally:
        mycursor.close()
        mydb.close()
if __name__ == '__main__':
    # Ad-hoc run against the test instance; print wall-clock duration.
    started = time.time()
    sync(
        "DBA-测试",
        "192.168.31.181", 3306, "dts", "dts",
        "select id as esId,k,c,pad,ccc from sbtest.sbtest3",
        ['http://192.168.31.181:8989'],
        'dba-test-new-2',
    )
    print(time.time() - started)
耗时
MySQL端记录数: 94326
行记录demo:
[sbtest]> select * from sbtest3 limit 10,1 \G
*************************** 1. row ***************************
 id: 5685
  k: 50479
  c: 91674238320-66576604523-84892597271-42298112537-31748098687-87592861234-27236853894-78260103493-10155978333-85784381566
pad: 50437784151-86772162187-31166376983-54827989967-72340867827
ccc: NULL
1 row in set (0.00 sec)
逐条提交: 耗时 389秒
批量提交: 耗时 12秒
设置索引mapping,否则可能出现时间列格式等其它问题
代码运行次数:0
Cloud Studio 代码运行
PUT dba-test
{
  "mappings": {
    "properties": {
      "orderNo": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "esId": {
        "type": "keyword"
      },
      "name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "sex": {
        "type": "keyword"
      },
      "update_time": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"
      },
      "remark": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      }
    }
  }
}
celery 任务配置类似如下:
代码运行次数:0
Cloud Studio 代码运行
{
  "task_name": "dba-test",
  "mysql_host": "192.168.1.11",
  "mysql_port": 3306,
  "mysql_user": "dts",
  "mysql_pass": "dts",
  "sql_condition": "select a as esId,a as orderNo,name,sex,update_time,remark from sbtest.t1 where update_time>DATE_FORMAT(DATE_SUB(NOW(), INTERVAL 5 MINUTE), '%Y-%m-%d %H:%i:%s')",
  "es_addr": [
    "192.168.1.110:9200",
    "192.168.1.111:9200",
    "192.168.1.112:9200"
  ],
  "es_index_name": "dba-test"
}
根据实际需要,celery里面可以使用周期性任务或者一次性任务。
此外,这里的sql_condition 也支持复杂条件,例如直接进行2表关联取值(注意esId不要重复就行):
"sql_condition": "select b.a as esId,a.update_time,a.name,a.sex,b.addr,b.job from sbtest.t1 a inner join sbtest.t2 b on a.name=b.name where a.update_time>DATE_FORMAT(DATE_SUB(NOW(), INTERVAL 5 MINUTE), '%Y-%m-%d %H:%i:%s')",
生产上,还需要接钉钉告警,如果数据同步失败,会及时通知,类似如下: