Kafka Streams supports stateless and stateful transformations. Stateful transformations, such as count, aggregate, or join, use operators that keep their state in local state stores backed by internal Kafka topics (changelog topics). In addition, some stateless transformations, such as groupBy or repartition, write their results to internal Kafka topics (repartition topics). By default, Kafka Streams names these internal topics based on the corresponding operator. If these topics don't exist, Kafka Streams creates them. When it creates the internal topics, Kafka Streams hardcodes the segment.bytes configuration and sets it to 50 MB. MSK Provisioned with Express brokers and MSK Serverless protect some topic configurations, including segment.bytes, during topic creation. Therefore, a Kafka Streams application with stateful transformations fails to create its internal topics on MSK Express brokers or MSK Serverless.
To run such Kafka Streams applications on MSK Express brokers or MSK Serverless, you must create the internal topics yourself. To do this, first identify and name the Kafka Streams operators that require internal topics. Then, create the corresponding internal Kafka topics.
Note
- It's a best practice to name the operators manually in Kafka Streams, especially the ones that depend on internal topics. For information about naming operators, see Naming Operators in a Kafka Streams DSL Application in the Kafka Streams documentation.
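- The internal topic name for a stateful transformation depends on the application.id of the Kafka Streams application and the name of the stateful operator's state store. For example, the changelog topic of a state store is named application.id-store_name-changelog, and a repartition topic is named application.id-operator_name-repartition.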
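As an illustration of this convention, the following sketch builds the expected internal topic names from an application.id and a store or operator name. The Kafka Streams documentation describes internal topic names as <application.id>-<operatorName>-<suffix> and notes that the convention isn't guaranteed for future releases, so treat these helpers as an assumption to verify against your Kafka Streams version. The class InternalTopicNames and its methods are hypothetical.

```java
public class InternalTopicNames {
    // Hypothetical helper: changelog topic that backs a state store,
    // assuming the <application.id>-<store name>-changelog convention.
    public static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Hypothetical helper: repartition topic of a named operator,
    // assuming the <application.id>-<operator name>-repartition convention.
    public static String repartitionTopic(String applicationId, String operatorName) {
        return applicationId + "-" + operatorName + "-repartition";
    }

    public static void main(String[] args) {
        // Prints msk-streams-processing-count-store-changelog
        System.out.println(changelogTopic("msk-streams-processing", "count-store"));
        // Prints msk-streams-processing-kafka-stream-groupby-repartition
        System.out.println(repartitionTopic("msk-streams-processing", "kafka-stream-groupby"));
    }
}
```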
Creating a Kafka Streams application using MSK Express brokers or MSK Serverless
The following example uses a Kafka Streams application whose application.id is set to msk-streams-processing. The application uses the count() operator, which requires an internal topic. If you name the operator's state store count-store, the internal changelog topic is named msk-streams-processing-count-store-changelog.
To create the internal topics for this application, do the following:
Identify and name the operators
- Identify the stateful processors by using Stateful transformations in the Kafka Streams documentation. Examples of stateful processors include count, aggregate, and join.
- Identify the processors that create topics for repartitioning.
The following example contains a count() operation, which needs a state store.
var stream = paragraphStream
        .groupByKey()
        .count()
        .toStream();
- To name the topic, add a name for each stateful processor. Depending on the processor type, you use a different naming class. For example, the count() operation is an aggregation, so you name it with the Materialized class. For information about the naming classes for the stateful operations, see Conclusion in the Kafka Streams documentation.
The following example sets the name of the count() operator's state store to count-store by using the Materialized class.
var stream = paragraphStream
        .groupByKey()
        .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-store") // descriptive name for the store
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()))
        .toStream();
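For context, the following sketch places the named count() step inside a complete topology. It assumes that the input topic is input_topic, the output topic is output_topic (the names that appear in the topology output in this section), and that keys and values are strings; the class name NamedCountTopology is hypothetical. Building and describing a topology doesn't require a connection to a cluster, so you can check the store name before you deploy.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class NamedCountTopology {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        // Assumed source: string keys and string values read from input_topic.
        KStream<String, String> paragraphStream =
                builder.stream("input_topic", Consumed.with(Serdes.String(), Serdes.String()));
        paragraphStream
                .groupByKey()
                // Naming the store fixes the changelog topic name instead of a generated one.
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()))
                .toStream()
                .to("output_topic", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    public static void main(String[] args) {
        // Prints the topology, including the named store count-store.
        System.out.println(build().describe());
    }
}
```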
Create the internal topics
Kafka Streams prefixes the names of internal topics with the user-defined application.id, in the form application.id-internal_topic_name. The internal topics are normal Kafka topics, and you can create them by following the steps in Create an Apache Kafka topic or by using the AdminClient of the Kafka API.
Depending on your use case, you can apply the cleanup and retention policies that Kafka Streams uses by default for its internal topics, or customize their values. You define these policies with the cleanup.policy and retention.ms topic configurations.
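The following sketch shows one way to attach these configurations when you create the topics with the Kafka AdminClient API. It assumes that a changelog topic for a key-value store should be compacted, and that a repartition topic should use the delete policy with unlimited time-based retention (retention.ms set to -1), because Kafka Streams deletes processed records from repartition topics itself. These values mirror what Kafka Streams typically applies when it creates the topics, but verify them against your Kafka Streams version and against the topic configuration limits of MSK Express brokers or MSK Serverless. The class InternalTopicSpecs and its methods are hypothetical.

```java
import java.util.Map;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class InternalTopicSpecs {
    // Hypothetical helper: changelog topic of a non-windowed store. Compaction keeps
    // the latest value for each key, which is what a restore from the changelog needs.
    public static NewTopic changelogTopic(String name, int partitions, short replicationFactor) {
        return new NewTopic(name, partitions, replicationFactor)
                .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
    }

    // Hypothetical helper: repartition topic with the delete policy and no
    // time-based retention limit (assumption: the cluster accepts retention.ms=-1).
    public static NewTopic repartitionTopic(String name, int partitions, short replicationFactor) {
        return new NewTopic(name, partitions, replicationFactor)
                .configs(Map.of(
                        TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE,
                        TopicConfig.RETENTION_MS_CONFIG, "-1"));
    }
}
```

You can pass the resulting NewTopic objects to AdminClient.createTopics(). The segment.bytes configuration is intentionally left out so that the cluster's own setting applies.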
The following example uses the AdminClient API to create the internal topic for an application whose application.id is msk-streams-processing.
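try (AdminClient client = AdminClient.create(configs.kafkaProps())) {
    // Changelog topic of the "count-store" state store: <application.id>-<store name>-changelog
    NewTopic changelogTopic = new NewTopic("msk-streams-processing-count-store-changelog", 3, (short) 3)
            // Changelog topics of key-value stores are compacted to keep the latest value per key
            .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
    // Block until the request completes so that errors, such as an existing topic, surface here
    client.createTopics(List.of(changelogTopic)).all().get();
}
After the topic is created on the cluster, your Kafka Streams application can use the msk-streams-processing-count-store-changelog topic for the count() operation. The example uses three partitions, which assumes that input_topic also has three partitions: a changelog topic must have the same number of partitions as the input topic that feeds the state store.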
(Optional) Check the topic name
You can use the topology describer to describe the topology of your stream and view the names of the operators, state stores, and repartition topics from which the internal topic names are derived. The following example shows how to run the topology describer.
final StreamsBuilder builder = new StreamsBuilder();
// Define the stream processing on builder first, for example the named count() operation.
Topology topology = builder.build();
System.out.println(topology.describe());
The following output shows the topology of the stream for the preceding example.
Topology Description:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic])
      --> KSTREAM-AGGREGATE-0000000001
    Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store])
      --> KTABLE-TOSTREAM-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KTABLE-TOSTREAM-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-AGGREGATE-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: output_topic)
      <-- KTABLE-TOSTREAM-0000000002
For information about how to use the topology describer, see Naming Operators in a Kafka Streams DSL Application in the Kafka Streams documentation.
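Building on the topology describer, the following sketch walks the TopologyDescription and derives the internal topic names that you would need to create. It assumes that changelogging is enabled for every state store (the default), that changelog topics follow the <application.id>-<store name>-changelog convention, and that repartition topics appear in the description as sink topics ending in -repartition without the application.id prefix. Verify these assumptions against your Kafka Streams version. The class InternalTopicLister and its methods are hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class InternalTopicLister {
    // Collects the expected internal topic names of a topology, sorted by name.
    public static Set<String> internalTopics(String applicationId, Topology topology) {
        Set<String> topics = new TreeSet<>();
        for (TopologyDescription.Subtopology subtopology : topology.describe().subtopologies()) {
            for (TopologyDescription.Node node : subtopology.nodes()) {
                if (node instanceof TopologyDescription.Processor) {
                    // Each state store is assumed to be backed by a changelog topic.
                    for (String store : ((TopologyDescription.Processor) node).stores()) {
                        topics.add(applicationId + "-" + store + "-changelog");
                    }
                } else if (node instanceof TopologyDescription.Sink) {
                    // topic() returns null when a TopicNameExtractor chooses the topic at runtime.
                    String topic = ((TopologyDescription.Sink) node).topic();
                    if (topic != null && topic.endsWith("-repartition")) {
                        topics.add(applicationId + "-" + topic);
                    }
                }
            }
        }
        return topics;
    }

    // Example topology: the named count() operation used in this section.
    public static Topology exampleTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input_topic", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey()
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-store"));
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(internalTopics("msk-streams-processing", exampleTopology()));
    }
}
```

For the example topology, the program prints [msk-streams-processing-count-store-changelog]. Because groupByKey() follows the source directly, no repartition topic is needed.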
Examples of naming operators
This section provides some examples of naming operators.
Example of naming operator for groupByKey()
groupByKey() -> groupByKey(Grouped.as("kafka-stream-groupby"))
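The Grouped name only produces a topic when Kafka Streams has to repartition the data, for example after an operation that changes the key. The following sketch assumes a selectKey() step before the grouping so that a repartition topic is created; the Grouped name then becomes the prefix of the repartition topic, which the application would use as msk-streams-processing-kafka-stream-groupby-repartition. The class NamedGroupByExample and the selectKey() step are hypothetical additions for illustration.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class NamedGroupByExample {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input_topic", Consumed.with(Serdes.String(), Serdes.String()))
                // Changing the key marks the stream for repartitioning before the grouping.
                .selectKey((key, value) -> value)
                // The Grouped name becomes the prefix of the repartition topic name.
                .groupByKey(Grouped.with("kafka-stream-groupby", Serdes.String(), Serdes.String()))
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-store"));
        return builder.build();
    }

    public static void main(String[] args) {
        // The description lists kafka-stream-groupby-repartition as a sink and source topic.
        System.out.println(build().describe());
    }
}
```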
Example of naming operator for normal count()
normal count() -> .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("kafka-streams-count") // descriptive name for the store
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()))
Example of naming operator for windowed count()
windowed count() -> .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("kafka-streams-window") // descriptive name for the store
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()))
Example of naming operator for windowed suppress()
windowed suppress() ->
Suppressed<Windowed> suppressed = Suppressed
        .untilWindowCloses(Suppressed.BufferConfig.unbounded())
        .withName("kafka-suppressed");
// then, on the windowed KTable:
.suppress(suppressed)
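The following sketch combines the named groupByKey(), windowed count(), and suppress() operators in one topology so that you can describe it and confirm the names before you create the topics. It assumes Kafka Streams 3.0 or later (for TimeWindows.ofSizeAndGrace), string keys and values, 5-minute tumbling windows with 1 minute of grace, and the topic names input_topic and output_topic; the class NamedWindowedCount is hypothetical.

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public class NamedWindowedCount {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input_topic", Consumed.with(Serdes.String(), Serdes.String()))
                // This name is only used if Kafka Streams needs a repartition topic here.
                .groupByKey(Grouped.with("kafka-stream-groupby", Serdes.String(), Serdes.String()))
                // Assumed window: 5-minute tumbling windows with 1 minute of grace.
                .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
                .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("kafka-streams-window")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()))
                // Emit one final result per window; the name also names the suppression buffer.
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
                        .withName("kafka-suppressed"))
                .toStream()
                // Drop the window from the key so that the result can use String serdes.
                .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count))
                .to("output_topic", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(build().describe());
    }
}
```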