Confluent Schema Registry with Avro Example

Confluent Schema Registry provides a serving layer for your metadata. It exposes a RESTful interface for storing and retrieving Avro, JSON Schema, and Protobuf schemas. It keeps a versioned history of all schemas based on a specified subject name strategy and schema format, provides multiple compatibility settings, and allows schemas to evolve according to the configured compatibility setting. It also provides serializers that plug into Apache Kafka clients and handle schema storage and retrieval for Kafka messages sent in any of the supported formats.

Schema Registry lives outside of the Kafka brokers. Producers and consumers still talk to Kafka to publish and read data (messages) on topics. In parallel, they talk to Schema Registry to send and retrieve the schemas that describe the data models for those messages.
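Besides the serializers, applications can talk to Schema Registry directly from Java. Below is a minimal sketch using CachedSchemaRegistryClient (pulled in transitively by kafka-avro-serializer); the subject name demo-value and the inline schema are placeholders for illustration only:

package com.confluent.avro;

import org.apache.avro.Schema;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;

public class RegistryRoundTrip {

	public static void main(String[] args) throws Exception {
		// 100 is the number of schema objects the client caches locally
		CachedSchemaRegistryClient client =
				new CachedSchemaRegistryClient("http://localhost:8081", 100);

		// Send a schema: register it under a subject (demo-value is a placeholder)
		Schema schema = new Schema.Parser().parse(
				"{\"type\":\"record\",\"name\":\"demo\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
		int id = client.register("demo-value", schema);
		System.out.println("registered with global id " + id);

		// Retrieve it back: the latest version stored under that subject
		SchemaMetadata latest = client.getLatestSchemaMetadata("demo-value");
		System.out.println("version " + latest.getVersion() + ": " + latest.getSchema());
	}
}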

Here is an example.
Create a Java project, convert it to a Maven project, and add the dependencies below:
<repositories>
	<repository>
		<id>avroserializer</id>
		<url>https://maven.repository.redhat.com/earlyaccess/all/</url>
	</repository>
</repositories>
<dependencies>
	<!-- https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer -->
	<dependency>
		<groupId>io.confluent</groupId>
		<artifactId>kafka-avro-serializer</artifactId>
		<version>5.3.0</version>
	</dependency>
</dependencies>
customer.avsc (Avro schema): place it in the src folder.
When a message is sent to Kafka in Avro format, it is built as a GenericRecord. For each topic there is a subject in Schema Registry (with the default TopicNameStrategy the value subject is named <topic>-value), and each subject can hold multiple schema versions. See the sketch after the schema for a quick way to inspect what gets registered.
{
  "namespace": "com.test.training",
  "type": "record",
  "name": "customer",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"}
  ]
}
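A quick way to sanity-check the schema file before producing (SchemaInspect is just a hypothetical helper, not part of the example):

package com.confluent.avro;

import java.io.File;

import org.apache.avro.Schema;

public class SchemaInspect {

	public static void main(String[] args) throws Exception {
		// Parse customer.avsc exactly as the sender will
		Schema schema = new Schema.Parser().parse(new File("customer.avsc"));
		System.out.println(schema.getFullName()); // com.test.training.customer
		System.out.println(schema.getFields());   // the fields declared above: id, name, email
		// With the default TopicNameStrategy the serializer registers this schema
		// under the subject "<topic>-value", i.e. mh-topic-value for topic mh-topic.
	}
}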
AvroSenderValidation.java: place it in the src folder.
package com.confluent.avro;

import java.io.File;
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class AvroSenderValidation {

	public static void main(String[] args) throws IOException {

		Properties props = new Properties();
		// Providing bootstrap server host and port
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
		// Schema Registry URL used by KafkaAvroSerializer to register and look up schemas
		props.put("schema.registry.url", "http://localhost:8081");
		KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
		
		// Parse the Avro schema file placed in the src folder
		Parser parser = new Parser();
		Schema schema = parser.parse(new File("customer.avsc"));
		// Build the record; every non-nullable field declared in the schema must be set
		GenericRecord genericRecord = new GenericData.Record(schema);
		genericRecord.put("id", 1001);
		genericRecord.put("name", "Mahira S");
		genericRecord.put("email", "mahira.s@gmail.com");
		// mh-topic is the topic; the value schema is registered under the subject mh-topic-value
		// "rec-1" is the key (StringSerializer)
		// genericRecord is the value (KafkaAvroSerializer)
		ProducerRecord<String, GenericRecord> record =
				new ProducerRecord<>("mh-topic", "rec-1", genericRecord);
		producer.send(record);
		producer.close();
		System.out.println("message sent");
	}
}
Avro workspace structure (screenshot)
Check whether the Confluent services are up with:
./confluent status
If they are up, the output looks like this:
[kafka@localhost bin]$ ./confluent status
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html

control-center is [UP]
ksql-server is [UP]
connect is [UP]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]

If the services are not up, start Confluent with:
./confluent start
Run AvroSenderValidation.java; once the message has been sent to the topic you will see the "message sent" output.

Avro sender console output (screenshot)

Run the commands below to see the responses; the first transcript shows what ./confluent start reports when the services are down.

zookeeper is [DOWN]
Cannot start Kafka, Zookeeper is not running. Check your deployment
[kafka@localhost bin]$ ./confluent start
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html


Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
|Schema Registry failed to start
schema-registry is [DOWN]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Cannot start ksql-server, Kafka Server or Schema Registry Server is not running. Check your deployment
[kafka@localhost bin]$ ./confluent start schema-registry
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html


zookeeper is already running. Try restarting if needed
kafka is already running. Try restarting if needed
Starting schema-registry
schema-registry is [UP]
 
[kafka@localhost bin]$ ./kafka-console-consumer --topic mh-topic --bootstrap-server localhost:9092 --from-beginning
[2021-09-03 09:18:17,596] WARN [Consumer clientId=consumer-1, groupId=console-consumer-95322] Error while fetching metadata with correlation id 2 : {mh-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
^CProcessed a total of 0 messages
[kafka@localhost bin]$ ./kafka-console-consumer --topic mh-topic --bootstrap-server localhost:9092 --from-beginning
$mahira.s@gmail.com
^CProcessed a total of 1 messages
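The plain console consumer prints mangled output because the value is not plain text: KafkaAvroSerializer writes a magic byte, a 4-byte schema id and then the Avro-encoded payload. A small sketch of how the schema id could be pulled out of such a raw value (assuming the bytes were consumed with a ByteArrayDeserializer):

import java.nio.ByteBuffer;

public class WireFormatPeek {

	// Confluent wire format: [magic byte 0][4-byte schema id][Avro binary payload]
	static int schemaIdOf(byte[] value) {
		ByteBuffer buf = ByteBuffer.wrap(value);
		if (buf.get() != 0) {
			throw new IllegalArgumentException("not Confluent Avro wire format");
		}
		return buf.getInt(); // the global schema id, 21 for this message
	}
}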

[kafka@localhost bin]$ ./kafka-avro-console-consumer --topic mh-topic --bootstrap-server localhost:9092 --from-beginning
{"id":1001,"email":"mahira.s@gmail.com"}
^CProcessed a total of 1 messages
 
[kafka@localhost bin]$ ./kafka-avro-console-consumer --topic mh-topic --bootstrap-server localhost:9092 --from-beginning  --property print.schema.ids=true --property schema.id.separator=:
{"id":1001,"email":"mahira.s@gmail.com"}:21
^CProcessed a total of 1 messages
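What kafka-avro-console-consumer does can also be done from Java with KafkaAvroDeserializer; a minimal consumer sketch (the group id avro-receiver-group is just a placeholder):

package com.confluent.avro;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;

public class AvroReceiver {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-receiver-group"); // placeholder group id
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");   // same effect as --from-beginning
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
		// The deserializer fetches the writer schema from Schema Registry by its id
		props.put("schema.registry.url", "http://localhost:8081");

		try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
			consumer.subscribe(Collections.singletonList("mh-topic"));
			ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(10));
			for (ConsumerRecord<String, GenericRecord> record : records) {
				System.out.println(record.key() + " -> " + record.value());
			}
		}
	}
}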

[kafka@localhost bin]$ curl -X GET http://localhost:8081/subjects
["mysql-etl-topic-emp_bofa-value","test-specific-topic-value","test-topic-value","mh-topic-value","test-topic-1-value"][kafka@localhost bin]$ 
 
[kafka@localhost bin]$ curl -X GET http://localhost:8081/subjects/mh-topic-value/versions
[1][kafka@localhost bin]$ curl -X GET http://localhost:8081/subjects/mh-topic-value/versions/1
{"subject":"mh-topic-value","version":1,"id":21,"schema":"{\"type\":\"record\",\"name\":\"customer1\",\"namespace\":\"com.test.training1\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"email\",\"type\":\"string\"}]}"}[kafka@localhost bin]$ 

[kafka@localhost bin]$ curl -X GET http://localhost:8081/schemas/ids/21
{"schema":"{\"type\":\"record\",\"name\":\"customer1\",\"namespace\":\"com.test.training1\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"email\",\"type\":\"string\"}]}"}[kafka@localhost bin]$ 

[kafka@localhost bin]$ curl -X GET http://localhost:8081/subjects/mh-topic-value/versions/1
{"subject":"mh-topic-value","version":1,"id":21,"schema":"{\"type\":\"record\",\"name\":\"customer1\",\"namespace\":\"com.test.training1\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"email\",\"type\":\"string\"}]}"}[kafka@localhost bin]$ curl -X GET http://localhost:8081/subjects/mh-topic-value/versions/21
{"error_code":40402,"message":"Version not found."}[kafka@localhost bin]$