Apache Kafka Sink
This page shows how to install and configure the Apache Kafka Sink (KafkaSink).
Prerequisites
You must have access to a Kubernetes cluster with Knative Eventing installed.
Installation
Install the Kafka controller:
kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.24.0/eventing-kafka-controller.yaml
Install the KafkaSink data plane:
kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.24.0/eventing-kafka-sink.yaml
Verify that the kafka-controller and kafka-sink-receiver Deployments are running:
kubectl get deployments.apps -n knative-eventing
Example output:
NAME                  READY   UP-TO-DATE   AVAILABLE   AGE
eventing-controller   1/1     1            1           10s
eventing-webhook      1/1     1            1           9s
kafka-controller      1/1     1            1           3s
kafka-sink-receiver   1/1     1            1           5s
KafkaSink example
A KafkaSink object looks similar to the following:
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-sink
  namespace: default
spec:
  topic: mytopic
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
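When several sinks differ only in name, namespace, topic, or bootstrap server, the manifest above can be generated from parameters. The following is a minimal sketch, not part of Knative: render_kafka_sink is a hypothetical helper name, and it only prints the manifest fields shown above.

```shell
# Hypothetical helper: print a KafkaSink manifest for the given values.
# Usage: render_kafka_sink <name> <namespace> <topic> <bootstrap_server>
render_kafka_sink() {
  cat <<EOF
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: $1
  namespace: $2
spec:
  topic: $3
  bootstrapServers:
    - $4
EOF
}

# Print the manifest with the values used in the example above.
render_kafka_sink my-kafka-sink default mytopic my-cluster-kafka-bootstrap.kafka:9092
```

To create the object, the output can be piped to kubectl apply -f -.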
Security
Knative supports the following Apache Kafka security features:
- Authentication using SASL without encryption
- Authentication using SASL and encryption using SSL
- Authentication and encryption using SSL
- Encryption using SSL without client authentication
Enabling security features
To enable security features, in the KafkaSink spec, you can reference a secret:
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-sink
  namespace: default
spec:
  topic: mytopic
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  auth:
    secret:
      ref:
        name: my_secret
Note
The secret my_secret must exist in the same namespace as the KafkaSink. Certificates and keys must be in PEM format.
Authentication using SASL
Knative supports the following SASL mechanisms:
PLAIN
SCRAM-SHA-256
SCRAM-SHA-512
To use a specific SASL mechanism, replace <sasl_mechanism> with the mechanism of your choice in the following commands.
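A script that creates the secret may want to reject a mistyped mechanism before calling kubectl. The following is a small sketch of such a check against the three mechanisms listed above; is_supported_sasl_mechanism is a hypothetical helper name, not a Knative or kubectl command.

```shell
# Hypothetical helper: succeed only for the SASL mechanisms listed above.
is_supported_sasl_mechanism() {
  case "$1" in
    PLAIN|SCRAM-SHA-256|SCRAM-SHA-512) return 0 ;;
    *) return 1 ;;
  esac
}

# Example: check a value before using it as <sasl_mechanism>.
mechanism="SCRAM-SHA-512"
if is_supported_sasl_mechanism "$mechanism"; then
  echo "ok: $mechanism"
else
  echo "unsupported SASL mechanism: $mechanism" >&2
fi
```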
Authentication using SASL without encryption
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_PLAINTEXT \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
Authentication using SASL and encryption using SSL
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_SSL \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
Encryption using SSL without client authentication
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-literal=user.skip=true
Authentication and encryption using SSL
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-file=user.crt=<my_cert.pem_file_path> \
--from-file=user.key=<my_key.pem_file_path>
Note
The ca.crt entry can be omitted, in which case the system's root CA set is used as a fallback.
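The four secret variants above differ mainly in the value of the protocol key. As a summary, the mapping can be sketched as a small shell function; kafka_secret_protocol and its argument names (sasl, ssl, none) are hypothetical and only illustrate the choices described in this section.

```shell
# Hypothetical helper: print the `protocol` value for a secret.
# $1: authentication (sasl | ssl | none)
# $2: encryption     (ssl | none)
kafka_secret_protocol() {
  case "$1/$2" in
    sasl/none) echo "SASL_PLAINTEXT" ;;
    sasl/ssl)  echo "SASL_SSL" ;;
    ssl/ssl)   echo "SSL" ;;  # client certificate: also set user.crt and user.key
    none/ssl)  echo "SSL" ;;  # no client authentication: also set user.skip=true
    *) echo "unsupported combination: auth=$1 encryption=$2" >&2; return 1 ;;
  esac
}

# Example: SASL authentication over an SSL-encrypted connection.
kafka_secret_protocol sasl ssl
```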
Kafka Producer configurations
A Kafka Producer is the component responsible for sending events to the Apache Kafka cluster. You can change the configuration for Kafka Producers in your cluster by modifying the config-kafka-sink-data-plane ConfigMap in the knative-eventing namespace.
The settings available in this ConfigMap are documented on the Apache Kafka website; see, in particular, the Producer configurations section.
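Kafka producer settings such as acks and linger.ms are plain key=value properties. The following is a rough sketch of changing one setting, assuming the ConfigMap stores producer settings as Java-properties text; that layout is an assumption, so inspect it first with kubectl get configmap config-kafka-sink-data-plane -n knative-eventing -o yaml. The helper name set_property and the sample values are hypothetical.

```shell
# Hypothetical helper: set one key in Java-properties text read from stdin.
# Any existing line for the key is dropped and the new setting is appended.
set_property() {
  awk -F= -v key="$1" '$1 != key'
  printf '%s=%s\n' "$1" "$2"
}

# Example: change linger.ms in a sample producer configuration.
printf 'acks=all\nlinger.ms=0\n' | set_property linger.ms 5
```

The resulting text would then be written back into the ConfigMap, for example with kubectl edit.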
Enable debug logging for data plane components
To enable debug logging for data plane components, change the logging level to DEBUG in the kafka-config-logging ConfigMap.
Apply the kafka-config-logging ConfigMap by running the command:
kubectl apply -f - <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-config-logging
  namespace: knative-eventing
data:
  config.xml: |
    <configuration>
      <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
      </appender>
      <root level="DEBUG">
        <appender-ref ref="jsonConsoleAppender"/>
      </root>
    </configuration>
EOF
Restart the kafka-sink-receiver:
kubectl rollout restart deployment -n knative-eventing kafka-sink-receiver