Running Kafka Without the ZooKeeper Dependency
Background
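Apache Kafka Raft (KRaft) is the consensus protocol introduced to remove Apache Kafka's dependency on ZooKeeper for metadata management, which simplifies Kafka's architecture. Responsibility for metadata is consolidated into Kafka itself instead of being split across two different systems, ZooKeeper and Kafka. KRaft mode uses a new quorum controller service in Kafka that replaces the previous controller and relies on an event-based variant of the Raft consensus protocol.
Since version 3.3, in which KRaft was marked production-ready for new clusters, Apache Kafka can use the Raft metadata protocol and drop the extra overhead of running ZooKeeper. More information is available in the official Apache Kafka documentation: https://kafka.apache.org/documentation/#kraft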
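Advantages of KRaft
- Improves control-plane performance, allowing a Kafka cluster to scale to millions of partitions
- Improves stability and simplifies the software architecture
- Allows Kafka to use a single security model for the whole system
- Provides a lightweight, single-process way to get started with Kafka
- Shortens controller failover to the point of being nearly instantaneous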
Configuration
- Dockerfile
FROM openjdk:17-bullseye
ENV KAFKA_VERSION=3.4.0
ENV SCALA_VERSION=2.13
ENV APP_HOME=/opt/kafka
ENV PATH=${PATH}:${APP_HOME}/bin
ARG USER=kafka
ARG UID=1001
ARG GID=1001
LABEL name="kafka" \
      version="${KAFKA_VERSION}" \
      MAINTAINER="Qiu Shu Hui<qiushuhui@oschina.cn>"
RUN set -xe \
&& groupadd --gid $GID $USER \
&& useradd --uid $UID --gid $GID --shell /bin/bash -d /home/$USER $USER \
&& echo "Asia/Shanghai" > /etc/timezone \
&& cp -rf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
RUN install -d -o $USER -g $USER /opt
COPY --chown=$USER:$USER ./entrypoint.sh /
USER $USER
RUN set -xe \
&& wget -O /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
&& tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt \
&& rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
&& ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} ${APP_HOME} \
&& chmod +x /entrypoint.sh
EXPOSE 9092 9093
WORKDIR $APP_HOME
ENTRYPOINT ["/bin/bash", "/entrypoint.sh"]
- entrypoint.sh
#!/bin/bash
# Derive the node ordinal from the StatefulSet pod name, e.g. kafka-2 -> 2.
NODE_ID=${HOSTNAME##*-}
LISTENERS="PLAINTEXT://:9092,CONTROLLER://:9093"
# Advertise this pod's stable DNS name behind the headless service.
ADVERTISED_LISTENERS="PLAINTEXT://${HOSTNAME%-*}-$NODE_ID.$SERVICE:9092"
KAFKA_CONFIG_FILE="/opt/kafka/config/kraft/server.properties"

# Build controller.quorum.voters as id@host:port for ordinals 0..REPLICAS-1.
CONTROLLER_QUORUM_VOTERS=""
for i in $(seq 0 $((REPLICAS - 1)))
do
    CONTROLLER_QUORUM_VOTERS="$CONTROLLER_QUORUM_VOTERS$i@${HOSTNAME%-*}-$i.$SERVICE:9093,"
done
# Drop the trailing comma.
CONTROLLER_QUORUM_VOTERS=${CONTROLLER_QUORUM_VOTERS%,}

# Create the per-node log directory if it does not exist yet.
mkdir -p "$DATA_DIR/$NODE_ID"
echo "$CLUSTER_ID" > "$DATA_DIR/cluster_id"

# Rewrite the stock KRaft config with this node's values.
sed -e "s+^node.id=.*+node.id=$NODE_ID+" \
    -e "s+^controller.quorum.voters=.*+controller.quorum.voters=$CONTROLLER_QUORUM_VOTERS+" \
    -e "s+^listeners=.*+listeners=$LISTENERS+" \
    -e "s+^advertised.listeners=.*+advertised.listeners=$ADVERTISED_LISTENERS+" \
    -e "s+^log.dirs=.*+log.dirs=$DATA_DIR/$NODE_ID+" \
    /opt/kafka/config/kraft/server.properties > /opt/kafka/server.properties.updated \
    && mv /opt/kafka/server.properties.updated "$KAFKA_CONFIG_FILE"

# Format the storage directory on first start; skip it if already formatted.
kafka-storage.sh format --ignore-formatted -t "$CLUSTER_ID" -c "$KAFKA_CONFIG_FILE"
# Replace the shell with the broker process so it receives signals directly.
exec kafka-server-start.sh "$KAFKA_CONFIG_FILE"
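The entrypoint relies on two shell parameter expansions and a small loop to turn a StatefulSet pod name into a node ID and a voter list. The sketch below isolates that logic so it can be tried outside a container. It is an illustration only, not part of the image: the function name build_voters and the sample pod name are assumptions made for the example.

```shell
#!/bin/bash
# Standalone illustration of how entrypoint.sh derives its values from a
# StatefulSet pod name. build_voters is a hypothetical helper name.

# Build the controller.quorum.voters value:
#   "<id>@<base>-<id>.<service>:9093,..." for ids 0..replicas-1
build_voters() {
  local base="$1" service="$2" replicas="$3"
  local voters="" i=0
  while [ "$i" -lt "$replicas" ]; do
    voters="${voters}${i}@${base}-${i}.${service}:9093,"
    i=$((i + 1))
  done
  printf '%s\n' "${voters%,}"   # drop the trailing comma
}

pod="kafka-2"                   # sample pod hostname (assumed)
echo "node id: ${pod##*-}"      # remove everything through the last '-'
echo "base:    ${pod%-*}"       # remove the last '-' and what follows
echo "voters:  $(build_voters "${pod%-*}" kafka-svc 3)"
```

Every pod computes the same voter list because it depends only on the StatefulSet name, the service name, and the replica count, which is what lets the controllers find each other without any external coordination.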
Build the Docker image
# docker build -t registry.cn-shenzhen.aliyuncs.com/shuhui/kafka-kraft:3.4.0 .
Push the image
# docker push registry.cn-shenzhen.aliyuncs.com/shuhui/kafka-kraft:3.4.0
Write the Kubernetes manifest
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka-app
spec:
  type: ClusterIP
  clusterIP: None
  ports:
    - name: '9092'
      port: 9092
      protocol: TCP
      targetPort: 9092
    - name: '9093'
      port: 9093
      protocol: TCP
      targetPort: 9093
  selector:
    app: kafka-app
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
spec:
  serviceName: "kafka-svc"
  revisionHistoryLimit: 2
  replicas: 3
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      containers:
        - name: kafka-container
          imagePullPolicy: "IfNotPresent"
          image: registry.cn-shenzhen.aliyuncs.com/shuhui/kafka-kraft:3.4.0
          resources:
            requests:
              cpu: "256m"
              memory: "1Gi"
            limits:
              cpu: "2000m"
              memory: "4Gi"
          securityContext:
            allowPrivilegeEscalation: false
            readOnlyRootFilesystem: false
            runAsNonRoot: true
            runAsUser: 1001
          ports:
            - name: kafka-client
              containerPort: 9092
            - name: kafka-internal
              containerPort: 9093
          livenessProbe:
            failureThreshold: 5
            initialDelaySeconds: 10
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 5
            tcpSocket:
              port: kafka-client
          readinessProbe:
            failureThreshold: 3
            initialDelaySeconds: 5
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 5
            tcpSocket:
              port: kafka-client
          env:
            - name: REPLICAS
              value: '3'
            - name: SERVICE
              value: 'kafka-svc'
            - name: DATA_DIR
              value: '/data/kafka'
            - name: CLUSTER_ID
              value: 'Y5SgRE0zp9AusfyPBDNyON'
            - name: DEFAULT_REPLICATION_FACTOR
              value: '3'
            - name: DEFAULT_MIN_INSYNC_REPLICAS
              value: '2'
            - name: NODE_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: KAFKA_HEAP_OPTS
              value: '-Xmx1024M -Xms1024M'
            - name: HOST_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
          volumeMounts:
            - name: data
              mountPath: /data/kafka
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes:
          - "ReadWriteOnce"
        resources:
          requests:
            storage: "5Gi"
EOF
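The manifest passes a fixed CLUSTER_ID to every pod, and all nodes of one cluster must share the same value. For a new cluster a fresh ID can be generated first. The following is a minimal sketch: gen_cluster_id is a hypothetical helper name, it prefers the `kafka-storage.sh random-uuid` command that ships with Kafka, and the fallback branch rests on the assumption that a cluster ID is a 16-byte UUID encoded as URL-safe base64 without padding (22 characters, like the value in the manifest).

```shell
#!/bin/bash
# Hypothetical helper: print a fresh value for the CLUSTER_ID variable.
gen_cluster_id() {
  # Preferred path: let Kafka's own tool generate the ID.
  if command -v kafka-storage.sh >/dev/null 2>&1; then
    kafka-storage.sh random-uuid
    return
  fi
  # Fallback (assumption): 16 random bytes, URL-safe base64, padding removed.
  local id
  while :; do
    id=$(head -c 16 /dev/urandom | base64 | tr '+/' '-_' | tr -d '=\n')
    case $id in
      -*) continue ;;   # a leading '-' would look like a CLI option
    esac
    break
  done
  printf '%s\n' "$id"
}

gen_cluster_id
```

The printed value would replace the CLUSTER_ID entry in the env section before the manifest is applied. It should not be changed afterwards, because the storage directories are formatted with it on first start.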
Check the status
$ kubectl get pod
NAME      READY   STATUS    RESTARTS   AGE
kafka-0   1/1     Running   0          69s
kafka-1   1/1     Running   0          48s
kafka-2   1/1     Running   0          28s
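When the deployment is scripted, the same check can be automated rather than read by eye. The sketch below is one possible approach, with pods_ready as a hypothetical helper name: it reads lines in the format printed by `kubectl get pod --no-headers` and succeeds only if every pod has all of its containers ready and is in the Running state. Sample lines stand in for a real cluster here.

```shell
#!/bin/bash
# Hypothetical helper: succeed only if every pod line on stdin
# (format of `kubectl get pod --no-headers`) is fully ready and Running.
pods_ready() {
  awk '
    { split($2, r, "/"); if (r[1] != r[2] || $3 != "Running") bad = 1 }
    END { if (NR == 0 || bad) exit 1 }
  '
}

# Sample input in the same format; on a real cluster use instead:
#   kubectl get pod -l app=kafka-app --no-headers | pods_ready
printf '%s\n' \
  'kafka-0 1/1 Running 0 69s' \
  'kafka-1 1/1 Running 0 48s' \
  'kafka-2 1/1 Running 0 28s' | pods_ready && echo "all brokers ready"
```

Running pods only show that the listeners are open. To look at the KRaft quorum itself, Kafka 3.3 and later ship `kafka-metadata-quorum.sh`; running `kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status` inside a pod reports the current leader and the voter set.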
Testing
Create a topic
$ kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic qiush
List the created topics
$ kafka-topics.sh --list --bootstrap-server localhost:9092
qiush
Send messages
$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic qiush
>Hello world.
>Welcome to guangzhou.
Consume messages
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic qiush --from-beginning
Hello world.
Welcome to guangzhou.
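The manual steps above can be combined into a single round-trip check. The following is a sketch under stated assumptions rather than a definitive test: smoke_test is a hypothetical function name, the Kafka CLI scripts are expected on the PATH (as they are inside the broker image), and the 10-second consumer timeout is an arbitrary choice.

```shell
#!/bin/bash
# Hypothetical round-trip check: create a topic, send one message, read it back.
# Arguments: bootstrap server and topic name (defaults match the steps above).
smoke_test() {
  local server="${1:-localhost:9092}" topic="${2:-qiush}"
  local msg got
  msg="smoke-$(date +%s)"

  kafka-topics.sh --create --if-not-exists \
    --bootstrap-server "$server" \
    --replication-factor 1 --partitions 1 \
    --topic "$topic" >/dev/null || return 1

  printf '%s\n' "$msg" |
    kafka-console-producer.sh --bootstrap-server "$server" --topic "$topic" || return 1

  # Read from the beginning and stop once no message arrives for 10 s.
  got=$(kafka-console-consumer.sh --bootstrap-server "$server" --topic "$topic" \
          --from-beginning --timeout-ms 10000 2>/dev/null)

  # Succeed only if the exact message came back.
  printf '%s\n' "$got" | grep -qxF "$msg"
}

if command -v kafka-topics.sh >/dev/null 2>&1; then
  if smoke_test; then echo "round trip OK"; else echo "round trip FAILED"; fi
else
  echo "Kafka CLI not on PATH; run this inside a broker pod"
fi
```

The message carries a timestamp so that a rerun is not satisfied by a message left over from an earlier run on the same topic.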
References
- [1] https://developer.confluent.io/learn/kraft/
- [2] https://github.com/moeenz/docker-kafka-kraft/blob/master/docker-entrypoint.sh
- [3] https://msazure.club/deploy-kafka-without-zookeeper/
- [4] https://github.com/stefanjay/kafka-kraft-on-k8s