Confluent MYSQL to Stream

Steps to follow to connect MySql to Stream

1. Download mysql jdbc connector from https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java-5.1.39.zip and extract it. 2. From the extracted directory, copy the mysql-connector-java.jar file share/java/kafka-connect-jdbc directory. 3. In the etc/kafka-connect-jdbc directory, create a file called mysql-connect-source.properties and add the following content. Confluent is in Linux, mysql is available in windows. Proving host details of mysql server instead of localhost
name=mysql-source-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10

connection.url=jdbc:mysql://111.24.0.5:3306/trainingdb?user=root&password=abc@12345
table.whitelist=employee

mode=incrementing

incrementing.column.name=emp_id

topic.prefix=mysql-etl-topic-
4. Run the connect using the following command
Go to confluent folder and run
bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/mysql-connect-source.properties.
Above command is reading data from mysql sending to topic=mysql-etl-topic-employee.Just check the data in topic
./kafka-avro-console-consumer --topic mysql-etl-topic-employee --bootstrap-server localhost:9092 --from-beginning

Keeps inserting data in mysql in windows, You can see values coming to topic.
mysql> insert into employee(emp_name,designation) values("SK","Project Manager");

mysql> insert into employee(emp_name,designation) values("Suresh","Accountant");


Small intro on KTable:

KTable contains aggregated data of KStreams.
KStreams Example Data:
key data
101 name:Arjun,color:Blue
105 name:Suresh,color:Green
102 name:Arun,color:Yellow
101 name:Arjun,color:Orange
102 name:Arun,color:Red

KTable data for the above KStreams will be: no duplicates. based on key , duplicates are removed and latest data will be available
key data
105 name:Suresh,color:Green
101 name:Arjun,color:Orange
102 name:Arun,color:Red

KStreams vs KTable

In KStreams, all operations are inserts and in KTable, it is upsert(update/insert).
KStream is immutable and KTable is mutable.
KTable contains only the latest data.
KStream is stateless and KTable is stateful.
In Windows Will right streaming api which will read the topic send to other topic(aggregate-topic-employee) which is connected to a stream.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import com.boa.training.domain.Employee;
import com.boa.training.serde.EmployeeSerde;

public class MysqlStreamGroupByTest {
	public static void main(String[] args) {
		
		Properties props=new Properties();
		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mysql-aggregation-app");
		props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class.getName());
		props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, EmployeeSerde.class.getName());
		
		
		KafkaStreams streams=new KafkaStreams(createTopology(), props);
		streams.start();
		try {
			Thread.sleep(10*60*1000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		streams.close();
	}
	
	
	static Topology createTopology()
	{
		String inputTopic="mysql-etl-topic-employee";
		String outputTopic="aggregate-topic-employee";
		StreamsBuilder builder=new StreamsBuilder();
		KStream inputStream=builder.stream(inputTopic);
		KStream employeeDataWithKeyStream=inputStream.selectKey(
				(key,value)->{
				System.out.println("processing employee with id "+value.getEmp_id());	
				return value.getDesignation();
				});
		KGroupedStream groupedStream=employeeDataWithKeyStream.groupByKey();
		KTable countTable=groupedStream.count();
		countTable.toStream().to(outputTopic,Produced.with(Serdes.String(), Serdes.Long()));
		return builder.build();
	}

}
#Start the Zookeeper
zookeeper-server-start c:\kafka_2.12-2.5.0\config\zookeeper.properties

#Start Kafka server
kafka-server-start c:\kafka_2.12-2.5.0\config\server.properties
Below is mysql-aggregate-source.properties place it in c:\kafka_2.12-2.5.0\config
name=mysql-source-aggregate-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10

connection.url=jdbc:mysql://localhost:3306/trainingdb?user=root&password=abc@12345
table.whitelist=employee

mode=incrementing

incrementing.column.name=emp_id

topic.prefix=mysql-ktable-topic-
Run below command
connect-standalone c:\kafka_2.12-2.5.0\config\connect-standalone.properties c:\kafka_2.12-2.5.0\config\mysql-aggregate-source.properties
Check for the topic now data is available in aggregate-topic-employee(topic)
kafka-console-consumer --topic aggregate-topic-employee --bootstrap-server localhost:9092 --from-beginning --value-deserializer org.apache.kafka.common.serialization.LongDeserializer --property print.key=true
ksql-joins