Confluent KSql

It provides sql based interface to interact with kafka. Supports csv,json and avro formats. The default format is csv format(delimited----csv).. Supports most of the syntax support by ansi sql standard. Supports aggregations and joins .

Easily build stream processing applications with a simple and lightweight SQL syntax. Continuously transform, enrich, join together, and aggregate your Kafka events without writing any complex application code.

Start KSQL
KSQL
KSQL: Create Stream: add_stream on top of test-ksql-topic
create stream add_stream(user varchar,country varchar) with (kafka_topic='test-ksql-topic',value_format='delimited');
Create STream
Columns in add_stream
ROWKEY- always null there is no joins.
ROWTIME- ROW insterted timestamp
USER and COUNTRY are defined columns while creating stream
Set/run below command while running any of stream commands in ksql. set 'auto.offset.reset'='earliest';
KSQL SELECT query
Here is the actual topic data
KSQL -TOPIC data

Create joins inbetween stream

create stream emp_address_stream with(kafka_topic='join-topic',value_format='delimited') as select a.emp_id,a.name,a.designation,b.location,b.city from emp_stream a inner join address_stream b within 5 days on a.emp_id=b.id;

forming joins and creating new topic called join-topic. Here are the results after running join query
ksql-joins