21 September 2021

How to Load Test Your Kafka Producers and Consumers using k6

Wai Foong Ng

Recently, k6 started supporting extensions, which extend k6's capabilities to cover use cases required by the community. The community has already built plenty of extensions. k6 extensions are written in Go, and many of them reuse existing Go libraries.

This makes k6 a versatile tool for testing different protocols and adapting to multiple cases. This post is the third part of my series of articles testing various systems using k6:

In this post, let's look at how to test the popular Kafka project. Apache Kafka is a powerful event streaming platform that provides the following features:

  • Write and read streams of events
  • Store streams of events for as long as you want to
  • Process streams of events in parallel retrospectively

It works by having client applications write events to the Kafka server; these applications are called producers. Client applications that read and process events from the Kafka server are called consumers.

Kafka itself can seamlessly handle hundreds to millions of events per second on a simple setup. But what if you want to test and observe how your Kafka service behaves before going live?

The xk6-kafka extension provides some convenient functions for interacting with Kafka producers and consumers. It serves as a producer that can send a high volume of messages per second, allowing you to monitor the system under test (SUT) and see how the application keeps up with the load.

xk6-kafka

At the time of this writing, the xk6-kafka extension provides the following APIs:

Function                       Description
consume(reader, limit)         Consume messages from the Kafka server.
createTopic(address, topic)    Create a new topic.
listTopics(address)            Return a unique set of topics.
produce(writer, messages)      Produce messages to the Kafka server.
reader(brokers, topic)         Instantiate a new Reader instance.
writer(brokers, topic)         Instantiate a new Writer instance.

Some of the APIs above also accept optional parameters for authentication and message compression. Refer to the more examples section for additional information.

Building k6 with the kafka extension

By default, k6 does not support testing Kafka. Building k6 with the xk6-kafka extension creates a k6 version with the capabilities to test Kafka producers and consumers.

Make sure you have the following installed and ready before you continue:

  • Go (>=1.17)
  • Git

Next, continue the installation by running the following command in your terminal to install the xk6 module:

go install go.k6.io/xk6/cmd/xk6@latest

Once the command finishes successfully, you can start making your own custom k6 binary for Kafka as follows:

xk6 build --with github.com/mostafa/xk6-kafka@latest

It will take some time for the process to create a new k6 binary in your working directory.

Troubleshooting: If you experience issues installing the xk6 extension, head over to the GitHub repository and download the pre-compiled binaries instead.

Running Kafka

The recommended approach is to use Docker, as manual installation is quite complicated and prone to errors. You can pull the following image by lensesio from Docker Hub. It contains a complete Kafka setup for development.

docker pull lensesio/fast-data-dev:latest

After that, run the following to start the container in detached mode:

sudo docker run -d --rm --name lenseio -p 2181:2181 -p 3030:3030 \
-p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 \
-e ADV_HOST=127.0.0.1 lensesio/fast-data-dev
sudo docker logs -f -t lenseio

Visit http://localhost:3030 to get into the fast-data-dev environment.

k6 test

Import

Now, let’s create a new JavaScript file called test_script.js in the same directory as your k6 binary. Then, add the following import statement at the top of the file:

import { check } from 'k6';
import { writer, produce, reader, consume, createTopic } from 'k6/x/kafka';

Initialization

Continue by appending the following initialization code:

const bootstrapServers = ['localhost:9092'];
const kafkaTopic = 'xk6_kafka_json_topic';
const producer = writer(bootstrapServers, kafkaTopic);
const consumer = reader(bootstrapServers, kafkaTopic);

The code initializes both the writer and reader instances based on the specified configuration. If you are using a different IP/host address and port for your Kafka server, modify it accordingly.

Next, call the createTopic function to create a new topic. Rest assured that this function will do nothing if the topic already exists.

createTopic(bootstrapServers[0], kafkaTopic);

Let’s create a function that generates a random integer to serve as a unique identifier for each message later on. Note that this is optional, not a mandatory requirement for load testing.

function getRandomInt(max = 1000) {
  return Math.floor(Math.random() * max + 1);
}
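If you want to convince yourself of the range this helper produces, it can be exercised as plain JavaScript outside k6; with the default max of 1000, every generated id falls between 1 and 1000:

```javascript
// Plain-JavaScript sanity check of the helper above (no k6 required).
function getRandomInt(max = 1000) {
  return Math.floor(Math.random() * max + 1);
}

// Draw a large sample and confirm all ids stay within 1..1000.
const samples = Array.from({ length: 10000 }, () => getRandomInt());
const allInRange = samples.every((n) => n >= 1 && n <= 1000);
```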

Default function

As for the default function, define it as follows:

export default function () {
  const messages = [
    {
      key: JSON.stringify({
        correlationId: 'test-id-sql-' + getRandomInt(),
      }),
      value: JSON.stringify({
        title: 'Load Testing SQL Databases with k6',
        url: 'https://k6.io/blog/load-testing-sql-databases-with-k6/',
        locale: 'en',
      }),
    },
    {
      key: JSON.stringify({
        correlationId: 'test-id-redis-' + getRandomInt(),
      }),
      value: JSON.stringify({
        title: 'Benchmarking Redis with k6',
        url: 'https://k6.io/blog/benchmarking-redis-with-k6/',
        locale: 'en',
      }),
    },
  ];
  const error = produce(producer, messages);
  check(error, {
    'is sent': (err) => err == undefined,
  });
}

The code block above works as follows:

  • Initialize a list of messages
  • Call produce function to publish the messages
  • Check if messages are successfully sent
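Note that each key and value is serialized with JSON.stringify before being handed to produce; a consumer would JSON.parse them back. That round trip can be sketched in plain JavaScript, independent of k6 (the buildMessage helper here is illustrative, not part of the xk6-kafka API):

```javascript
// Plain-JavaScript sketch of the serialize/deserialize round trip that the
// producer and consumer perform on each message. buildMessage is a
// hypothetical helper mirroring the message shape used in test_script.js.
function buildMessage(id) {
  return {
    key: JSON.stringify({ correlationId: 'test-id-sql-' + id }),
    value: JSON.stringify({
      title: 'Load Testing SQL Databases with k6',
      url: 'https://k6.io/blog/load-testing-sql-databases-with-k6/',
      locale: 'en',
    }),
  };
}

const msg = buildMessage(42);
const key = JSON.parse(msg.key);     // recovers the correlationId object
const value = JSON.parse(msg.value); // recovers the payload object
```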

Teardown

Finally, create a teardown function to close the connections:

export function teardown(data) {
  producer.close();
  consumer.close();
}

Run the test

Save the file and run the following command on your terminal:

./k6 run --vus 50 --duration 5s test_script.js

You should see the following output:

running (05.0s), 00/50 VUs, 15136 complete and 0 interrupted iterations
default ✓ [======================================] 50 VUs 5s
✓ is sent
█ teardown
checks.........................: 100.00% ✓ 151360
data_received..................: 0 B 0 B/s
data_sent......................: 0 B 0 B/s
iteration_duration.............: avg=16.49ms min=31.9µs med=13.52ms max=1.14s p(90)=28.55ms p(95)=36.46ms
iterations.....................: 15136 3017.4609/s
kafka.writer.dial.count........: 151 30.102841/s
kafka.writer.error.count.......: 0 0/s
kafka.writer.message.bytes.....: 5.2 MB 1.0 MB/s
kafka.writer.message.count.....: 30272 6034.9218/s
kafka.writer.rebalance.count...: 0 0/s
kafka.writer.write.count.......: 30272 6034.9218/s
vus............................: 5 min=5 max=50
vus_max........................: 50 min=50 max=50

Scale the load

You can easily scale the load by increasing the number of VUs. For example, the following command uses 500 VUs to run the load test for a minute:

./k6 run --vus 500 --duration 1m test_script.js

If you are new to k6, check out how to configure the load options in the script or run a stress test with k6.
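Load options can also be declared in the script itself instead of via CLI flags. Below is a sketch of a stages-based ramping profile; the durations and targets are illustrative assumptions, and in test_script.js the object would be declared at the top level as export const options:

```javascript
// Illustrative load profile (assumed values, adjust to your needs).
// In a real k6 script, declare this as: export const options = { ... }
const options = {
  stages: [
    { duration: '30s', target: 500 }, // ramp up to 500 VUs
    { duration: '1m', target: 500 },  // hold the load for a minute
    { duration: '30s', target: 0 },   // ramp back down
  ],
};
```

With options declared in the script, you can run it with a plain ./k6 run test_script.js and keep the load profile under version control alongside the test logic.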

Extend the test

The script above is all about producing messages to your Kafka server. In fact, you can easily modify the code into a test that both produces and consumes messages.

Simply add the following code inside the default function, below the produce check:

const result = consume(consumer, 10);
check(result, {
  '10 messages returned': (msgs) => msgs.length == 10,
});

The code reads 10 messages at a time. Simply increase the value if you wish to consume more messages per iteration.

The output is as follows when you run it with the same command:

running (05.0s), 00/50 VUs, 9778 complete and 0 interrupted iterations
default ✓ [======================================] 50 VUs 5s
✓ is sent
✓ 10 messages returned
█ teardown
checks.........................: 100.00% ✓ 195560
data_received..................: 0 B 0 B/s
data_sent......................: 0 B 0 B/s
iteration_duration.............: avg=25.53ms min=41.4µs med=18ms max=1.41s p(90)=37.73ms p(95)=52.37ms
iterations.....................: 9778 1946.80798/s
kafka.reader.dial.count........: 50 9.955042/s
kafka.reader.error.count.......: 0 0/s
kafka.reader.fetches.count.....: 101 20.109184/s
kafka.reader.message.bytes.....: 15 MB 2.9 MB/s
kafka.reader.message.count.....: 97830 19478.034846/s
kafka.reader.rebalance.count...: 0 0/s
kafka.reader.timeouts.count....: 46 9.158638/s
kafka.writer.dial.count........: 152 30.263327/s
kafka.writer.error.count.......: 0 0/s
kafka.writer.message.bytes.....: 3.4 MB 669 kB/s
kafka.writer.message.count.....: 19556 3893.615961/s
kafka.writer.rebalance.count...: 0 0/s
kafka.writer.write.count.......: 19556 3893.615961/s
vus............................: 50 min=50 max=50
vus_max........................: 50 min=50 max=50

Kafka metrics in k6

By default, k6 has its own built-in metrics which are collected automatically. Apart from that, you can create your own custom metrics. Custom metrics can be categorized into the following types:

  • Counter: A metric that cumulatively sums added values.
  • Gauge: A metric that stores the min, max and last values added to it.
  • Rate: A metric that tracks the percentage of added values that are non-zero.
  • Trend: A metric that allows for calculating statistics on the added values (min, max, average and percentiles).
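To make these four types concrete, here is a plain-JavaScript sketch of how each one would summarize the same stream of added values. This is not k6's actual metrics implementation, just an illustration of the semantics:

```javascript
// Illustration of k6's four custom metric types over one stream of values.
const values = [2, 0, 5, 3, 0, 10];

// Counter: cumulatively sums added values.
const counter = values.reduce((sum, v) => sum + v, 0); // 20

// Gauge: stores the min, max and last values added.
const gauge = {
  min: Math.min(...values),
  max: Math.max(...values),
  last: values[values.length - 1],
};

// Rate: fraction of added values that are non-zero.
const rate = values.filter((v) => v !== 0).length / values.length;

// Trend: statistics over all added values (min, max, average, median).
const sorted = [...values].sort((a, b) => a - b);
const trend = {
  min: sorted[0],
  max: sorted[sorted.length - 1],
  avg: counter / values.length,
  med: sorted[Math.floor(sorted.length / 2)],
};
```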

Besides k6's own metrics, k6 extensions can collect metrics and report them as part of the k6 results output. In this case, xk6-kafka collects individual stats for both the reader and the writer.

Reader

Let’s have a look at the metrics meant for the reader.

Metric                          Type      Description
kafka.reader.dial.count         Counter   Total number of times the reader tries to connect to Kafka.
kafka.reader.error.count        Counter   Total number of errors that occurred when reading from Kafka.
kafka.reader.fetches.count      Counter   Total number of times the reader fetches batches of messages from Kafka.
kafka.reader.message.bytes      Counter   Total bytes consumed.
kafka.reader.message.count      Counter   Total number of messages consumed.
kafka.reader.rebalance.count    Counter   Total number of rebalances of a topic in a consumer group (deprecated).
kafka.reader.timeouts.count     Counter   Total number of timeouts that occurred when reading from Kafka.

Writer

As for the writer, the metrics are as follows:

Metric                          Type      Description
kafka.writer.dial.count         Counter   Total number of times the writer tries to connect to Kafka.
kafka.writer.error.count        Counter   Total number of errors that occurred when writing to Kafka.
kafka.writer.message.bytes      Counter   Total bytes produced.
kafka.writer.message.count      Counter   Total number of messages produced.
kafka.writer.rebalance.count    Counter   Total number of rebalances of a topic (deprecated).
kafka.writer.write.count        Counter   Total number of times the writer writes batches of messages to Kafka.

More Kafka metrics are available, and you can find them here. However, the extension does not collect all of them yet. You can follow this GitHub issue to track the progress of their addition.

More examples

Moreover, the xk6-kafka repository provides a few test scripts that work out of the box for new users. At the time of this writing, it comes with the following tests:

Feel free to experiment with them and modify the code accordingly based on your own use cases. If you bump into issues, report them on GitHub.

Conclusion

In conclusion, load testing Apache Kafka is now a lot easier with k6. k6 provides the foundation to create and scale your load tests, and the xk6-kafka extension brings a convenient API to interact with a Kafka server.

If you wish to find out more on other available k6 extensions, simply head over to the bundle builder page. The page also allows you to generate the corresponding command for building your own custom k6 binary.

If you have any questions or are interested in building an extension, join the k6 community on Slack.
