Confluent kSQL

kSQL (also written KSQL, and renamed ksqlDB in later releases) is a database for building stream processing applications on top of Kafka. It provides a SQL-like interface on top of Kafka Streams, so many Kafka Streams operations can also be expressed in kSQL, including:

  • Data transformations

  • Aggregations

  • Joins

  • Windowing

  • Modeling data with streams and tables
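As a rough illustration of two of these operations, the sketch below filters and transforms a stream, then joins it with a table. It assumes a hypothetical stream my_test_stream (employee_id INTEGER, name VARCHAR, vacation_days INTEGER) and a hypothetical table employees_table keyed by employee_id with a department column. It uses newer ksqlDB syntax; on older KSQL versions, drop EMIT CHANGES.

```sql
-- Transformation: filter rows and derive a new column (hypothetical stream my_test_stream)
SELECT employee_id, UCASE(name) AS name_upper
  FROM my_test_stream
  WHERE vacation_days > 10
  EMIT CHANGES;

-- Join: enrich each stream record with the matching row of the hypothetical
-- table employees_table, whose key column is employee_id
SELECT s.employee_id, s.vacation_days, t.department
  FROM my_test_stream s
  JOIN employees_table t ON s.employee_id = t.employee_id
  EMIT CHANGES;
```

In a stream-table join like this, each new stream record is matched against the table's current row for the same key.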

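In contrast to queries in a traditional SQL database, which return a result and finish, kSQL streaming queries run continuously until stopped, emitting new results in real time as records arrive in the underlying Kafka topics.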
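A minimal sketch of a continuous query, assuming a hypothetical stream my_test_stream already exists. In newer ksqlDB versions, EMIT CHANGES marks the query as continuous: it keeps printing rows until it is cancelled (Ctrl+C in the CLI) or until the optional LIMIT is reached.

```sql
-- Runs until cancelled, printing each new record as it arrives
SELECT * FROM my_test_stream EMIT CHANGES;

-- Stops on its own after 3 rows have been emitted
SELECT * FROM my_test_stream EMIT CHANGES LIMIT 3;
```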

kSQL Configuration

Similar to Kafka's server.properties, a ksql.properties file may contain the following properties:

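# Kafka broker(s) the kSQL server connects to
bootstrap.servers=zoo1:9092
# Local directory where Kafka Streams keeps its state stores
ksql.streams.state.dir=/tmp/kafka-streams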
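Once connected with the CLI, the properties in effect for the session can be listed to check the configuration. A minimal sketch; the exact property names shown (for example, with a ksql.streams. prefix) may differ between versions.

```sql
-- List the configuration properties visible to the current CLI session
SHOW PROPERTIES;
```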

Connecting to kSQL server

Connect to the kSQL server with the kSQL command-line client (prefix the command with sudo if your installation requires it):

ksql

Using kSQL – Commands

Commands resemble SQL statements, but they operate on streams and tables backed by Kafka topics.

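Set a configuration property for the session, e.g. to make queries read topics from the earliest offset instead of only new records: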

SET 'auto.offset.reset' = 'earliest';
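SET only affects queries started afterwards in the same CLI session. A minimal sketch, assuming a hypothetical stream my_test_stream and newer ksqlDB syntax (EMIT CHANGES), that reads from the beginning and then restores the default of 'latest', under which queries only see records arriving after they start.

```sql
-- Read from the earliest offset for queries started from now on
SET 'auto.offset.reset' = 'earliest';
SELECT * FROM my_test_stream EMIT CHANGES LIMIT 5;

-- Remove the override; new queries fall back to the default ('latest')
UNSET 'auto.offset.reset';
```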

List topics:

SHOW TOPICS;
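Besides topics, the CLI can list the streams, tables, and persistent queries defined on the server, and describe the columns of a stream or table. A short sketch; my_test_stream is a hypothetical stream name.

```sql
SHOW STREAMS;
SHOW TABLES;
-- Persistent queries currently running on the server, with their IDs
SHOW QUERIES;
-- Column names and types of a stream or table
DESCRIBE my_test_stream;
```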

Print records in a topic (from beginning):

PRINT '<topic-name>' FROM BEGINNING;
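Like a streaming query, PRINT keeps running until cancelled. A sketch with a hypothetical topic named employees, assuming a ksqlDB version that supports the LIMIT and INTERVAL options of PRINT:

```sql
-- Print the first 5 records from the start of the topic, then stop
PRINT 'employees' FROM BEGINNING LIMIT 5;

-- Print only every 10th record
PRINT 'employees' FROM BEGINNING INTERVAL 10;
```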

Create a stream from a topic:

CREATE STREAM <name> (<fields>) WITH (kafka_topic='<topic>', value_format='<format>');

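Here, fields could be (employee_id INTEGER, name VARCHAR), and value_format is the serialization format of the topic's message values, e.g. DELIMITED (comma-separated), JSON, or AVRO. Similarly, a table can be created with CREATE TABLE, which additionally requires declaring a key: older KSQL versions use e.g. key='employee_id' in the WITH clause, while newer ksqlDB versions mark the key column with PRIMARY KEY.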
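Putting these pieces together, a minimal sketch that assumes a hypothetical topic employees whose values are comma-separated lines such as 1,Alice,12, and a second hypothetical topic employee_names whose messages are keyed by employee ID. The two CREATE TABLE statements are alternatives; run only the one matching your version.

```sql
-- Stream over the hypothetical topic 'employees' (values like: 1,Alice,12)
CREATE STREAM my_test_stream (employee_id INTEGER, name VARCHAR, vacation_days INTEGER)
  WITH (kafka_topic='employees', value_format='DELIMITED');

-- Table, older KSQL syntax: the key column is named in the WITH clause
CREATE TABLE employees_table (employee_id INTEGER, name VARCHAR)
  WITH (kafka_topic='employee_names', value_format='DELIMITED', key='employee_id');

-- Table, newer ksqlDB syntax: the key column is declared as PRIMARY KEY
CREATE TABLE employees_table (employee_id INTEGER PRIMARY KEY, name VARCHAR)
  WITH (kafka_topic='employee_names', value_format='DELIMITED');
```

Note that the two table variants expect different message layouts: older KSQL reads employee_id from the message value (and expects it to match the message key), whereas newer ksqlDB reads the PRIMARY KEY column from the message key itself.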

Aggregate data:

SELECT <group-field>, SUM(<field>) FROM <stream or table> GROUP BY <group-field>;

For example:

SELECT employee_id, SUM(vacation_days) FROM my_test_stream GROUP BY employee_id;
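A SELECT like the one above is transient: its results are only printed in the CLI session. To keep the aggregate continuously updated on the server, it can be turned into a persistent query with CREATE TABLE ... AS SELECT. A minimal sketch, assuming the stream my_test_stream has an integer vacation_days column; vacation_totals is a hypothetical table name.

```sql
-- Persistent query: maintains total vacation days per employee in a new table,
-- backed by a new Kafka topic, until the query is terminated
CREATE TABLE vacation_totals AS
  SELECT employee_id, SUM(vacation_days) AS total_days
  FROM my_test_stream
  GROUP BY employee_id;

-- Find the ID of the persistent query, then stop it
SHOW QUERIES;
TERMINATE <query-id>;
```

Here <query-id> is a placeholder for the ID listed by SHOW QUERIES.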

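Note that aggregations require a GROUP BY clause. On newer ksqlDB versions, continuous (push) queries such as the one above must also end with EMIT CHANGES, e.g. ... GROUP BY employee_id EMIT CHANGES;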
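The GROUP BY can be combined with a window so that the aggregate is computed per key and per time interval rather than over the whole stream. A minimal sketch in newer ksqlDB syntax, assuming the hypothetical stream my_test_stream; the one-hour tumbling window is an arbitrary choice.

```sql
-- Total vacation days per employee in consecutive, non-overlapping 1-hour windows
SELECT employee_id, SUM(vacation_days) AS total_days
  FROM my_test_stream
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY employee_id
  EMIT CHANGES;
```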