基于canal监听MySQL binlog实现数据增量同步

一、背景

业务反馈客服消息列表查询速度慢,有时候甚至要差不多20秒,急需优化提升速度。

二、方案

引入

首先,体验系统,发现查询慢的正是消息列表查询接口。

接着去看代码的设计,流程比较长,但从代码逻辑上设计没有问题什么大问题。

接着拿到查库的主SQL,发现连接的表比较多,然后在测试库看索引,索引缺了一些。加上索引,然后确定确实走了预期的索引之后就给正式库加上了索引。速度确实有了较大提升。能够进入十秒内,但因为数据量大,还是需要4~5秒左右。

不过,业务对这个已经比较满意了,但还是提出能否速度更快。优化完索引之后,可以试试上缓存,但细看了一下数据,变化频率不低,不太适合缓存。

接着就想到了本文的主角-----Elastic Search。支持高性能检索,倒排索引的机制,也更适合大数据量的场景。于是就以测试库来尝试,做引入ES的探索。

选型

ES和数据库作为不同的系统,要查询效果一致,首先就要考虑数据的同步问题。

首先对于数据库已有的数据,做全量同步。这个是没有异议的。

但对于增量数据如何处理就引出了几种常用的方案。

方案一

数据库更新的时候,直接同步更新ES。这种方式实现简单,数据同步也及时,但每处更新数据库都要加上更新ES的操作,在后续开发中,很容易会遗漏。(某些时候可能需要用SQL改数据库数据,这种操作无法更新ES)

方案二

数据库更新的时候,就将更新的操作发送到MQ,让MQ异步去更新ES。这种方案相比第一种复用性更高,可维护性更强一些,但还是有缺漏的风险。(某些时候可能需要用SQL改数据库数据,这种操作无法更新ES)

方案三

监听数据库binlog日志,只要有变更记录的操作,就同步更新ES。这种方案实现起来较复杂,但基本写完一套之后,后续基本不需要再变动。

综合考虑下来,我觉得第三种方案是最好的。在之前的学习中,我尝试了给项目引入Easy ES框架来实现ES的引入。

springboot整合easy-es实现数据的增删改查_easy es-CSDN博客

接下来就差数据的全量同步和增量同步了。全量同步可以直接通过工具来实现,而增量同步就需要工具结合代码来实现了。本文就是打算用阿里的canal来实现监听MySQL数据库的binlog日志。

三、原理

MySQL的分布式是基于主从架构实现的。一般情况下是一主多从,其中一个数据库作为主节点,其他数据库作为从节点,主从节点之间通过订阅binlog的方式实现数据同步。

canal的数据增量同步底层就是利用MySQL的主从同步机制实现的。将canal伪装成master的一个slave节点,向master节点发起dump协议,master节点在接收到dump协议之后,就会将binlog日志推送给canal,canal拿到binlog日志之后执行相应的操作从而实现数据同步。

四、配置流程

官方教程

4.1、配置MySQL

查看源MySQL的binlog配置,确保MySQL开启了binlog日志。

SHOW VARIABLES LIKE "%bin%"

日志的格式为ROW

查看源MySQL的server_id

准备好一个拥有slave权限的MySQL账号。

4.2、配置canal

1.下载canalicon-default.png?t=N7T8https://github.com/alibaba/canal/releases

2.解压,配置canal,修改文件conf/example/instance.properties

配置canal的server_id,注意要和上面查看的源MySQL的不一样。

配置源MySQL的ip+端口

配置源MySQL的账号密码

3.启动项目

在bin目录下找到对应系统的启动文件,双击启动。

/bin/startup.bat(window)

/bin/startup.sh(linux)

查看日志文件,检查是否启动成功。logs/canal/canal.log和logs/example/example.log

服务启动成功

4.3、canal集成到springboot

添加依赖

		<dependency>
			<groupId>com.alibaba.otter</groupId>
			<artifactId>canal.client</artifactId>
			<version>1.1.4</version>
		</dependency>

直接用官方的测试代码验证(不需要更改,如果canal安装在本地的话)

package org.jeecg.modules.admin.assignImClassTeacher;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;


public class SimpleCanalClientExample {


	public static void main(String args[]) {
		// 创建链接
		CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
				11111), "example", "", "");
		int batchSize = 1000;
		int emptyCount = 0;
		try {
			connector.connect();
			connector.subscribe(".*\\..*");
			connector.rollback();
			int totalEmptyCount = 120;
			while (emptyCount < totalEmptyCount) {
				Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
				long batchId = message.getId();
				int size = message.getEntries().size();
				if (batchId == -1 || size == 0) {
					emptyCount++;
					System.out.println("empty count : " + emptyCount);
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
					}
				} else {
					emptyCount = 0;
					// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
					printEntry(message.getEntries());
				}

				connector.ack(batchId); // 提交确认
				// connector.rollback(batchId); // 处理失败, 回滚数据
			}

			System.out.println("empty too many times, exit");
		} finally {
			connector.disconnect();
		}
	}

	/**
	 * 打印canal server解析binlog获得的实体类信息
	 */
	private static void printEntry(List<Entry> entrys) {
		for (Entry entry : entrys) {
			if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
				//开启/关闭事务的实体类型,跳过
				continue;
			}
			//RowChange对象,包含了一行数据变化的所有特征
			//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
			RowChange rowChage;
			try {
				rowChage = RowChange.parseFrom(entry.getStoreValue());
			} catch (Exception e) {
				throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
			}
			//获取操作类型:insert/update/delete类型
			EventType eventType = rowChage.getEventType();
			//打印Header信息
			System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
					entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
					entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
					eventType));
			//判断是否是DDL语句
			if (rowChage.getIsDdl()) {
				System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
			}
			//获取RowChange对象里的每一行数据,打印出来
			for (RowData rowData : rowChage.getRowDatasList()) {
				//如果是删除语句
				if (eventType == EventType.DELETE) {
					printColumn(rowData.getBeforeColumnsList());
					//如果是新增语句
				} else if (eventType == EventType.INSERT) {
					printColumn(rowData.getAfterColumnsList());
					//如果是更新的语句
				} else {
					//变更前的数据
					System.out.println("------->; before");
					printColumn(rowData.getBeforeColumnsList());
					//变更后的数据
					System.out.println("------->; after");
					printColumn(rowData.getAfterColumnsList());
				}
			}
		}
	}

	private static void printColumn(List<Column> columns) {
		for (Column column : columns) {
			System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
		}
	}



}

运行main方法

在更新后,每个字段都有一个update字段,如果值为true代表这个字段更新了,为false代表没更新。

拿到这些涉及数据库变更的事件之后,就可以根据需要去做数据的增量同步了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/578819.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

应用监控(Prometheus + Grafana)

可用于应用监控的系统有很多&#xff0c;有的需要埋点(切面)、有的需要配置Agent(字节码增强)。本节我教大家另外一个监控系统的使用 —— Grafana。 Grafana 监控面板 这套监控主要用到了 SpringBoot Actuator Prometheus Grafana 三个模块组合的起来使用的监控。非常轻量好…

第一个大型汽车ITU-T车载语音通话质量实验室投入使用

中国汽车行业蓬勃发展&#xff0c;尤其是新能源汽车风起云涌&#xff0c;无论是国内还是海外需求旺盛的趋势下&#xff0c;除乘用车等紧凑型车外&#xff0c;中型汽车如MPV、小巴、小型物流车&#xff0c;大型汽车如重卡、泥头车等亦加入了手机互联、智驾的科技行列&#xff0c…

机器人-轨迹规划

旋转矩阵 旋转矩阵--R--一个3*3的矩阵&#xff0c;其每列的值时B坐标系在A坐标系上的投影值。 代表B坐标系相对于A坐标系的姿态。 旋转矩阵的转置矩阵 其实A相对于B的旋转矩阵就相当于把B的列放到行上就行。 视频 &#xff08;将矩阵的行列互换得到的新矩阵称为转置矩阵。&…

基于__torch_dispatch__机制的dump方法

基于__torch_dispatch__机制的dump方法 1.参考链接2.原理3.代码4.效果 之前拦截torch和torch.Tensor的办法,在处理backward时,不能看到aten算子的细节.以下基于__torch_dispatch__机制的方案更节约代码,且能看到调用栈 1.参考链接 [原理] (https://dev-discuss.pytorch.org/t…

matlab学习005-利用matlab设计滤波器

目录 一&#xff0c;含有多个频率成分的三角信号 1&#xff0c;以采样频率fs20KHz对信号采样&#xff0c; 画出信号的波形&#xff1b; 1&#xff09;前期基础 2&#xff09;波形图 3&#xff09;代码 2&#xff0c;选取合适的采样点数&#xff0c;利用DFT分析信号的…

FPGA 以太网通信UDP通信环回

1 实验任务 上位机通过网口调试助手发送数据给 FPGA &#xff0c; FPGA 通过 PL 端以太网接口接收数据并将接收到的数据发送给上位机&#xff0c;完成以太网 UDP 数据的环回。 2 系统设计 系统时钟经过PLL时钟模块后&#xff0c;生成了两种不同频率和相位的时钟信号&#…

基于SpringBoot+VueHome F家居系统的设计与实现

系统介绍 该Home F家居系统采用B/S架构、前后端分离以及MVC模型进行设计&#xff0c;并采用Java语言以及SpringBoot框架进行开发。本系统主要设计并完成了用户注册、登录&#xff0c;购买家具过程、个人信息修改等&#xff0c;商家添加家具信息、对家具进行发货&#xff0c;管理…

缓解程序员工作压力:从心理健康到社交网络

缓解程序员工作压力&#xff1a;从心理健康到社交网络 缓解程序员工作压力&#xff1a;从心理健康到社交网络摘要引言工作与休息的平衡制定有效的工作计划定时休息和放松 心理健康与自我关怀培养良好的生活习惯寻找心灵的慰藉 社交与网络建设加入专业社区和论坛建立良好的同事关…

【静态分析】静态分析笔记09 - 污点分析

参考&#xff1a; 【课程笔记】南大软件分析课程—16课时完整版 - 知乎 ------------------------------------------------------------------------------- 1. 信息流安全 访问控制&#xff1a;关注信息访问。 信息流安全&#xff1a;关注信息传播。 信息流&#xff1a…

自己搭建的大疆无人机RTMP流媒体服务延迟太大

流程&#xff1a;无人机摄像头->图传->遥控器->流媒体服务器->取流播放&#xff0c;延迟有10秒来的&#xff0c;大家有没有什么好的方案。

【介绍下有那些常见的ssh功能】

&#x1f3a5;博主&#xff1a;程序员不想YY啊 &#x1f4ab;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f917;点赞&#x1f388;收藏⭐再看&#x1f4ab;养成习惯 ✨希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出…

python作业 切片逆转

题目&#xff1a; &#xff08;反转显示一个整数&#xff09;编写下面的函数&#xff0c;反向显示一个整数。 列如&#xff1a;reserse(3456)。编写一个测试程序&#xff0c;提示用户输入一个整数&#xff0c;然后显示它的反向数。 第一步定义一个函数&#xff1a; def rev…

Linux进程概念(六):进程控制

目录 进程创建 fork函数 进程终止 终止时干了什么 进程终止的三种情况 main函数的返回值 打印默认退出码 自定义退出码 总结 进程终止 exit函数 _exit函数 exit和_exit的区别 进程等待 什么是进程等待 为什么要有进程等待 wait函数 waitpid函数 阻塞等待与…

【前端开发基础知识快速入门】

前端开发基础知识&快速入门 一、VSCode 使用1.1 安装常用插件1.2 创建项目1.3 创建网页1.4 运行效果二、ES62.1 简介2.2 什么是 ECMAScript2.3 ES6 新特性2.3.1 let 声明变量2.3.2 const 声明常量(只读变量)2.3.3 解构表达式2.3.4 字符串扩展2.3.5 函数优化2.3.6 对象优化…

开发日志(20240422):一次以为是跨域但并不是跨域的问题排查记录

1. 日志 在前后端联调的时候&#xff0c;遇到了报错&#xff0c;如下图所示&#xff08;现在再看感觉非常简单了&#xff09;&#xff0c;发现前一个请求通过了&#xff0c;但是第二个请求报错&#xff0c;然后看到 strict-origin-when-cross-origin 条件反射的认为是跨域配置…

流量网关与服务网关的区别:(面试题,掌握)

流量网关&#xff1a;&#xff08;如Nignx&#xff0c;OpenResty&#xff0c;Kong&#xff09;是指提供全局性的、与后端业务应用无关的策略&#xff0c;例如 HTTPS证书认证、Web防火墙、全局流量监控&#xff0c;黑白名单等。 服务网关&#xff1a;&#xff08;如Spring Clou…

初步认识Java

Java之父 Java 语言源于 1991 年 4 月&#xff0c;Sun 公司 James Gosling博士 领导的绿色计划(Green Project) 开始启动&#xff0c;此计划最初的目标是开发一种能够在各种消费性电子产品(如机顶盒、冰箱、收音机等)上运行的程序架构。这个就是Java的前身&#xff1a; Oak (得…

【Node.js工程师养成计划】之打造自己的脚手架工具

一、创建全局的自定义命令 1、打开一个空文件夹&#xff0c;新建一个bin文件夹&#xff0c;在bin文件夹下新建cli.js文件&#xff0c;js文件可以命名为cli.js&#xff08;您随意&#xff09; 2、在cli.js文件中的开头&#xff08;&#xff01;&#xff01;&#xff09;写下面这…

系统服务(22年国赛)—— 磁盘管理(压缩去重)

前言&#xff1a;原文在我的博客网站中&#xff0c;持续更新数通、系统方面的知识&#xff0c;欢迎来访&#xff01; 系统服务&#xff08;22年国赛&#xff09;—— 磁盘管理(压缩&&去重)https://myweb.myskillstree.cn/90.html 目录 StorageSrv 安装并创建vdo 将…

MIT 6.172 笔记 现代硬件算法案例分析

本文是https://en.algorithmica.org/hpc/和MIT 6.172的课后题解析 课程地址&#xff1a; 文章目录 HW2 Profiling Serial Merge Sort测试DEBUG和非DEBUG区别测试inline和非inline区别Coarsening HW3 向量化为什么用负偏移量测量向量化跨步向量化 HW4 Reducer Hyperobjects比较o…
最新文章