diff --git a/README.md b/README.md index 7b84bf0..08c0640 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,9 @@ # reverse_sql 工具介绍 reverse_sql 是一个用于解析和转换 MySQL 二进制日志(binlog)的工具。它可以将二进制日志文件中记录的数据库更改操作(如插入、更新、删除)转换为反向的 SQL 语句,以便进行数据恢复。其运行模式需二进制日志设置为 ROW 格式。 - +``` +reverse_sql工具版本号: 2.1.2,更新日期:2024-08-09 - 增加表字段是json或text类型的支持 +编译后的二进制版本下载地址:https://pan.quark.cn/s/35abca11c589 +``` 该工具的主要功能和特点包括: 1、解析二进制日志:reverse_sql 能够解析 MySQL 的二进制日志文件,并还原出其中的 SQL 语句。 @@ -31,6 +34,9 @@ reverse_sql 是一个用于解析和转换 MySQL 二进制日志(binlog)的 这样,每个线程的开始时间都会有所偏移,确保处理的时间范围没有重叠,并且覆盖了整个时间范围。最后,将结果保存在一个列表里,并对列表做升序排序,取得最终结果。 +### 演示视频 +https://edu.51cto.com/video/1659.html + ### 使用 ``` shell> chmod 755 reverse_sql @@ -107,4 +113,25 @@ shell> awk '/^-- SQL执行时间/{filename = "output" ++count ".sql"; print > fi #### 注:reverse_sql 支持MySQL 5.7/8.0 和 MariaDB,适用于CentOS 7系统。 +------------------------------------------------------------------------------------ +### Docker部署使用 +shell> wget https://github.com/hcymysql/reverse_sql/archive/refs/heads/reverse_sql_progress.zip + +shell> unzip reverse_sql_progress.zip + +shell> cd reverse_sql_progress + +shell> vim Dockerfile +``` +FROM centos:7 + +COPY reverse_sql /root/ +RUN chmod 755 /root/reverse_sql +``` +shell> docker build -t reverse_sql . + +shell> docker run -itd --name reverse_sql reverse_sql /bin/bash + +shell> docker exec -it reverse_sql /root/reverse_sql --help +------------------------------------------------------------------------------------ diff --git a/requirements_mysql57.txt b/requirements_mysql57.txt new file mode 100644 index 0000000..abbbc50 --- /dev/null +++ b/requirements_mysql57.txt @@ -0,0 +1,4 @@ +pytz +pymysql +mysql-replication==0.45.1 +tqdm diff --git a/requirements_mysql8.txt b/requirements_mysql8.txt new file mode 100644 index 0000000..e2568e1 --- /dev/null +++ b/requirements_mysql8.txt @@ -0,0 +1,4 @@ +pytz +pymysql +mysql-replication +tqdm diff --git a/reverse_sql b/reverse_sql deleted file mode 100644 index 09329f3..0000000 Binary files a/reverse_sql and /dev/null differ diff --git a/reverse_sql.py b/reverse_sql_json.py similarity index 69% rename from reverse_sql.py rename to reverse_sql_json.py index e70e7eb..51f8d20 100644 --- a/reverse_sql.py +++ b/reverse_sql_json.py @@ -14,6 +14,8 @@ UpdateRowsEvent, DeleteRowsEvent ) +from tqdm import tqdm +import json timezone = pytz.timezone('Asia/Shanghai') @@ -25,6 +27,7 @@ # 创建一个锁对象 file_lock = threading.Lock() + def check_binlog_settings(mysql_host=None, mysql_port=None, mysql_user=None, mysql_passwd=None, mysql_database=None, mysql_charset=None): # 连接 MySQL 数据库 @@ -54,7 +57,7 @@ def check_binlog_settings(mysql_host=None, mysql_port=None, mysql_user=None, # 检查参数值是否满足条件 if binlog_format != 'ROW' and binlog_row_image != 'FULL': exit("\nMySQL 的变量参数 binlog_format 的值应为 ROW,参数 binlog_row_image 的值应为 FULL\n") - + finally: # 关闭数据库连接 cursor.close() @@ -62,8 +65,20 @@ def check_binlog_settings(mysql_host=None, mysql_port=None, mysql_user=None, def process_binlogevent(binlogevent, start_time, end_time): + + def convert_bytes_to_str(data): + if isinstance(data, dict): + return {convert_bytes_to_str(key): convert_bytes_to_str(value) for key, value in data.items()} + elif isinstance(data, list): + return [convert_bytes_to_str(item) for item in data] + elif isinstance(data, bytes): + return data.decode('utf-8') + else: + return data + + database_name = binlogevent.schema - + if start_time <= binlogevent.timestamp <= end_time: for row in binlogevent.rows: event_time = binlogevent.timestamp @@ -72,41 +87,53 @@ def 
process_binlogevent(binlogevent, start_time, end_time): if only_operation and only_operation != 'insert': continue else: + values = convert_bytes_to_str(row["values"]) sql = "INSERT INTO {}({}) VALUES ({});".format( f"`{database_name}`.`{binlogevent.table}`" if database_name else binlogevent.table, - ','.join(["`{}`".format(k) for k in row["values"].keys()]), - ','.join(["'{}'".format(v) if isinstance(v, ( - str, datetime.datetime)) else 'NULL' if v is None else str(v) - for v in row["values"].values()]) + ','.join(["`{}`".format(k) for k in values.keys()]), + ','.join(["'{}'".format(json.dumps(v, ensure_ascii=False) if isinstance(v, (dict, list)) else v) + if isinstance(v, (str, datetime.datetime, datetime.date, dict, list)) + else 'NULL' if v is None else str(v) for v in values.values()]) ) - rollback_sql = "DELETE FROM {} WHERE {};".format(f"`{database_name}`.`{binlogevent.table}`" - if database_name else binlogevent.table, ' AND '.join(["`{}`={}".format(k, "'{}'".format(v) - if isinstance(v, (str, datetime.datetime)) else 'NULL' if v is None else str(v)) - for k, v in row["values"].items()])) - + rollback_sql = "DELETE FROM {} WHERE {};".format( + f"`{database_name}`.`{binlogevent.table}`" if database_name else binlogevent.table, + ' AND '.join([ + "`{}`={}".format( + k, + "'{}'".format(json.dumps(v, ensure_ascii=False) if isinstance(v, (dict, list)) else v) + if isinstance(v, (str, datetime.datetime, datetime.date, dict, list)) + else 'NULL' if v is None + else str(v) + ) + for k, v in values.items() + ]) + ) result_queue.put({"event_time": event_time, "sql": sql, "rollback_sql": rollback_sql}) elif isinstance(binlogevent, UpdateRowsEvent): if only_operation and only_operation != 'update': continue else: + before_values = convert_bytes_to_str(row["before_values"]) + after_values = convert_bytes_to_str(row["after_values"]) + set_values = [] - for k, v in row["after_values"].items(): - if isinstance(v, str): + for k, v in after_values.items(): + if isinstance(v, (dict, list)): + set_values.append(f"`{k}`='{json.dumps(v, ensure_ascii=False)}'") + elif isinstance(v, (str, datetime.datetime, datetime.date)): set_values.append(f"`{k}`='{v}'") - elif isinstance(v, datetime.datetime): - set_values.append(f"`{k}`='{v}'") # 将时间字段转换为字符串形式 else: - set_values.append(f"`{k}`={v}" if v is not None else f"`{k}`= NULL") + set_values.append(f"`{k}`={v}" if v is not None else f"`{k}`=NULL") set_clause = ','.join(set_values) where_values = [] - for k, v in row["before_values"].items(): - if isinstance(v, str): + for k, v in before_values.items(): + if isinstance(v, (dict, list)): + where_values.append(f"`{k}`='{json.dumps(v, ensure_ascii=False)}'") + elif isinstance(v, (str, datetime.datetime, datetime.date)): where_values.append(f"`{k}`='{v}'") - elif isinstance(v, datetime.datetime): - where_values.append(f"`{k}`='{v}'") # 添加对时间类型的处理 else: where_values.append(f"`{k}`={v}" if v is not None else f"`{k}` IS NULL") where_clause = ' AND '.join(where_values) @@ -114,69 +141,81 @@ def process_binlogevent(binlogevent, start_time, end_time): sql = f"UPDATE `{database_name}`.`{binlogevent.table}` SET {set_clause} WHERE {where_clause};" rollback_set_values = [] - for k, v in row["before_values"].items(): - if isinstance(v, str): + for k, v in before_values.items(): + if isinstance(v, (dict, list)): + rollback_set_values.append(f"`{k}`='{json.dumps(v, ensure_ascii=False)}'") + elif isinstance(v, (str, datetime.datetime, datetime.date)): rollback_set_values.append(f"`{k}`='{v}'") - elif isinstance(v, 
datetime.datetime): - rollback_set_values.append(f"`{k}`='{v}'") # 添加对时间类型的处理 else: rollback_set_values.append(f"`{k}`={v}" if v is not None else f"`{k}`=NULL") rollback_set_clause = ','.join(rollback_set_values) rollback_where_values = [] - for k, v in row["after_values"].items(): - if isinstance(v, str): + for k, v in after_values.items(): + if isinstance(v, (dict, list)): + rollback_where_values.append(f"`{k}`='{json.dumps(v, ensure_ascii=False)}'") + elif isinstance(v, (str, datetime.datetime, datetime.date)): rollback_where_values.append(f"`{k}`='{v}'") - elif isinstance(v, datetime.datetime): - rollback_where_values.append(f"`{k}`='{v}'") # 添加对时间类型的处理 else: rollback_where_values.append(f"`{k}`={v}" if v is not None else f"`{k}` IS NULL") rollback_where_clause = ' AND '.join(rollback_where_values) + rollback_sql = f"UPDATE `{database_name}`.`{binlogevent.table}` SET {rollback_set_clause} WHERE {rollback_where_clause};" - + try: rollback_replace_set_values = [] - for v in row["before_values"].values(): - if v is None: - rollback_replace_set_values.append("NULL") - elif isinstance(v, (str, datetime.datetime)): - rollback_replace_set_values.append(f"'{v}'") - else: - rollback_replace_set_values.append(str(v)) + for v in convert_bytes_to_str(row["before_values"]).values(): + if v is None: + rollback_replace_set_values.append("NULL") + elif isinstance(v, (str, datetime.datetime, datetime.date)): + rollback_replace_set_values.append(f"'{v}'") + elif isinstance(v, (dict, list)): + v = json.dumps(v, ensure_ascii=False) + rollback_replace_set_values.append(f"'{v}'") + else: + rollback_replace_set_values.append(str(v)) rollback_replace_set_clause = ','.join(rollback_replace_set_values) fields_clause = ','.join([f"`{k}`" for k in row["after_values"].keys()]) rollback_replace_sql = f"REPLACE INTO `{database_name}`.`{binlogevent.table}` ({fields_clause}) VALUES ({rollback_replace_set_clause});" except Exception as e: print("出现异常错误:", e) - #print(rollback_replace_sql) result_queue.put({"event_time": event_time, "sql": sql, "rollback_sql": rollback_sql}) - result_queue_replace.put({"event_time": event_time, "sql": sql, "rollback_sql": rollback_replace_sql}) + result_queue_replace.put( + {"event_time": event_time, "sql": sql, "rollback_sql": rollback_replace_sql}) elif isinstance(binlogevent, DeleteRowsEvent): if only_operation and only_operation != 'delete': continue else: - sql = "DELETE FROM {} WHERE {};".format( - f"`{database_name}`.`{binlogevent.table}`" if database_name else binlogevent.table, - ' AND '.join(["`{}`={}".format(k, "'{}'".format(v) if isinstance(v, (str, datetime.datetime)) - else 'NULL' if v is None else str(v)) - for k, v in row["values"].items()]) + values = convert_bytes_to_str(row["values"]) + sql = "DELETE FROM `{}` WHERE {};".format( + "`{}`.`{}`".format(database_name, binlogevent.table) if database_name else "`{}`".format(binlogevent.table), + ' AND '.join(["`{}`={}".format(k, + "'{}'".format(json.dumps(v, ensure_ascii=False) if isinstance(v, (dict, list)) else v) + if isinstance(v, (str, datetime.datetime, datetime.date, dict, list)) + else 'NULL' if v is None + else str(v)) + for k, v in values.items()]) ) rollback_sql = "INSERT INTO {}({}) VALUES ({});".format( - f"`{database_name}`.`{binlogevent.table}`" if database_name else binlogevent.table, - '`' + '`,`'.join(list(row["values"].keys())) + '`', - ','.join(["'%s'" % str(i) if isinstance(i, (str, datetime.datetime)) else 'NULL' if i is None else str(i) - for i in list(row["values"].values())]) + 
"`{}`.`{}`".format(database_name, binlogevent.table) if database_name else "`{}`".format(binlogevent.table), + '`' + '`,`'.join(list(values.keys())) + '`', + ','.join(["'{}'".format(json.dumps(i, ensure_ascii=False) if isinstance(i, (dict, list)) else i) + if isinstance(i, (str, datetime.datetime, datetime.date, dict, list)) + else 'NULL' if i is None + else str(i) + for i in list(values.values())]) ) result_queue.put({"event_time": event_time, "sql": sql, "rollback_sql": rollback_sql}) def main(only_tables=None, only_operation=None, mysql_host=None, mysql_port=None, mysql_user=None, mysql_passwd=None, - mysql_database=None, mysql_charset=None, binlog_file=None, binlog_pos=None, st=None, et=None, max_workers=None, print_output=False, replace_output=False): + mysql_database=None, mysql_charset=None, binlog_file=None, binlog_pos=None, st=None, et=None, max_workers=None, + print_output=False, replace_output=False): valid_operations = ['insert', 'delete', 'update'] if only_operation: @@ -216,16 +255,26 @@ def main(only_tables=None, only_operation=None, mysql_host=None, mysql_port=None next_binlog_file_lock = threading.Lock() next_binlog_pos_lock = threading.Lock() - + for i in range(max_workers): task_start_time = start_time + i * interval task_end_time = task_start_time + interval - if i == (max_workers-1): - #task_end_time = end_time - (max_workers-1) * interval + if i == (max_workers - 1): + # task_end_time = end_time - (max_workers-1) * interval task_end_time = end_time tasks = [] + + # 创建进度条对象 + progress_bar = tqdm(desc='Processing binlogevents', unit='event', leave=True) + + event_count = 0 # 初始化事件计数器 + for binlogevent in stream: + event_count += 1 # 每迭代一次,计数器加一 + # 更新进度条 + progress_bar.update(1) + #for binlogevent in tqdm(stream, desc='Processing binlogevents', unit='event'): if binlogevent.timestamp < task_start_time: # 如果事件的时间小于任务的起始时间,则继续迭代下一个事件 continue elif binlogevent.timestamp > task_end_time: # 如果事件的时间大于任务的结束时间,则结束该任务的迭代 @@ -248,6 +297,9 @@ def main(only_tables=None, only_operation=None, mysql_host=None, mysql_port=None """ tasks.append(task) + # 刷新进度条显示 + progress_bar.refresh() + wait(tasks) stream.close() @@ -263,6 +315,12 @@ def main(only_tables=None, only_operation=None, mysql_host=None, mysql_port=None only_tables=only_tables ) + # 设置进度条的总长度为事件计数器的值 + progress_bar.total = event_count + + # 完成后关闭进度条 + progress_bar.close() + while not result_queue.empty(): combined_array.append(result_queue.get()) @@ -284,14 +342,14 @@ def main(only_tables=None, only_operation=None, mysql_host=None, mysql_port=None rollback_sql = item["rollback_sql"] if print_output: - print(f"-- SQL执行时间:{current_time} \n-- 原生sql:\n \t-- {sql} \n-- 回滚sql:\n \t{rollback_sql}\n-- ----------------------------------------------------------\n") + print( + f"-- SQL执行时间:{current_time} \n-- 原生sql:\n \t-- {sql} \n-- 回滚sql:\n \t{rollback_sql}\n-- ----------------------------------------------------------\n") # 写入文件 filename = f"{binlogevent.schema}_{binlogevent.table}_recover_{formatted_time}.sql" - #filename = f"{binlogevent.schema}_{binlogevent.table}_recover.sql" + # filename = f"{binlogevent.schema}_{binlogevent.table}_recover.sql" with file_lock: # 获取文件锁 with open(filename, "a", encoding="utf-8") as file: - file.write(f"-- SQL执行时间:{current_time}\n") file.write(f"-- 原生sql:\n \t-- {sql}\n") file.write(f"-- 回滚sql:\n \t{rollback_sql}\n") @@ -303,14 +361,14 @@ def main(only_tables=None, only_operation=None, mysql_host=None, mysql_port=None event_time = item["event_time"] dt = datetime.datetime.fromtimestamp(event_time, 
tz=timezone) current_time = dt.strftime('%Y-%m-%d %H:%M:%S') - + sql = item["sql"] rollback_sql = item["rollback_sql"] - + if print_output: print( f"-- SQL执行时间:{current_time} \n-- 原生sql:\n \t-- {sql} \n-- 回滚sql:\n \t{rollback_sql}\n-- ----------------------------------------------------------\n") - + # 写入文件 filename = f"{binlogevent.schema}_{binlogevent.table}_recover_{formatted_time}_replace.sql" # filename = f"{binlogevent.schema}_{binlogevent.table}_recover.sql" @@ -330,9 +388,10 @@ def main(only_tables=None, only_operation=None, mysql_host=None, mysql_port=None Example usage: shell> ./reverse_sql -ot table1 -op delete -H 192.168.198.239 -P 3336 -u admin -p hechunyang -d hcy \ --binlog-file mysql-bin.000124 --start-time "2023-07-06 10:00:00" --end-time "2023-07-06 22:00:00" """, - formatter_class=argparse.RawTextHelpFormatter) + formatter_class=argparse.RawTextHelpFormatter) parser.add_argument("-ot", "--only-tables", dest="only_tables", nargs="+", type=str, help="设置要恢复的表,多张表用,逗号分隔") - parser.add_argument("-op", "--only-operation", dest="only_operation", type=str, help="设置误操作时的命令(insert/update/delete)") + parser.add_argument("-op", "--only-operation", dest="only_operation", type=str, + help="设置误操作时的命令(insert/update/delete)") parser.add_argument("-H", "--mysql-host", dest="mysql_host", type=str, help="MySQL主机名", required=True) parser.add_argument("-P", "--mysql-port", dest="mysql_port", type=int, help="MySQL端口号", required=True) parser.add_argument("-u", "--mysql-user", dest="mysql_user", type=str, help="MySQL用户名", required=True) @@ -346,6 +405,7 @@ def main(only_tables=None, only_operation=None, mysql_host=None, mysql_port=None parser.add_argument("--max-workers", dest="max_workers", type=int, default=4, help="线程数,默认4(并发越高,锁的开销就越大,适当调整并发数)") parser.add_argument("--print", dest="print_output", action="store_true", help="将解析后的SQL输出到终端") parser.add_argument("--replace", dest="replace_output", action="store_true", help="将update转换为replace操作") + parser.add_argument('-v', '--version', action='version', version='reverse_sql工具版本号: 2.1.2,更新日期:2024-08-09') args = parser.parse_args() if args.only_tables: @@ -384,4 +444,6 @@ def main(only_tables=None, only_operation=None, mysql_host=None, mysql_port=None max_workers=args.max_workers, print_output=args.print_output, replace_output=args.replace_output - ) \ No newline at end of file + ) + +
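
本次补丁(2.1.2)的核心是对 json/text 字段的支持:python-mysql-replication 解析出的行数据中可能包含 bytes(例如 text 类字段),json 字段则是 dict/list,因此补丁先用新增的 convert_bytes_to_str 递归解码,再对 dict/list 用 json.dumps(ensure_ascii=False) 序列化后拼入 SQL。下面是一个可独立运行的最小示意,帮助理解这一处理思路(库名、表名、字段名均为示意,并非工具本身代码):

```python
import json

def convert_bytes_to_str(data):
    # 与补丁中新增的辅助函数思路一致:递归地把 bytes 解码为 str
    if isinstance(data, dict):
        return {convert_bytes_to_str(k): convert_bytes_to_str(v) for k, v in data.items()}
    elif isinstance(data, list):
        return [convert_bytes_to_str(i) for i in data]
    elif isinstance(data, bytes):
        return data.decode('utf-8')
    return data

# 模拟一行 binlog 解析结果:text 字段是 bytes,json 字段是 dict
row_values = {"id": 1, "remark": "备注".encode("utf-8"), "attrs": {"color": "red", "size": 10}}
values = convert_bytes_to_str(row_values)

cols = ','.join("`{}`".format(k) for k in values)
vals = ','.join(
    "'{}'".format(json.dumps(v, ensure_ascii=False)) if isinstance(v, (dict, list))
    else "'{}'".format(v) if isinstance(v, str)
    else 'NULL' if v is None
    else str(v)
    for v in values.values()
)
# 为简洁起见,这里省略了 datetime/date 等类型的处理
print("INSERT INTO `hcy`.`t1`({}) VALUES ({});".format(cols, vals))
# 输出:INSERT INTO `hcy`.`t1`(`id`,`remark`,`attrs`) VALUES (1,'备注','{"color": "red", "size": 10}');
```

回滚语句(DELETE/UPDATE/REPLACE)中 dict/list 值的拼接方式与此相同。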
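
README 中描述的多线程切分逻辑,大致相当于把 [start_time, end_time] 均分为 max_workers 段:每段起点依次偏移、互不重叠,最后一段的结束时间取 end_time 以覆盖整除后的剩余部分,各线程的结果汇总到列表后再按时间升序排序。下面是一个独立的小示意(变量名与区间计算方式为示意,按补丁中 task_start_time/task_end_time 的赋值方式推断):

```python
def split_time_range(start_time: int, end_time: int, max_workers: int):
    """把 [start_time, end_time](Unix 时间戳)切成 max_workers 个不重叠的子区间"""
    interval = (end_time - start_time) // max_workers
    tasks = []
    for i in range(max_workers):
        task_start_time = start_time + i * interval
        # 最后一个线程的结束时间取 end_time,保证整除产生的余数部分也被覆盖
        task_end_time = end_time if i == max_workers - 1 else task_start_time + interval
        tasks.append((task_start_time, task_end_time))
    return tasks

if __name__ == "__main__":
    # 4 个线程切分 1 小时的时间窗口
    for task in split_time_range(1688608800, 1688612400, 4):
        print(task)
```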