Handling Kafka Events with Knative

Dominik Vrbanic

DATA ENGINEER

Previous blog post fom Knative series:
Knative Overview

Introduction

Knative can be used to build serverless applications that respond to events in real-time. It provides a platform for running stateless workloads that can be scaled up or down based on demand. Knative enables developers to focus on writing code without worrying about the underlying infrastructure.

Tutorial

This tutorial demonstrates how to use Knative to trigger a user-defined function written in Go for processing Kafka records, in order to showcase the autoscaling capabilities of Knative. The tutorial is designed to be as simple as possible, using only the Knative Service and KafkaSource resources. Service is used to deploy the user-defined function as a Knative Service and automatically scale it up and down based on demand. KafkaSource is used to trigger the Knative Service every time a new record is produced to the Kafka topic.

Prerequisites

To follow this tutorial, you will need the following prerequisites:

Step 1: Deploy Apache Kafka Cluster and create a topic

The Strimzi Kubernetes operator can be used to deploy the Apache Kafka Cluster in your Kubernetes cluster. For deploying a Kafka cluster and creating a topic with 6 partitions, which is used in this demo, you can follow the steps at . If carried out exactly as written, this will result in your Kafka topic having 3 partitions. To create a topic with 6 partitions, simply set the spec.partitions inside your KafkaTopic YAML file to 6. Once completed, you will have a Kafka cluster named my-cluster on your AKS with a topic named my-topic, consisting of 6 partitions.

Step 2: Install the KafkaSource controller

The KafkaSource controller reads the messages stored in the existing Apache Kafka topics and sends those messages as CloudEvents through HTTP to its configured sink. Simultaneously, it also preserves the order of the messages stored in the topic partitions. It does this by waiting for a successful response from the sink, before delivering the next message in the same partition.

Install the KafkaSource controller by entering the following command (check the latest versions on ):

kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/knative-v1.9.3/eventing-kafka-controller.yaml

Then, install the KafkaSource data plane by entering the following command:

kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/knative-v1.9.3/eventing-kafka-source.yaml

Step 3: Create a Knative Service

Now that an Apache Kafka cluster with a topic has been deployed, you can create a Knative Service to manage the scaling and deployment of our function. Here’s the example file (event-display.yaml):

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-display
  namespace: default
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/target: "5"
        autoscaling.knative.dev/metric: "rps"
    spec:
      containers:
        - image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display

This YAML file creates a Knative Service named event-display that runs the container image: gcr.io/knative-releases/github.com/knative/eventing-sources/cmd/event_display. This image corresponds to a Go function, which simply displays the received events on the console.

The provided Go code sets up an HTTP server using the https://github.com/cloudevents/sdk-go/v2 library to receive and handle CloudEvents.

Knative does not require the use of the CloudEvents SDK for handling Kafka events. You can also use a plain HTTP handler to handle Kafka events. When working directly with HTTP, keep in mind that the CloudEvents metadata is in the headers, and the actual message data is in the body (the Binary content mode is used). The HTTP endpoint servers need to be built to accept messages at the root path. However, the CloudEvents SDK simplifies the handling of events by providing a standard way to encode and decode event data in a format that can be easily consumed by different event sources and event sinks.

Save this file to your working directory, and run the following command to deploy the service:

kubectl apply -f event-display.yaml

This will create a new Knative service in your cluster.

Autoscaling configurations

Inside the Service YAML file, you can change some autoscaling options by configuring spec.template.metadata.annotations.

Knative Serving supports the implementation of Knative Pod Autoscaler (KPA) and Kubernetes’ Horizontal Pod Autoscaler (HPA). The type of Autoscaler implementation (KPA or HPA) can be configured by using the class annotation. The default Autoscaler type is "kpa.autoscaling.knative.dev".

The main difference between these two types is that KPA supports scale-to-zero functionality, and HPA doesn’t. On the other hand, KPA doesn’t support CPU-based autoscaling, while HPA does

If you want to use Kubernetes Horizontal Pod Autoscaler (HPA), you must install it after you install Knative Serving.

The metric configuration defines which metric type is watched by the Autoscaler. The default KPA Autoscaler supports concurrency and rps (requests-per-second) metrics:

  • concurrency – measures the number of active requests being processed simultaneously.
  • rps – measures the rate at which requests are being made to a service.

More about metrics can be found at .

Target configuration provides the Autoscaler with a value that it will try to maintain for the particular metric, in a specific Revision. In the example above, the rps metric is used with the target set to 5.

You can also enable or disable scale to zero functionality, how long the last replica will be kept after traffic ends and the minimum amount of time that the last pod will remain active after the Autoscaler decides to scale pods to zero.

Scale to zero can only be enabled if you are using the KnativePodAutoscaler (KPA) and can only be configured globally.

Upper and lower bounds can also be configured to control the autoscaling behavior. The initial scale that a Revision is scaled to immediately after creation can also be specified. This can be used as a default configuration for all Revisions, or for a specific Revision using an annotation.

More in-detail explanation of this can be found on .

There are also some additional autoscaling configurations for KPA, such as scale rates. One of the configurations you can change is max-scale-down-rate, which defaults to 2.0. This setting determines the maximum ratio of existing to desired pods. For example, with a value of 2.0, the Revision can only scale from N to N/2 pods in one evaluation cycle.

Step 4: Create a KafkaSource

The next step is to create a KafkaSource which acts as the trigger for the Knative Service. Here’s an example file (event-source.yaml):

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
  consumerGroup: <my-consumer-group>
  bootstrapServers:
    - <kafka-bootstrap-server:9092>  # e.g. my-cluster-kafka-bootstrap.kafka-cluster:9092
  topics:
    - my-topic
  consumers: 6
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display

This YAML file defines a KafkaSource called kafka-source that consumes records from the chosen Kafka topic and sends them to the Knative Service called event-display. The best practice is to configure the KafkaSource resource to have as many consumers as your topic has partitions.

The current implementation of KafkaSource does not provide a way to disable message ordering or send batches of messages for faster processing.

Save this file to your working directory and run the following command to deploy the service:

kubectl apply -f event-source.yaml

Step 5: Test the Knative service

To test the Knative service, send some records to the topic you specified in your Kafka cluster. Running the following command will prompt you to produce messages:

kubectl -n kafka-cluster run kafka-producer -i --image=quay.io/strimzi/kafka:0.33.2-kafka-3.4.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic

You can monitor the logs of the Knative service to see the output of the user-defined function, which is in this case the message you have just published. You can verify that the service received the message by opening another terminal and executing the following command:

kubectl logs -f --selector='serving.knative.dev/service=event-display' -c user-container

This will show you the logs of all the pods belonging to the event-display service.

event-display service logs after initially testing the Knative Service

Step 6: Scale the Knative service

To test the autoscaling, you can send a large number of messages to the Kafka topic and monitor the number of replicas of the Knative Service. The Knative Serving component will automatically scale up or down the number of replicas based on the incoming traffic. This can be tested with a simple command which creates some random data and sends it to our topic:

base64 /dev/urandom | head -c 10000 | kubectl -n kafka-cluster run kafka-producer -i --image=quay.io/strimzi/kafka:0.33.2-kafka-3.4.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic

You can view these logs again with the command from the previous step.

event-display service logs after scaling the Knative Service

You can use the following command to continuously monitor the number of available replicas of the application:

kubectl get deploy -w

This will display the current state of all deployments in the cluster, including the event-display deployment, as well as the number of available replicas, desired replicas and the current status of each replica set.

Note that the deployment name may include a suffix number, such as event-display-00001-deployment, event-display-00002-deployment, etc. These suffix numbers represent different replica sets and are created when you update the deployment. Each replica set corresponds to a specific version of the deployment, and the suffix number is used to differentiate them from each other. The latest version of the event-display deployment should be indicated by the highest suffix number, which in this case is event-display-00004-deployment.

The autoscaler is designed to automatically adjust the number of replicas of a service based on the current load and the configured scaling policies. In the specific case of the event-display deployment, the autoscaler was able to scale up to a maximum of 13 replicas based on the load generated by the Kafka source. However, as the load decreased, the autoscaler began to scale down the number of replicas gradually, according to the configured scaling policies. In this case, the max-scale-down-rate was set to 2 by default, which means that the autoscaler was allowed to decrease the number of replicas by a maximum factor of 2 in each evaluation cycle. As a result, the number of replicas gradually decreased from 13 to 6, and then to 3 and 1, over the course of several evaluation cycles.

Replica fluctuation during demo

If you don’t set the upper bound using a max-scale parameter inside the Service YAML file, there is a chance that the number of replicas of your service will scale up to a number greater than the number of partitions of the KafkaSource topic, as in this case.

In this demo, we have configured kafka-source to have 6 consumers because my-topic has 6 partitions. In the screenshot above, we can see that the number of replicas of the event-display service scales up to 13, even though there is no increase in processing speed by adding the extra 7 pods, due to the fact that each message has to wait for a response before the next one can be sent (because the ordering is preserved at the partition level).

This happens because the entirety of the event-display service is abstracted to the KafkaSource, so the service can’t know that the burst traffic is only limited to 6 parallel requests at a time. On the other hand, the service determines that 13 replicas might be a good target based on the incoming traffic and KPA metrics we configured.

To avoid unnecessary cost of creating more service pods than there are partitions, the person creating the service needs to be aware of the number of partitions and consumers being used in order to properly configure the service YAML. In such cases, the max-scale parameter should be set to the number of partitions. For example, if the KafkaSource is configured to have 6 consumers because the topic has 6 partitions, the max-scale parameter should be set to 6.

If the incoming traffic is large enough, this will result in 6 pods being created. These pods are placed behind the load balancer, and they are not directly connected to specific partitions. That’s why each of the pods will receive messages from all partitions of the KafkaSource topic. Messages will be distributed evenly among the pods. In our case, the first pod will have approximately 1/6 of all the messages in the topic and those messages will come from all 6 partitions.

Architecture using KafkaSource and Knative Service

Below, we can see a simple visualization of how the KafkaSource and the Knative Service are connected, and how the KafkaSource references the Service inside a YAML file. The KafkaSource resource is much more complex, and that’s why it is only illustrated here as a black box. The following blog of this series will explain its inner workings and how it sends messages to the Knative Service.

KafkaSource and Knative Service visualization

Additionally, you can see some specific details about your deployment. The last part of the output you get with describe command are events with their messages indicating scaling up or scaling down:

kubectl describe deploy event-display-00004-deployment

Log details displaying scaling events (up and down) during deployment

Conclusion

In conclusion, this tutorial showcased the power of Knative in orchestrating serverless applications, leveraging its event-driven architecture to enable seamless scaling and efficient handling of Kafka events. By demonstrating the simple configuration of Knative Services and KafkaSources, it highlights the potential for dynamic scaling based on incoming traffic. Moreover, it underlines the importance of careful configuration, especially regarding autoscaling policies and their alignment with resource constraints. This short guide serves as a starting point for deeper explorations into Knative’s capabilities and its integration within event-driven architectures. For further insights into KafkaSource-to-Service communication within Knative, stay tuned for the next blog post in this series.

References