A Tencent expert teaches you how to synchronize MySQL data to ClickHouse in real time

Author: Shi Pengzhou, R&D engineer, Cloud and Smart Industries Group (CSIG), Tencent

ClickHouse has been widely adopted as an OLAP analysis engine, and importing and exporting data is the first problem its users face. Because ClickHouse does not handle large numbers of small, single-row writes well, other services are needed to batch and synchronize the data in real time. This article presents a solution based on Canal + Kafka and describes how, when the source MySQL instances are sharded across databases and tables, data from multiple MySQL tables can be written into a single ClickHouse table. Feedback and corrections are welcome.

Let's first look at the requirements:

  1. Synchronize data from multiple MySQL instances to ClickHouse in real time; the daily volume is about 500 GB with billion-level record counts, and a synchronization delay of minutes is acceptable;

  2. Some tables are sharded across databases and tables, and users need to synchronize tables that span MySQL instances and databases into a single ClickHouse table;

  3. The existing open-source MySQL binlog component (Canal) cannot, by itself, map multiple source tables to one destination table.

The basic principle

Option 1: Synchronization via JDBC

  1. Use the Canal components to parse the binlog and synchronize the data;

  2. The Canal-Server process masquerades as a MySQL slave and uses the MySQL binlog replication protocol to receive data changes;

  3. The Canal-Adapter process fetches the parsed binlog from canal-server and writes it to ClickHouse through the JDBC interface;


Advantages:

  1. Natively supported by the Canal components;

Disadvantages:

  1. Canal-Adapter maps source tables to destination tables one to one when writing, which lacks flexibility;

  2. Two Canal component processes need to be maintained;

Option 2: Synchronization via Kafka + ClickHouse materialized views

  1. Canal-Server parses the binlog and writes the parsed JSON to Kafka;

  2. Canal-Server can filter by database and table name with regular expressions, and write to Kafka topics according to the configured rules;

  3. ClickHouse uses the Kafka engine and materialized views to consume the messages and write them into a local table;


Advantages:

  1. Kafka supports horizontal scaling, and the number of partitions can be adjusted according to the data volume;

  2. Introducing Kafka batches the write requests, which prevents ClickHouse from generating a large number of small files that would hurt query performance;

  3. Canal-Server supports rule-based filtering, so you can flexibly configure the database and table names of the upstream MySQL instances and specify which Kafka topic to write to;

Disadvantages:

  1. Kafka and the routing rules need to be maintained;

  2. The related views, Kafka engine tables, and so on need to be created in ClickHouse;

Specific steps

I. Preparation

  1. If you use TencentDB (Tencent Cloud MySQL), confirm in the console that binlog_format is ROW; if so, nothing else needs to be done.


If it is a self-built MySQL, query the variables in the client:

>   show variables like '%binlog%';

+-----------------------------------------+----------------------+

| Variable_name                           | Value                |

+-----------------------------------------+----------------------+

| binlog_format                           | ROW                  |

+-----------------------------------------+----------------------+

 

> show variables like '%log_bin%';

+---------------------------------+--------------------------------------------+

| Variable_name                   | Value                                      |

+---------------------------------+--------------------------------------------+

| log_bin                         | ON                                         |

| log_bin_basename                |  /data/mysql_root/log/20146/mysql-bin        |

| log_bin_index                   |  /data/mysql_root/log/20146/mysql-bin.index |

+---------------------------------+--------------------------------------------+
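If binlog_format is not ROW, or the binary log is not enabled, a minimal my.cnf sketch for a self-built MySQL looks like the following (server-id and the log file name are placeholders; a restart is required after changing them):

[mysqld]
server-id      = 1            # any value that is unique within the replication topology
log_bin        = mysql-bin    # enable the binary log
binlog_format  = ROW          # row-based format is required by Canal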

  2. Create the account canal, which will be used to subscribe to the binlog:

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;
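You can verify the account afterwards:

SHOW GRANTS FOR 'canal'@'%';
-- expected: GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'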

II. Canal component deployment

Prerequisites:

The machine where the Canal components are deployed must be able to communicate with both the ClickHouse service and MySQL;

Java 8 must be installed on the machine, with the JAVA_HOME, PATH, and other environment variables configured;

Basic concepts:

[Figure: basic concepts of the Canal components]

1. Canal-Server deployment

The main job of Canal-Server is to subscribe to the binlog and parse it, and to define the instance-related information. It is recommended that each Canal-Server process corresponds to one MySQL instance;

1) Download canal.deployer-1.1.4.tar.gz and extract it.
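A minimal download-and-extract sketch, assuming the standard GitHub release URL for Canal 1.1.4:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
mkdir -p deployer
tar -xzf canal.deployer-1.1.4.tar.gz -C deployer
cd deployer   # bin/, conf/, lib/, logs/ live here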

2) Modify the configuration file conf/canal.properties; the settings that need attention are as follows:

...

# Port settings; if multiple processes are deployed on the same machine, these need to be changed

canal.port = 11111

canal.metrics.pull.port = 11112

canal.admin.port = 11110

...

#  Service mode 

canal.serverMode = tcp

...

# Kafka Address 

canal.mq.servers = 172.21.48.11:9092

# When using a message queue, these two values must be true

canal.mq.flatMessage = true

canal.mq.flatMessage.onlyData = true

...

# Instance list; a directory with the same name must exist under conf

canal.destinations = example,example2

3) Configure the instances

You can add a new instance by copying the example instance; the main change is the conf/${instance_name}/instance.properties file.

Example 1: Synchronize the tables in one database whose names start with a given prefix

Subscribe to data changes in the tables whose names start with tb_ (for example tb_20200801, tb_20200802) in the testdb database of the MySQL instance at 172.21.48.35. The main steps are as follows:

Step 1: Create the example2 instance: cd deployer/conf && cp -r example example2

Step 2: Modify the deployer/conf/example2/instance.properties file

...

# Address of the upstream MySQL instance

canal.instance.master.address=172.21.48.35:3306

...

# Account used for synchronization

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

...

# Filter by database name and table name

canal.instance.filter.regex=testdb\\.tb_.*,

Step 3: Modify canal.destinations in conf/canal.properties and add example2

Example 2: Synchronize tables with a given prefix across multiple databases, and output to Kafka

Subscribe to the employees_20200801 table in the empdb_0 database and the employees_20200802 table in the empdb_1 database of the MySQL instance at 172.21.48.35, and write the data to Kafka.

Step 1: Create the example3 instance: cd deployer/conf && cp -r example example3

Step 2: Modify the deployer/conf/example3/instance.properties file

...

# Address of the upstream MySQL instance

canal.instance.master.address=172.21.48.35:3306

...

# Account used for synchronization

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

...

# Filter by database name and table name

canal.instance.filter.regex=empdb_.*\\.employees_.*

...

# Kafka topic name and matching rules

canal.mq.dynamicTopic=employees_topic:empdb_.*\\.employees_.*

canal.mq.partition=0

 

# Number of partitions of the Kafka topic

canal.mq.partitionsNum=3

 

# Hash rows of the employees_* tables by the emp_no field so that they are spread across different partitions

canal.mq.partitionHash=empdb_.*\\.employees_.*:emp_no

Step 3: Create a new topic employees_topic in Kafka, with the number of partitions set to 3

Step 4: Modify canal.destinations in conf/canal.properties and add example3; change the service mode to kafka and configure the Kafka-related settings;

#  Service mode 

canal.serverMode = kafka

...

# Kafka Address 

canal.mq.servers = 172.21.48.11:9092

# When using a message queue, these two values must be true

canal.mq.flatMessage = true

canal.mq.flatMessage.onlyData = true

...

# Instance list; a directory with the same name must exist under conf

canal.destinations =  example,example2,example3

2. Canal-Adapter deployment (Option 1 only)

The main job of Canal-Adapter is to write data to ClickHouse through the JDBC interface; writes for multiple tables can be configured.

1) Download canal.adapter-1.1.4.tar.gz and extract it;

2) Add the ClickHouse driver jar and the httpclient jars to the lib directory: httpcore-4.4.13.jar, httpclient-4.3.3.jar, clickhouse-jdbc-0.2.4.jar;

3) Modify the configuration file conf/application.yml, adjusting the canalServerHost, srcDataSources, and canalAdapters settings;

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp
  canalServerHost: 127.0.0.1:11111   # address of the canal-server service
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  # MySQL configuration; modify the user name, password and database
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://172.21.48.35:3306
      username: root
      password: yourpasswordhere
  canalAdapters:
  - instance: example
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: rdb
        key: mysql1
        # ClickHouse configuration; modify the user name, password and database
        properties:
          jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver
          jdbc.url: jdbc:clickhouse://172.21.48.18:8123
          jdbc.username: default
          jdbc.password:

4) Modify the conf/rdb/mytest_user.yml configuration file

dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: testdb
  mirrorDb: true

In the configuration above, because mirrorDb: true is enabled, a database with the same name (testdb) must exist on the ClickHouse side.
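For example, the testdb database has to exist in ClickHouse before synchronization starts (a minimal sketch; add ON CLUSTER if you run a cluster):

CREATE DATABASE IF NOT EXISTS testdb;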

Example 1: The source and destination database names differ, and the source and destination table names differ

Modify the adapter's conf/rdb/mytest_user.yml configuration file and specify the source and destination databases and tables:

dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: source_database_name
  table: source_table
  targetTable: destination_database_name.destination_table
  targetColumns:
    id:
    name:
  commitBatch: 3000 # size of each batch commit


Example 2: Multiple source tables are written to the same destination table

Configure multiple yml files in the conf/rdb directory, each pointing at a different source table name, as shown in the sketch below.
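A minimal sketch of what two such files could look like, reusing the sharded employees tables from the requirements; the file names, the ck_db.employees_all destination table and the column list are illustrative and follow the rdb-adapter format shown above:

# conf/rdb/employees_20200801.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: empdb_0
  table: employees_20200801
  targetTable: ck_db.employees_all   # both files point at the same destination table
  targetColumns:
    emp_no:
    birth_date:
    first_name:
    last_name:
    gender:
    hire_date:
  commitBatch: 3000

# conf/rdb/employees_20200802.yml -- identical except for the source database and table
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: empdb_1
  table: employees_20200802
  targetTable: ck_db.employees_all
  targetColumns:
    emp_no:
    birth_date:
    first_name:
    last_name:
    gender:
    hire_date:
  commitBatch: 3000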

Kafka Service configuration

I. Tune the producer parameters appropriately

Check the producer-related parameters in the canal.properties file of Canal-Server; the important ones are shown below.

[Figure: important Kafka producer parameters in canal.properties]

II. Create the related topics

Create the topics according to the instance.properties configuration of each Canal-Server instance; note that the number of partitions must match canal.mq.partitionsNum.

The number of partitions needs to take the following factors into account:

  1. The write volume of the upstream MySQL. In principle, the more data is written, the more partitions should be allocated;

  2. The number of downstream ClickHouse instances. The total number of partitions of the topic should preferably be no less than the number of downstream ClickHouse instances, so that every ClickHouse instance can be assigned at least one partition;

ClickHouse Service configuration

Create the data tables according to the schema of the upstream MySQL tables;

After introducing Kafka, you also need to create external tables with Engine = Kafka and the related materialized views;

Suggestions:

  1. Use a distinct kafka_group_name for each pipeline to prevent them from interfering with each other;

  2. Set kafka_skip_broken_messages to a reasonable value so that messages that cannot be parsed are skipped;

  3. Set kafka_num_consumers to a reasonable value; it is best to ensure that the sum of this value across all ClickHouse instances is not less than the number of partitions of the topic;

Create the related distributed table for queries;

Service startup

Start the related Canal component processes:

  1. canal-server: sh bin/startup.sh

  2. canal-adapter: sh bin/startup.sh

Insert data into MySQL and check that the logs look normal;
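For example, by tailing the default log files of the 1.1.4 packages (paths are relative to each component's installation directory; adjust them if you changed the defaults):

# canal-server: overall log plus the per-instance log
tail -f logs/canal/canal.log
tail -f logs/example/example.log

# canal-adapter (Option 1 only)
tail -f logs/adapter/adapter.log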

If Kafka is used, you can observe the parsed binlog data with the kafka-console-consumer.sh script;
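For example, using the broker and topic from the actual case below (for very old Kafka versions, --zookeeper <zk_addr> replaces --bootstrap-server):

kafka-console-consumer.sh --bootstrap-server 172.21.48.11:9092 \
  --topic employees_topic --from-beginning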

Check whether the data is written into the ClickHouse data tables normally.

Actual case

Requirement: Synchronize the empdb_0.employees_20200801 and empdb_1.employees_20200802 tables of a MySQL instance in real time

Solution: Use Option 2

Environment and parameters:

MySQL address: 172.21.48.35:3306
CKafka address: 172.21.48.11:9092
Canal instance name: employees
Kafka destination topic: employees_topic

1. Create the related tables in MySQL

# MySQL table creation statements

CREATE DATABASE `empdb_0`;

CREATE DATABASE `empdb_1`;

 

CREATE TABLE  `empdb_0`.`employees_20200801` (

   `emp_no` int(11) NOT NULL,

   `birth_date` date NOT NULL,

   `first_name` varchar(14) NOT NULL,

   `last_name` varchar(16) NOT NULL,

   `gender` enum('M','F') NOT NULL,

   `hire_date` date NOT NULL,

   PRIMARY KEY (`emp_no`)

);

 

CREATE TABLE  `empdb_1`.`employees_20200802` (

   `emp_no` int(11) NOT NULL,

   `birth_date` date NOT NULL,

   `first_name` varchar(14) NOT NULL,

   `last_name` varchar(16) NOT NULL,

   `gender` enum('M','F') NOT NULL,

   `hire_date` date NOT NULL,

   PRIMARY KEY (`emp_no`)

);

2. Canal-Server configuration

Step 1. Modify the conf/canal.properties file

canal.serverMode = kafka

...

canal.destinations = example,employees

...

canal.mq.servers = 172.21.48.11:9092

canal.mq.retries = 0

canal.mq.batchSize = 16384

canal.mq.maxRequestSize = 1048576

canal.mq.lingerMs = 100

canal.mq.bufferMemory = 33554432

canal.mq.canalBatchSize = 50

canal.mq.canalGetTimeout = 100

canal.mq.flatMessage = true

canal.mq.flatMessage.onlyData = true

canal.mq.compressionType = none

canal.mq.acks = all

canal.mq.producerGroup = cdbproducer

canal.mq.accessChannel = local

...

Step 2. Add the employees instance and modify the employees/instance.properties configuration

...

canal.instance.master.address=172.21.48.35:3306

...

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

...

canal.instance.filter.regex=empdb_.*\\.employees_.*

...

canal.mq.dynamicTopic=employees_topic:empdb_.*\\.employees_.*

canal.mq.partition=0

canal.mq.partitionsNum=3

canal.mq.partitionHash=empdb_.*\\.employees_.*:emp_no

3. Kafka configuration

Create a new topic employees_topic with the number of partitions set to 3.
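A minimal sketch with the standard Kafka CLI (the replication factor is only an example; older Kafka versions use --zookeeper instead of --bootstrap-server):

kafka-topics.sh --create --bootstrap-server 172.21.48.11:9092 \
  --topic employees_topic --partitions 3 --replication-factor 2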

4. Create the ClickHouse tables

CREATE DATABASE testckdb ON CLUSTER  default_cluster;

 

CREATE TABLE IF NOT EXISTS  testckdb.ck_employees ON CLUSTER default_cluster (

   `emp_no` Int32,

   `birth_date` String,

   `first_name` String,

   `last_name` String,

   `gender` String,

   `hire_date` String

) ENGINE=MergeTree() ORDER BY (emp_no)

SETTINGS index_granularity = 8192;

 

 

CREATE TABLE IF NOT EXISTS  testckdb.ck_employees_stream ON CLUSTER default_cluster (

   `emp_no` Int32,

   `birth_date` String,

   `first_name` String,

   `last_name` String,

   `gender` String,

   `hire_date` String

) ENGINE = Kafka()

SETTINGS

   kafka_broker_list = '172.21.48.11:9092',

   kafka_topic_list = 'employees_topic',

   kafka_group_name = 'employees_group',

   kafka_format = 'JSONEachRow',

   kafka_skip_broken_messages = 1024,

  kafka_num_consumers  = 1;

 

 

CREATE MATERIALIZED VIEW IF NOT EXISTS  testckdb.ck_employees_mv ON CLUSTER default_cluster TO testckdb.ck_employees(

   `emp_no` Int32,

   `birth_date` String,

   `first_name` String,

   `last_name` String,

   `gender` String,

  `hire_date`  String

) AS SELECT

   `emp_no`,

   `birth_date`,

   `first_name`,

   `last_name`,

   `gender`,

   `hire_date`

FROM

   testckdb.ck_employees_stream;

 

CREATE TABLE IF NOT EXISTS  testckdb.ck_employees_dis ON CLUSTER default_cluster AS testckdb.ck_employees  

ENGINE=Distributed(default_cluster,  testckdb, ck_employees);

5. Start the Canal-Server service

Insert data into the upstream MySQL instance, and check that Canal-Server parses it normally and that the data is synchronized into ClickHouse.
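A minimal end-to-end check, assuming the tables created above (the row values are made up):

-- on the MySQL side
INSERT INTO empdb_0.employees_20200801
  (emp_no, birth_date, first_name, last_name, gender, hire_date)
VALUES (10001, '1990-01-01', 'San', 'Zhang', 'M', '2020-08-01');

-- on the ClickHouse side, the row should appear within seconds
SELECT count() FROM testckdb.ck_employees_dis;
SELECT * FROM testckdb.ck_employees_dis WHERE emp_no = 10001;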


Copyright notice
This article was created by [Tencent cloud database]. Please include a link to the original when reposting. Thanks.
