Sunday, 17 May 2020

MySQL CDC with Apache Kafka, Debezium and Talend

Hi,

In this blog post I would like to explain how to capture change data from a MySQL database using Kafka, the Debezium plugin, and Talend.

Flow Diagram



The Debezium MySQL connector connects to the MySQL server binlog, captures the changes made to the tables, and produces those changes to Kafka topics.
We can then implement a Talend job to consume the changes and sync them with the target databases.
Now let's look at the configuration.

Install the Debezium MySQL Connector on Windows.
 Here I am doing all the configuration on Windows. It is very simple; please follow the steps below.

1. Download and install Kafka and ZooKeeper. Select the binary download method.
https://kafka.apache.org/downloads
Here I have used the kafka_2.11-2.1.0 package (Kafka 2.1.0 built for Scala 2.11). This folder contains ZooKeeper as well, so there is no need to install ZooKeeper separately.

2. Download the Debezium MySQL connector plugin and extract it into a folder.


3. Download and install MySQL Server. I already have MySQL 8 installed on my system.

4. Start the ZooKeeper server.
       Open the command prompt, move to the Kafka folder, and execute the command below. The default ZooKeeper port is 2181.
 C:\kafka_2.11-2.1.0>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
Check the ZooKeeper server status in the console output to confirm that it has started.

5. Start the Kafka server.
       Open another command prompt, move to the Kafka folder, and execute the command below. The default Kafka port is 9092.
 C:\kafka_2.11-2.1.0>.\bin\windows\kafka-server-start.bat .\config\server.properties
Check the Kafka server status in the console output to confirm that it has started.

6. Copy all the Debezium connector jar files to the Kafka libs folder.






















7. Create a properties file in the Kafka config folder with the Debezium connector configuration parameters (the start command in step 8 expects it at .\config\connect-Mysql.properties). The Debezium connector needs the information required to connect to the MySQL database, such as host, port, username, database, and topic name. There are other parameters as well; for those, refer to the Kafka Connect and Debezium documentation.
For now, the basic connection properties are enough to connect to MySQL.
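
The properties file itself was a screenshot in the original post, so here is only a rough sketch of what such a file might contain, written as a small Python script that prints it. The property names are the ones I know from the Debezium 1.x MySQL connector (later releases rename some of them, for example database.whitelist). The user, password, server id, and history topic values are placeholders I made up, not the settings used in this post.

```python
# Hypothetical values: adjust host, credentials and database to your setup.
CONNECTOR_PROPS = {
    "name": "mysql-connector",             # connector name used in the status URL
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium_user",      # placeholder
    "database.password": "change-me",      # placeholder
    "database.server.id": "184054",        # placeholder, must be unique among MySQL replicas
    "database.server.name": "jilani-PC",   # becomes the topic name prefix
    "database.whitelist": "test",          # database(s) to capture
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.test",  # placeholder topic
}


def render_properties(props):
    """Render a dict as Java-style key=value lines."""
    return "\n".join(f"{key}={value}" for key, value in props.items()) + "\n"


print(render_properties(CONNECTOR_PROPS), end="")
```

Saving the printed lines as .\config\connect-Mysql.properties gives a file with the name that the Kafka Connect command in step 8 expects.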

8. Start Kafka Connect.
    Open the command prompt, move to the Kafka folder, and execute the command below. Kafka Connect exposes a REST interface on port 8083, which we can use to check the connector configuration and status.
C:\kafka_2.11-2.1.0>.\bin\windows\connect-standalone.bat .\config\connect-standalone.properties .\config\connect-Mysql.properties

If the connector configuration is successful, on its first run the Debezium connector takes a snapshot of the specified database and creates the topics in Kafka. The topic names are created in the format below.
<database.server.name>.<database name>.<table name>
Ex: in my case "jilani-PC.test.customer".
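
To make the naming rule concrete, here is a tiny sketch that builds a topic name from its three parts and splits it back again. The function names are my own, chosen only for illustration.

```python
def debezium_topic(server_name, database, table):
    """Build the topic name for one captured table: server.database.table."""
    return ".".join([server_name, database, table])


def split_topic(topic):
    """Split a topic name back into (server_name, database, table)."""
    # Split from the right so a server name containing dots stays in one piece.
    server_name, database, table = topic.rsplit(".", 2)
    return server_name, database, table


print(debezium_topic("jilani-PC", "test", "customer"))  # jilani-PC.test.customer
```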


9. Check the connector status by using http://localhost:8083/connectors/mysql-connector/status
     The response should look like this:
   {"name":"mysql-connector","connector":{"state":"RUNNING","worker_id":"192.168.56.1:8083"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"192.168.56.1:8083"}],"type":"source"}

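If you prefer to check the status from a script instead of the browser, a minimal sketch could look like the one below. It assumes Kafka Connect is reachable on localhost:8083 and that the connector is named mysql-connector, as in step 9. The helper names are my own.

```python
import json
from urllib.request import urlopen

STATUS_URL = "http://localhost:8083/connectors/mysql-connector/status"


def is_healthy(status):
    """True when the connector and every one of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    tasks_ok = bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)
    return connector_ok and tasks_ok


def fetch_status(url=STATUS_URL):
    """Fetch and decode the status document from the Kafka Connect REST API."""
    with urlopen(url, timeout=5) as response:
        return json.load(response)
```

With Kafka Connect running, is_healthy(fetch_status()) should return True for a response like the one shown in step 9.
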
10. Check the list of topics created.

C:\kafka_2.11-2.1.0>.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --list

11. Test the scenarios.
Consume any one of the topics listed above. I would like to make changes to the customer table, so I will consume the messages from "jilani-PC.test.customer".
Run the Kafka console consumer on that topic to watch the messages.

C:\kafka_2.11-2.1.0\bin\windows>kafka-console-consumer.bat --bootstrap-server "localhost:9092" --topic "jilani-PC.test.customer" --from-beginning

When we change the customer table, the changes need to be synced to the customer_sync table.
Check the customer and customer_sync tables; both are empty.









Create a Talend job to consume the messages from Kafka and insert, update, or delete the data in the customer_sync table.
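
The Talend job was shown as screenshots, so as a rough illustration of the decision it has to make, here is a sketch in Python (not Talend) that turns one Debezium change event into an action and a row. It assumes the JSON envelope Debezium normally produces, with "op", "before" and "after" fields, optionally wrapped in a "payload". The column names in the sample message are invented.

```python
import json

# Debezium "op" codes: c = create, r = snapshot read, u = update, d = delete.
ACTIONS = {"c": "insert", "r": "insert", "u": "update", "d": "delete"}


def decode_change(message):
    """Turn one Kafka message value into (action, row), or None to skip it."""
    if not message:                        # tombstone records have an empty value
        return None
    event = json.loads(message)
    if event is None:                      # a literal JSON null is also a tombstone
        return None
    payload = event.get("payload", event)  # envelope may be wrapped with a schema
    action = ACTIONS.get(payload.get("op"))
    if action is None:
        return None
    # Deletes carry the old row in "before"; everything else uses "after".
    row = payload["before"] if action == "delete" else payload["after"]
    return action, row


# Hypothetical message with made-up columns "id" and "name".
sample = '{"payload": {"op": "c", "before": null, "after": {"id": 1, "name": "Alice"}}}'
print(decode_change(sample))  # ('insert', {'id': 1, 'name': 'Alice'})
```

Treating snapshot reads ("r") as inserts is my own choice here, so that rows already present in the table at snapshot time also reach the target.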















Insert Record:

Let's insert a record into the customer table; the same record should be inserted into the customer_sync table.


Update Record:
Let's update a record in the customer table; the same record should be updated in the customer_sync table.

Delete Record:
Let's delete a record from the customer table; the same record should be logically deleted (its flag updated to 'd') in the customer_sync table.
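
The three scenarios above can be tried out without MySQL or Talend using the sketch below, which applies an insert, an update, and a logical delete to an in-memory SQLite table. The columns (id, name, flag) and the 'i' and 'u' flag values are my assumptions; only the 'd' flag comes from the description above.

```python
import sqlite3


def apply_change(conn, action, row):
    """Apply one change ("insert", "update" or "delete") to customer_sync."""
    if action == "insert":
        conn.execute(
            "INSERT INTO customer_sync (id, name, flag) VALUES (?, ?, 'i')",
            (row["id"], row["name"]),
        )
    elif action == "update":
        conn.execute(
            "UPDATE customer_sync SET name = ?, flag = 'u' WHERE id = ?",
            (row["name"], row["id"]),
        )
    elif action == "delete":
        # Logical delete: keep the row and only mark it with 'd'.
        conn.execute("UPDATE customer_sync SET flag = 'd' WHERE id = ?", (row["id"],))
    else:
        raise ValueError(f"unknown action: {action}")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customer_sync (id INTEGER PRIMARY KEY, name TEXT, flag TEXT)")
apply_change(conn, "insert", {"id": 1, "name": "Alice"})
apply_change(conn, "update", {"id": 1, "name": "Alicia"})
apply_change(conn, "delete", {"id": 1, "name": "Alicia"})
print(conn.execute("SELECT id, name, flag FROM customer_sync").fetchall())
# [(1, 'Alicia', 'd')]
```

The row is still present after the delete, which is the point of a logical delete: the target keeps the last known values and only the flag records that the source row is gone.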
Thanks
Jilani Syed
