Kafka Streams#

Kafka Streams is an API where both the input and the output are stored in Kafka topics. A stream is a sequence of real-time events. Examples of events:

  • Transactions

  • Stock trades

  • Deliveries

  • Network events

  • Moves in a game

Stateless and Stateful transformations#

Stateless transformations do not require a dedicated data store. Stateful transformations require one to hold the current state of the stream.

Stateless transformations#

Some stateless transformations include:

  • .branch, splitting a stream into multiple streams

  • .filter, remove records based on a criteria or condition

  • .flatMap, converting input into any number of output records

  • .foreach, performs an operation on each record; it is a terminal operation, so it prevents further processing

  • .groupBy, groups the stream by a new key

  • .groupByKey, groups by existing key

  • .map, transforms each input record into exactly one output record (one-to-one)

  • .merge, used to merge two streams

  • .peek, performs an operation on each record without modifying the stream (the non-terminal counterpart of .foreach)
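
The stateless operations above can be chained in the Streams DSL. A minimal sketch in Kotlin (topic names and String key/value types are illustrative, not from the source):

```kotlin
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream

val builder = StreamsBuilder()
// Hypothetical input topic of raw text events
val events: KStream<String, String> = builder.stream("events-raw")

events
    .filter { _, value -> value.isNotEmpty() }               // drop empty records
    .map { key, value -> KeyValue(key, value.uppercase()) }  // one-to-one transformation
    .peek { key, value -> println("key=$key value=$value") } // inspect without modifying
    .to("events-clean")                                      // hypothetical output topic
```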

Stateful transformations#

One stateful transformation is the aggregation of records, performed on a grouped stream. Types of aggregation include:

  • .aggregate, a generalized aggregation

  • .count, counts the number of records

  • .reduce, combines the grouped records into a single value per key
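
As a sketch of a count aggregation (hypothetical topic names; serde configuration omitted for brevity):

```kotlin
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KTable

val builder = StreamsBuilder()

// Count records per key; the state lives in a local store backed by a changelog topic
val counts: KTable<String, Long> = builder
    .stream<String, String>("page-views") // hypothetical input topic
    .groupByKey()
    .count()

counts.toStream().to("page-view-counts")  // hypothetical output topic
```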

Another stateful transformation is the join operation, which combines two input streams. Different types of join can be used, including:

  • .join, an inner join (intersection)

  • .leftJoin, all records from the “left” stream plus matching records from the “right” stream

  • .outerJoin, all records from both streams (union)

Note on Co-Partitioning#

When joining streams, the data must be co-partitioned:

  • Input topics must have the same number of partitions.

  • Producers must use the same partitioning strategy.

Join operations#

The following join operations are available in Kafka streams:

  • Inner join

  • Left join

  • Outer join

Join operations are performed between two streams from two topics:

val left = builder.stream(topicLeft)
val right = builder.stream(topicRight)

val innerJoined = left
    .join(
        right,
        { leftVal, rightVal -> "left=$leftVal, right=$rightVal" },
        JoinWindows.of(Duration.ofMinutes(5)) // Windowing
    )
innerJoined.to(topicJoined) // Output topic

Similarly, a .leftJoin or .outerJoin can be defined. Records are joined on the topics’ keys.
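
For example, a left join follows the same shape as the inner join above (a sketch; rightVal is null when no match arrives within the window, and the output topic name is hypothetical):

```kotlin
val leftJoined = left
    .leftJoin(
        right,
        { leftVal, rightVal -> "left=$leftVal, right=${rightVal ?: "no match"}" },
        JoinWindows.of(Duration.ofMinutes(5))
    )
leftJoined.to("topic-left-joined") // hypothetical output topic
```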

Windowing#

One way to perform operations over time is windowing, which subdivides a group into “time buckets” that are aggregated piece by piece. Different window types exist:

  • Tumbling: No overlap, no gap

  • Hopping: Time-based, but can have overlap and/or gaps

  • Sliding: Dynamically sized, used for joins. Defined by the maximum time difference between the timestamps of any two records. No gaps, but overlap.

  • Session: Dynamic windows formed around periods of activity, based on record timestamps. Sessions are separated by gaps of inactivity / idle time.

Late-arriving records are records that fall into a time window but are received after the end of that window’s grace period. Such records can still be processed by increasing the window’s grace (retention) period; any records arriving after that period are dropped.
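
The grace period can be set explicitly on the window definition. A sketch, assuming records up to 30 seconds late should still be accepted:

```kotlin
import java.time.Duration
import org.apache.kafka.streams.kstream.TimeWindows

// 1-minute tumbling windows that accept records arriving up to 30 seconds late
val gracefulWindows = TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(30))

val windowedCounts = stream
    .groupByKey()
    .windowedBy(gracefulWindows)
    .count()
```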

The difference between a tumbling and a hopping window is controlled with .advanceBy(Duration):

import java.time.Duration

// Windows of 10 seconds that start every 5 seconds => hopping time window.
// Without .advanceBy (or with advance == window size) this is a tumbling window.
val windowedStream = stream
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).advanceBy(Duration.ofSeconds(5)))
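
Session windows are defined by an inactivity gap rather than a fixed size. A sketch, assuming a session should close after 5 minutes without activity:

```kotlin
import java.time.Duration
import org.apache.kafka.streams.kstream.SessionWindows

// One session per key, closed after a 5-minute inactivity gap
val sessionCounts = stream
    .groupByKey()
    .windowedBy(SessionWindows.with(Duration.ofMinutes(5)))
    .count()
```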

Time in Streams#

Time in the context of streams is used in operations such as windowing to define time boundaries. Relevant notions of time include:

  • Event time: Time when event was created at the source.

  • Ingestion time: Time when the event was stored in a topic partition by a Kafka broker.

  • Processing time/Wall-clock-time: Time when the event started to be handled/processed by the stream processing application.

Processing may happen days after the event actually occurred, so time zones should be standardized.
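
Kafka Streams picks the timestamp through a TimestampExtractor; a custom extractor can read event time from the payload instead of the record metadata. A sketch (MyEvent and its createdAt field are hypothetical):

```kotlin
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.streams.processor.TimestampExtractor

class PayloadTimestampExtractor : TimestampExtractor {
    override fun extract(record: ConsumerRecord<Any, Any>, partitionTime: Long): Long {
        // Use the event time embedded in the payload when present,
        // otherwise fall back to the current partition time
        val eventTime = (record.value() as? MyEvent)?.createdAt ?: -1L
        return if (eventTime >= 0) eventTime else partitionTime
    }
}
```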

Streams vs Database/Tables#

There are some notable differences, including:

  • Streams keep a history of changes

  • Records in a stream cannot be deleted or updated (append-only)

  • A table can be converted to a stream (and vice versa)

  • Streams are preferred for transactions or real-time events / logs
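
The conversion between the two can be sketched directly in the DSL (topic name is hypothetical):

```kotlin
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KTable

val builder = StreamsBuilder()

// A stream of every change event for a user
val userEvents = builder.stream<String, String>("user-events")

// Table view: only the latest value per key is retained
val latestPerUser: KTable<String, String> = userEvents.toTable()

// Back to a stream: every update to the table becomes a record again
val changeStream = latestPerUser.toStream()
```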

Streams Design Patterns#

Depending on the goal and purpose of the data, several stream design patterns are available:

  • Single event processing: Each event is handled independently. No state is needed.

  • Local state processing: e.g., keeping running min and max values; some state must be stored, at the cost of increased memory usage.

  • Multiphase processing: Partially uses local state, then writes partial results to a new topic so a later phase can aggregate part of the incoming data.

  • External processing: The stream performs a lookup against an external data source. May cause latency issues.

  • Windowed join: Design where two event streams are matched within a time window.

  • Out-of-sequence events: For handling events that arrive out of order.

Stream Frameworks#

Which framework to use depends on the type of application.

  • Ingest: Try Kafka Connect.

  • Low-millisecond latency: Better served by a request/response system.

  • Real-time data analytics: Performing complex aggregations for insight.

  • Asynchronous Microservices: Require local state for caching events.