canal来实现mysql的数据同步
时间:2020-11-03 17:10
canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 基于日志增量订阅和消费的业务包括 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x name xxx :xxx为容器名
p 111:222 其中111是宿主机端口,222是容器端口
MYSQL_ROOT_PASSWORD=root 设置root账户密码为root 相关命令解释
master_port:Master的端口号,指的是容器的端口号
master_user:用于数据同步的用户
master_password:用于同步的用户的密码
master_log_file:指定 Slave 从哪个日志文件开始复制数据,即上文中提到的 File 字段的值
master_log_pos:从哪个 Position 开始读,即上文中提到的 Position 字段的值
master_connect_retry:如果连接失败,重试的时间间隔,单位是秒,默认是60秒
在Slave 中的mysql终端执行show slave status \\\\G;用于查看主从同步状态。 其实就是 通过 同步二进制日志文件,从服务器 会起一个io进程,读取二进制文件同步到 从服务器 参考链接 canal-php 是阿里巴巴开源项目 Canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件 的 php 客户端。为 php 开发者提供一个更友好的使用 Canal 的方式。Canal 是mysql数据库binlog的增量订阅&消费组件。 基于日志增量订阅&消费支持的业务: 关于 Canal 的更多信息请访问 github.com/alibaba/can… canal-php 作为Canal的客户端,其应用场景就是Canal的应用场景。关于应用场景在Canal介绍一节已有概述。举一些实际的使用例子: 1.代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。 2.根据数据库的变更实时更新搜索引擎,比如电商场景下商品信息发生变更,实时同步到商品搜索引擎 Elasticsearch、solr等 3.根据数据库的变更实时更新缓存,比如电商场景下商品价格、库存发生变更实时同步到redis 4.数据库异地备份、数据同步 5.根据数据库变更触发某种业务,比如电商场景下,创建订单超过xx时间未支付被自动取消,我们获取到这条订单数据的状态变更即可向用户推送消息。 6.将数据库变更整理成自己的数据格式发送到kafka等消息队列,供消息队列的消费者进行消费。 canal-php 是 Canal 的 php 客户端,它与 Canal 是采用的Socket来进行通信的,传输协议是TCP,交互协议采用的是 Google Protocol Buffer 3.0。 相关免费学习推荐:mysql视频教程 以上就是canal来实现mysql的数据同步的详细内容,更多请关注gxlsystem.com其它相关文章!Mysql视频教程栏目介绍基于canal实现数据同步。
canal是什么?
工作原理
基于上面的讲解,我们在实现canal之前,先简单做一个主从复制。一主 一从
docker pull mysql:latest
docker run -itd --name mysql-1 -p 23306:3306 -e MYSQL_ROOT_PASSWORD=root mysql
docker run -itd --name mysql-2 -p 23307:3306 -e MYSQL_ROOT_PASSWORD=root mysql复制代码
apt-get update
apt-get install vim复制代码
CREATE USER 'slave'@'%' IDENTIFIED BY '123456';
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'slave'@'%';
FLUSH PRIVILEGES;复制代码
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
secure-file-priv= NULL
server_id=100
log-bin=mysql-slave-bin
relay_log=edu-mysql-relay-bin复制代码
mysql> change master to master_host='172.17.0.4', master_user='slave', master_password='123456', master_port=3306, master_log_file='edu-mysql-bin.000001', master_log_pos= 877, master_connect_retry=30;复制代码
为什么再 将canal 之前要先说主从复制呢,其实canal 就是把自己伪装成了从服务器,从而读取日志,拿到数据;
使用docker 部署canal
docker pull canal/canal-server:latest
# 下载脚本
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh
# 构建一个destination name为test的队列, address 对应的数据库ip+端口 ,dbUsername对应数据库用户名,dbPassword对应数据库密码,注意修改为自己的
sh run.sh -e canal.auto.scan=false \\\\
-e canal.destinations=test \\\\
-e canal.instance.master.address=172.17.0.4:3306 \\\\
-e canal.instance.dbUsername=canal \\\\
-e canal.instance.dbPassword=canal \\\\
-e canal.instance.connectionCharset=UTF-8 \\\\
-e canal.instance.tsdb.enable=true \\\\
-e canal.instance.gtidon=false \\\\复制代码
php 监听数据变化
# 安装组件canal-php
composer require xingwenge/canal_php
# 编写脚本监听
<?php
namespace App\\\\Console\\\\Commands;
use xingwenge\\\\canal_php\\\\CanalClient;
use xingwenge\\\\canal_php\\\\CanalConnectorFactory;
use xingwenge\\\\canal_php\\\\Fmt;
use Illuminate\\\\Console\\\\Command;
ini_set('display_errors', 'On');
error_reporting(E_ALL);
class CanalDemo extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'CanalDemo';
/**
* The console command description.
*
* @var string
*/
protected $description = '测试canal';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
{
try {
$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
# $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);
$client->connect("172.17.0.5", 11111);//此处修改为自己的配置
$client->checkValid();
$client->subscribe("1001", "test", ".*\\\\\\\\..*");//对应启动容器时test的队列名
while (true) {
$message = $client->get(100);
if ($entries = $message->getEntries()) {
foreach ($entries as $entry) {
Fmt::println($entry);
}
}
sleep(1);
}
$client->disConnect();
} catch (\\\\Exception $e) {
echo $e->getMessage(), PHP_EOL;
}
}
}复制代码