Confluent Schema Registry provides a serving layer for schema metadata. It exposes a RESTful interface for storing and retrieving Avro, JSON Schema, and Protobuf schemas. It stores a versioned history of all schemas, organized by subject according to the configured subject name strategy and the schema format, and it offers several compatibility settings so that schemas of each of these types can evolve under the configured compatibility rules. It also provides serializers that plug into Apache Kafka clients and handle schema storage and retrieval for Kafka messages sent in any of the supported formats.
Schema Registry runs as a separate process, outside the Kafka brokers. Producers and consumers still talk to Kafka to publish and read data (messages) on topics. At the same time, they can talk to Schema Registry to send and retrieve the schemas that describe the data models for those messages.
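To make the subject naming concrete: with the default strategy (TopicNameStrategy), the subject is derived from the topic name plus a "-key" or "-value" suffix, which is why a topic named mh-topic ends up with a subject called mh-topic-value. The class and method names below are made up for illustration; this is a minimal sketch of the naming rule, not the library's own code.

```java
// Illustrative sketch of the default subject naming rule (TopicNameStrategy).
// SubjectNameSketch and subjectFor are hypothetical names, not Confluent APIs.
final class SubjectNameSketch {

    // Key schemas are registered under "<topic>-key", value schemas under "<topic>-value".
    static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    public static void main(String[] args) {
        System.out.println(subjectFor("mh-topic", false)); // prints mh-topic-value
        System.out.println(subjectFor("mh-topic", true));  // prints mh-topic-key
    }
}
```

Other strategies exist (for example, ones based on the record name), so the subject only follows this pattern when the default is left in place.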
Here is an example. Add the following repository and dependency to the Maven pom.xml:

    <repositories>
      <repository>
        <id>avroserializer</id>
        <url>https://maven.repository.redhat.com/earlyaccess/all/</url>
      </repository>
    </repositories>
    <dependencies>
      <!-- https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer -->
      <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>5.3.0</version>
      </dependency>
    </dependencies>

customer.avsc (Avro schema). Place it in the src folder.

    {
      "namespace": "com.test.training",
      "type": "record",
      "name": "customer",
      "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": "string"}
      ]
    }

AvroSenderValidation.java. Place it in the src folder.
    package com.confluent.avro;

    import java.io.File;
    import java.io.IOException;
    import java.util.Properties;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import io.confluent.kafka.serializers.KafkaAvroSerializer;

    public class AvroSenderValidation {

        public static void main(String[] args) throws IOException {
            // Parse the Avro schema file (the path is resolved against the working directory)
            Schema schema = new Schema.Parser().parse(new File("customer.avsc"));

            Properties props = new Properties();
            // Bootstrap server host and port
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
            // Schema Registry endpoint
            props.put("schema.registry.url", "http://localhost:8081");

            GenericRecord genericRecord = new GenericData.Record(schema);
            genericRecord.put("id", 1001);
            genericRecord.put("email", "mahira.s@gmail.com");
            // Every field without a default must be set before sending. If the schema
            // declares "name", as customer.avsc above does, either set it here as well
            // or remove it from the schema; otherwise serialization fails.

            // "mh-topic" is the topic; the value schema is registered under the subject "mh-topic-value"
            // "rec-1" is the key, serialized by StringSerializer
            // genericRecord is the value, serialized by KafkaAvroSerializer
            ProducerRecord<String, GenericRecord> record =
                    new ProducerRecord<>("mh-topic", "rec-1", genericRecord);

            // try-with-resources closes the producer, which also flushes pending sends
            try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
                producer.send(record);
            }
            System.out.println("message sent");
        }
    }
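It may help to see what KafkaAvroSerializer actually puts on the topic. In the Confluent wire format, each message starts with a magic byte (0), followed by the 4-byte schema ID in big-endian order, followed by the Avro binary payload. The schema itself is not embedded. The sketch below rebuilds such a message by hand for a two-field record (id, then email) using only the JDK. The class and method names are hypothetical, the schema ID 21 is simply an example value, and the hand-rolled Avro encoding is for illustration only; real code should rely on the Avro library and the serializer.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch of the Confluent wire format: [magic byte 0][4-byte schema ID][Avro payload].
// WireFormatSketch and its methods are hypothetical names, not part of any library.
final class WireFormatSketch {
    static final byte MAGIC_BYTE = 0x0;

    // Avro encodes int and long values as zig-zag variable-length integers.
    static void writeZigZag(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro binary encoding of a record with an int "id" followed by a string "email".
    static byte[] encodeCustomer(int id, String email) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeZigZag(out, id);
        byte[] utf8 = email.getBytes(StandardCharsets.UTF_8);
        writeZigZag(out, utf8.length); // strings are length-prefixed
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    // Prepend the magic byte and the schema ID (ByteBuffer is big-endian by default).
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(1 + 4 + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(avroPayload)
                .array();
    }

    // Read the schema ID back from a framed message.
    static int schemaIdOf(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.remaining() < 5 || buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("not a Schema Registry framed message");
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        byte[] payload = encodeCustomer(1001, "mahira.s@gmail.com");
        byte[] message = frame(21, payload);
        System.out.println(schemaIdOf(message)); // prints 21
        // Skip the two bytes of the encoded id; the string length 18 is encoded as 0x24, i.e. '$'.
        System.out.println(new String(payload, 2, payload.length - 2, StandardCharsets.ISO_8859_1));
        // prints $mahira.s@gmail.com
    }
}
```

This framing is why a consumer that does not use the Avro deserializer shows the value as a few unreadable bytes followed by the e-mail string, while an Avro-aware consumer can look up the schema by ID and decode the record.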
Check that the Confluent services are running:

    ./confluent status

If the services are up, the output looks like this:

    [kafka@localhost bin]$ ./confluent status
    This CLI is intended for development only, not for production
    https://docs.confluent.io/current/cli/index.html
    control-center is [UP]
    ksql-server is [UP]
    connect is [UP]
    kafka-rest is [UP]
    schema-registry is [UP]
    kafka is [UP]
    zookeeper is [UP]

If a service is not UP, start Confluent with:

    ./confluent start

Run AvroSenderValidation. After the message has been sent to the topic, it prints "message sent".
    zookeeper is [DOWN]
    Cannot start Kafka, Zookeeper is not running. Check your deployment
    [kafka@localhost bin]$ ./confluent start
    This CLI is intended for development only, not for production
    https://docs.confluent.io/current/cli/index.html
    Starting zookeeper
    zookeeper is [UP]
    Starting kafka
    kafka is [UP]
    Starting schema-registry
    |Schema Registry failed to start
    schema-registry is [DOWN]
    Starting kafka-rest
    kafka-rest is [UP]
    Starting connect
    connect is [UP]
    Cannot start ksql-server, Kafka Server or Schema Registry Server is not running. Check your deployment
    [kafka@localhost bin]$ ./confluent start schema-registry
    This CLI is intended for development only, not for production
    https://docs.confluent.io/current/cli/index.html
    zookeeper is already running. Try restarting if needed
    kafka is already running. Try restarting if needed
    Starting schema-registry
    schema-registry is [UP]
    [kafka@localhost bin]$ ./kafka-console-consumer --topic mh-topic --bootstrap-server localhost:9092 --from-beginning
    [2021-09-03 09:18:17,596] WARN [Consumer clientId=consumer-1, groupId=console-consumer-95322] Error while fetching metadata with correlation id 2 : {mh-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
    ^CProcessed a total of 0 messages
    [kafka@localhost bin]$ ./kafka-console-consumer --topic mh-topic --bootstrap-server localhost:9092 --from-beginning
    $mahira.s@gmail.com
    ^CProcessed a total of 1 messages
    [kafka@localhost bin]$ ./kafka-avro-console-consumer --topic mh-topic --bootstrap-server localhost:9092 --from-beginning
    {"id":1001,"email":"mahira.s@gmail.com"}
    ^CProcessed a total of 1 messages
    [kafka@localhost bin]$ ./kafka-avro-console-consumer --topic mh-topic --bootstrap-server localhost:9092 --from-beginning --property print.schema.ids=true --property schema.id.separator=:
    {"id":1001,"email":"mahira.s@gmail.com"}:21
    ^CProcessed a total of 1 messages
    [kafka@localhost bin]$ curl -X GET http://localhost:8081/subjects
    ["mysql-etl-topic-emp_bofa-value","test-specific-topic-value","test-topic-value","mh-topic-value","test-topic-1-value"]
    [kafka@localhost bin]$ curl -X GET http://localhost:8081/subjects/mh-topic-value/versions
    [1]
    [kafka@localhost bin]$ curl -X GET http://localhost:8081/subjects/mh-topic-value/versions/1
    {"subject":"mh-topic-value","version":1,"id":21,"schema":"{\"type\":\"record\",\"name\":\"customer1\",\"namespace\":\"com.test.training1\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"email\",\"type\":\"string\"}]}"}
    [kafka@localhost bin]$ curl -X GET http://localhost:8081/schemas/ids/21
    {"schema":"{\"type\":\"record\",\"name\":\"customer1\",\"namespace\":\"com.test.training1\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"email\",\"type\":\"string\"}]}"}
    [kafka@localhost bin]$ curl -X GET http://localhost:8081/subjects/mh-topic-value/versions/1
    {"subject":"mh-topic-value","version":1,"id":21,"schema":"{\"type\":\"record\",\"name\":\"customer1\",\"namespace\":\"com.test.training1\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"email\",\"type\":\"string\"}]}"}
    [kafka@localhost bin]$ curl -X GET http://localhost:8081/subjects/mh-topic-value/versions/21
    {"error_code":40402,"message":"Version not found."}

A few things to note in this session. The plain kafka-console-consumer prints the Avro-encoded value as raw bytes, while kafka-avro-console-consumer decodes it using the schema from Schema Registry. The schema registered here is named customer1 in namespace com.test.training1 and has only the id and email fields, so the run was made with a variant of the customer.avsc listing. The last request returns "Version not found" because 21 is the schema ID, not a version of the subject mh-topic-value; the subject has only version 1, and the schema is reachable by ID under /schemas/ids/21.
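The same lookups can be made from Java instead of curl. The sketch below builds the two REST paths used in the session (schema by subject and version, schema by ID) and, only when started with a --fetch argument, sends the requests with the JDK 11 HttpClient. The class name, the method names, and the --fetch switch are assumptions made for this example, and the subject is assumed to need no URL escaping.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustrative sketch: address a schema by subject version or by global schema ID.
// RegistryUriSketch and the --fetch argument are hypothetical, introduced for this example.
final class RegistryUriSketch {

    // A version number is local to one subject: /subjects/{subject}/versions/{version}
    static URI versionUri(String baseUrl, String subject, int version) {
        return URI.create(baseUrl + "/subjects/" + subject + "/versions/" + version);
    }

    // A schema ID is global to the registry: /schemas/ids/{id}
    static URI schemaByIdUri(String baseUrl, int schemaId) {
        return URI.create(baseUrl + "/schemas/ids/" + schemaId);
    }

    // Plain GET that returns the status code and the response body as text.
    static String get(URI uri) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + " " + response.body();
    }

    public static void main(String[] args) throws Exception {
        String baseUrl = "http://localhost:8081";
        URI byVersion = versionUri(baseUrl, "mh-topic-value", 1);
        URI byId = schemaByIdUri(baseUrl, 21);
        System.out.println(byVersion); // prints http://localhost:8081/subjects/mh-topic-value/versions/1
        System.out.println(byId);      // prints http://localhost:8081/schemas/ids/21

        // Only contact the registry when explicitly asked to, since it must be running.
        if (args.length > 0 && args[0].equals("--fetch")) {
            System.out.println(get(byVersion));
            System.out.println(get(byId));
        }
    }
}
```

Keeping the two builders separate mirrors the distinction seen above: passing the schema ID 21 where a version number is expected yields error 40402.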