专业的编程技术博客社区

网站首页 > 博客文章 正文

canal中间件的使用(canal 中间件)

baijin 2024-09-03 10:02:35 博客文章 7 ℃ 0 评论

最近正在迭代公司的一个项目,由于业务调整,需要用到阿里开源的canal中间件,业余时间整理了一下相关的知识点,有需要的小伙伴可以参考一下!

生产环境遇到的问题

  • 数据库更新数据后,缓存也要相应的更新
  • 数据库更新后,elasticsearch,hbase中的数据也要及时更新
  • 数据库更新后,kafaka消息队列中也要及时更新

也就是说,在数据库更新后,依赖这些数据的服务都需要做相应的变化,需要在相应的服务中写相应的逻辑,对原有代码侵入量比较大,也不利于后期的维护

什么canal

canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件,canal通过binlog同步拿到变更数据,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等多源同步

测试环境

系统:ubuntu:21

以下服务都在docker中运行

mysql:8.0

canal-server:latest

安装mysql步骤省略

  • 注意点如下
  • 1. 一定要开启binlog 日志

    2. 设置binglog 行模式为row

    3.设置一个全局唯一的server_id

    创建canal用户,并授予权限

    创建用户

    create user canal identified by 'canal';

    授予权限

    grant select ,replication slave ,replication client on *.* to 'canal'@'%';

    刷新权限

    flush privileges;

    //如果是mysql8.0以上的版本,需要修改一下加密方式,否则同步时会有异常,我这里以前设置过,所以省略了

    安装canal-server服务端

    下载canal-sever镜像

    docker pull canal/canal-server:latest

    启动方式一,直接使用 docker run在命令行运行

    说明:

    mysql 和canal-server必须可以通信,由于我的mysql和canal-server在同一台机器上,但是mysql并没有默认的网络,这里使用的是自定义网络wwwdata_frontend,所以也需要把canal-server加入到这个网络中


    docker run -d --name canal-server --network wwwdata_frontend -p 11111:11111 canal/canal-server:latest

    启动方式二,使用docker-compose启动,方便多个服务管理

    docker-compose.yml部分配置如下

    canal-server:

    image: canal/canal-server:latest

    container_name: canal-server

    restart: always

    ports:

    - "9100:9100"

    - "11111:11111"

    - "11110:11110"

    - "11112:11112"

    networks:

    - frontend

    volumes:

    - /wwwdata/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties

    - /wwwdata/canal/conf/canal.properties:/home/admin/canal-server/conf/example/canal.properties

    需要完整的docker-compose.yml(测试服务器上的服务只包含nginx,redis,php,mysql,canal)可以私信我哈,有需要帮忙制作dockerfiledocker-compose.yml文件的,都可以私信我哈

    修改配置文件instance.properties和canal.properties

    我这里由于测试,大部分使用了默认的参数,有以下几个参数需要注意

    instance.properties文件中

    canal.instance.master.address=mysql80:3306 我这里的mysql80是我的docker中运行的mysql服务名,当然也可以mysql对应的ip,(两个服务必须可以ping)

    这里是在mysql中创建的用户名

    canal.instance.dbUsername=canal canal.instance.dbPassword=canal

    canal.properties文件中

    canal端口 canal.port = 11111 canal.metrics.pull.port = 11112

    canal.destinations = example //我这里用的是默认的

    再重新启动canal-server,查看日志,如下则说明执行成功

    编写客户端来测试一下

    我这里使用go客户端来实现

    package main
    
    import (
    	"fmt"
    	"github.com/golang/protobuf/proto"
    	"github.com/withlin/canal-go/client"
    	pbe "github.com/withlin/canal-go/protocol/entry"
    	"log"
    	"os"
    	"time"
    )
    
    func main() {
    	//idletimeout 设置为0 表示不限制
    	connector := client.NewSimpleCanalConnector("canal-serve地址", 端口号默认11111, "用户名", "密码", "destination默认是example", sotimeOut, idletimeout设置为0表示不退出)
    	err := connector.Connect()
    	if err != nil {
    		log.Println(err)
    		os.Exit(1)
    	}
    	//订阅表,所有表
    	err = connector.Subscribe(".*\\..*")
    	if err != nil {
    		fmt.Println(err)
    		return
    	}
    
    	for {
    
    		message, err := connector.Get(100, nil, nil)
    		if err != nil {
    			log.Println(err)
    			os.Exit(1)
    		}
    
    		batchId := message.Id
    		if batchId == -1 || len(message.Entries) <= 0 {
    			time.Sleep(3000 * time.Millisecond)
    			fmt.Println("===没有数据了===")
    			continue
    		}
    
    		printEntry(message.Entries)
    
    	}
    
    }
    func printEntry(entrys []pbe.Entry) {
    
    	for _, entry := range entrys {
    		if entry.GetEntryType() == pbe.EntryType_TRANSACTIONBEGIN || entry.GetEntryType() == pbe.EntryType_TRANSACTIONEND {
    			continue
    		}
    		rowChange := new(pbe.RowChange)
    
    		err := proto.Unmarshal(entry.GetStoreValue(), rowChange)
    		checkError(err)
    		if rowChange != nil {
    			eventType := rowChange.GetEventType()
    			header := entry.GetHeader()
    			fmt.Println(fmt.Sprintf("================> binlog[%s : %d],name[%s,%s], eventType: %s", header.GetLogfileName(), header.GetLogfileOffset(), header.GetSchemaName(), header.GetTableName(), header.GetEventType()))
    
    			for _, rowData := range rowChange.GetRowDatas() {
    				if eventType == pbe.EventType_DELETE {
    					printColumn(rowData.GetBeforeColumns())
    				} else if eventType == pbe.EventType_INSERT {
    					printColumn(rowData.GetAfterColumns())
    				} else {
    					fmt.Println("-------> before")
    					printColumn(rowData.GetBeforeColumns())
    					fmt.Println("-------> after")
    					printColumn(rowData.GetAfterColumns())
    				}
    			}
    		}
    	}
    }
    
    func printColumn(columns []*pbe.Column) {
    	for _, col := range columns {
    		fmt.Println(fmt.Sprintf("%s : %s  update= %t", col.GetName(), col.GetValue(), col.GetUpdated()))
    	}
    }
    
    func checkError(err error) {
    	if err != nil {
    		fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
    		os.Exit(1)
    	}
    }

    测试

    mysql添加数据

    //创建表

    CREATE TABLE `canal` (

    `id` int NOT NULL AUTO_INCREMENT,

    `name` varchar(255) DEFAULT NULL,

    PRIMARY KEY (`id`)

    ) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

    //添加数据

    insert INTO canal (name) VALUE("abcccc")

    在客户端可以看到,实时更新了

    本次测试并没有实现HA,只有单机实例做的测试,生产中遇到啥坑,后面再填吧

    本文暂时没有评论,来添加一个吧(●'◡'●)

    欢迎 发表评论:

    最近发表
    标签列表