Apache Kafka Java code for Producer and Consumer

Create a project and convert it to a Maven project.
Add the dependency entry below to pom.xml.
<dependencies>
  <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.5.0</version>
   </dependency> 
 </dependencies>
Here is the command to create the topic.
kafka-topics --create --topic ani-topic --partitions 1 --replication-factor 1 --zookeeper localhost:2181

Make sure you have created the topic ani-topic before proceeding.

Apache Kafka Topic Describe
Make sure the ZooKeeper and Kafka servers are running.
Producer Code
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * Minimal Kafka producer example: publishes ten string messages to the
 * {@code ani-topic} topic through the broker at {@code localhost:9092}.
 */
public class SimpleSender {

	/**
	 * Configures a producer with string keys and values, sends ten records
	 * that all use the key {@code "test-key"}, waits for the sends to
	 * complete, and prints a confirmation line.
	 *
	 * @param args command-line arguments; not used
	 */
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

		// try-with-resources closes the producer even when a send throws.
		try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
			String topic = "ani-topic";
			for (int i = 1; i <= 10; i++) {
				ProducerRecord<String, String> record = new ProducerRecord<>(topic,
						"test-key", "This is a test message " + i);
				// send() is asynchronous; without a callback a failed delivery
				// would go unnoticed because the returned Future is not inspected.
				producer.send(record, (metadata, exception) -> {
					if (exception != null) {
						System.err.println("failed to send message: " + exception);
					}
				});
			}

			// Wait for the outstanding sends to complete before reporting.
			producer.flush();
			System.out.println("messages sent");
		}
	}
}

Consumer Code

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * Minimal Kafka consumer example: subscribes to the {@code ani-topic} topic
 * through the broker at {@code localhost:9092} and prints every record it
 * receives.
 */
public class SimpleRreceiver {

	/**
	 * Configures a consumer with string keys and values in consumer group
	 * {@code group-6}, subscribes to the topic, and polls in an endless loop,
	 * printing the key, value and partition of each record.
	 *
	 * @param args command-line arguments; not used
	 */
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-6");
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

		// try-with-resources closes the consumer if subscribe() or poll() throws;
		// the loop below never exits on its own.
		try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
			String topic = "ani-topic";

			List<String> topics = Collections.singletonList(topic);
			consumer.subscribe(topics);
			System.out.println("waiting for messages ");
			while (true) {
				// The type arguments are required: with a raw ConsumerRecords the
				// lambda parameter is an Object and record.key() does not compile.
				ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(30));
				records.forEach(record -> System.out.println("key:" + record.key() + "\tvalue: " + record.value()
						+ "\tpartition: " + record.partition()));
			}
		}
	}

}

Run the consumer; you will see the "waiting for messages" output.

Apache Kafka Consumer waiting

Start the producer; the messages will be sent to the consumer.

Apache Kafka Producer msg sent

The messages are delivered to partition 0.

Apache Kafka Messages delivered