
Removing the ZooKeeper Dependency from Kafka

Background

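Apache Kafka Raft (KRaft) is the consensus protocol introduced to remove Apache Kafka's dependency on ZooKeeper for metadata management, which simplifies Kafka's architecture: responsibility for metadata is consolidated into Kafka itself instead of being split across two different systems, ZooKeeper and Kafka. KRaft mode uses a new quorum controller service in Kafka, which replaces the previous controller and uses an event-based variant of the Raft consensus protocol.
Starting with version 3.3, in which KRaft mode was marked production-ready for new clusters, Apache Kafka can use the Raft metadata protocol and drop the extra overhead of running ZooKeeper. More information is available in the official Apache Kafka documentation: https://kafka.apache.org/documentation/#kraft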
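
To make "metadata managed inside Kafka itself" more concrete, the following is a minimal sketch of the KRaft-specific settings for a single node acting as both broker and controller. The property names are Kafka's own; the temporary file, the node ID 1, and the localhost addresses are assumptions chosen purely for illustration, not values taken from the deployment described later.

```shell
# Illustrative only: write a minimal combined-mode KRaft configuration
# to a temporary file (the path and all values are assumptions).
config_file=$(mktemp)

cat > "$config_file" <<'EOF'
# This node is both a broker and a member of the controller quorum.
process.roles=broker,controller
node.id=1
# Quorum members, written as <node.id>@<host>:<controller port>.
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://localhost:9092
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
log.dirs=/tmp/kraft-combined-logs
EOF

cat "$config_file"
```

The controller quorum is described entirely by `process.roles`, `node.id`, and `controller.quorum.voters`, so the broker configuration carries no connection string to an external coordination service.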

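Advantages of KRaft

  1. Improves control-plane performance, allowing Kafka clusters to scale to millions of partitions
  2. Improves stability and simplifies the software architecture
  3. Lets Kafka provide a single security model for the whole system
  4. Offers a lightweight, single-process way to get started with Kafka
  5. Reduces controller failover time to near-instantaneous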

Configuration and implementation

  • Dockerfile
FROM openjdk:17-bullseye

ENV KAFKA_VERSION=3.4.0
ENV SCALA_VERSION=2.13
ENV APP_HOME=/opt/kafka
ENV PATH=${PATH}:${APP_HOME}/bin

ARG USER=kafka
ARG UID=1001
ARG GID=1001

LABEL name="kafka" \
      version="${KAFKA_VERSION}" \
      maintainer="Qiu Shu Hui <qiushuhui@oschina.cn>"

RUN set -xe \
         && groupadd --gid $GID $USER \
         && useradd --uid $UID --gid $GID --shell /bin/bash -d /home/$USER $USER \
         && echo "Asia/Shanghai" > /etc/timezone \
         && cp -rf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

RUN install -d -o $USER -g $USER /opt

COPY --chown=$USER:$USER ./entrypoint.sh /

USER $USER

# Older releases such as 3.4.0 are served from archive.apache.org;
# downloads.apache.org only carries the currently supported releases.
RUN set -xe \
         && wget -O /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
         && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt \
         && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
         && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} ${APP_HOME} \
         && chmod +x /entrypoint.sh

EXPOSE 9092 9093

WORKDIR $APP_HOME

ENTRYPOINT ["/bin/bash", "/entrypoint.sh"]

  • entrypoint.sh
#!/bin/bash

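# The StatefulSet pod ordinal (the part after the last "-") is used as the node ID.
NODE_ID=${HOSTNAME##*-}

LISTENERS="PLAINTEXT://:9092,CONTROLLER://:9093"
# Advertise this pod's stable DNS name behind the headless service.
ADVERTISED_LISTENERS="PLAINTEXT://$HOSTNAME.$SERVICE:9092"
KAFKA_CONFIG_FILE="/opt/kafka/config/kraft/server.properties"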

# Build controller.quorum.voters as "0@<name>-0.<service>:9093,1@<name>-1.<service>:9093,..."
CONTROLLER_QUORUM_VOTERS=""
for i in $(seq 0 $((REPLICAS - 1)))
do
    CONTROLLER_QUORUM_VOTERS="$CONTROLLER_QUORUM_VOTERS$i@${HOSTNAME%-*}-$i.$SERVICE:9093,"
done
# Drop the trailing comma.
CONTROLLER_QUORUM_VOTERS=${CONTROLLER_QUORUM_VOTERS%,}

# Create this node's log directory if it does not exist yet.
mkdir -p "$DATA_DIR/$NODE_ID"

echo "$CLUSTER_ID" > "$DATA_DIR/cluster_id"

sed -e "s+^node.id=.*+node.id=$NODE_ID+" \
    -e "s+^controller.quorum.voters=.*+controller.quorum.voters=$CONTROLLER_QUORUM_VOTERS+" \
    -e "s+^listeners=.*+listeners=$LISTENERS+" \
    -e "s+^advertised.listeners=.*+advertised.listeners=$ADVERTISED_LISTENERS+" \
    -e "s+^log.dirs=.*+log.dirs=$DATA_DIR/$NODE_ID+" \
    /opt/kafka/config/kraft/server.properties > /opt/kafka/server.properties.updated \
    && mv /opt/kafka/server.properties.updated $KAFKA_CONFIG_FILE

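# Format the storage directory; --ignore-formatted turns this into a no-op on restart.
kafka-storage.sh format --ignore-formatted -t "$CLUSTER_ID" -c "$KAFKA_CONFIG_FILE"

# exec so that Kafka replaces the shell and receives termination signals directly.
exec kafka-server-start.sh "$KAFKA_CONFIG_FILE"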
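
The hostname handling in entrypoint.sh is easier to check in isolation. The sketch below pulls it into two standalone helper functions; the names `node_id_from_hostname` and `build_quorum_voters` are hypothetical and do not appear in the script above, and the controller port 9093 is hard-coded on the assumption that it matches the CONTROLLER listener.

```shell
# Hypothetical helpers mirroring the derivation used in entrypoint.sh.

# Pod ordinal of a StatefulSet pod: everything after the last "-".
node_id_from_hostname() (
    echo "${1##*-}"
)

# Print the controller.quorum.voters value for <replicas> pods.
# Arguments: <pod hostname> <headless service name> <replicas>
build_quorum_voters() (
    hostname=$1
    service=$2
    replicas=$3
    prefix=${hostname%-*}     # StatefulSet name: hostname without the ordinal
    voters=""
    i=0
    while [ "$i" -lt "$replicas" ]; do
        voters="${voters}${i}@${prefix}-${i}.${service}:9093,"
        i=$((i + 1))
    done
    echo "${voters%,}"        # drop the trailing comma
)

node_id_from_hostname "kafka-2"
build_quorum_voters "kafka-2" "kafka-svc" 3
```

For the pod `kafka-2` this prints `2`, followed by `0@kafka-0.kafka-svc:9093,1@kafka-1.kafka-svc:9093,2@kafka-2.kafka-svc:9093`. Every pod computes the same voter list, which is what the quorum configuration requires.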

Build the Docker image

# docker build -t registry.cn-shenzhen.aliyuncs.com/shuhui/kafka-kraft:3.4.0 .

Push the image

# docker push registry.cn-shenzhen.aliyuncs.com/shuhui/kafka-kraft:3.4.0

Write the Kubernetes manifest

cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka-app
spec:
  type: ClusterIP
  clusterIP: None
  ports:
    - name: '9092'
      port: 9092
      protocol: TCP
      targetPort: 9092
    - name: '9093'
      port: 9093
      protocol: TCP
      targetPort: 9093
  selector:
    app: kafka-app

---

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
spec:
  serviceName: "kafka-svc"
  revisionHistoryLimit: 2
  replicas: 3
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      containers:
        - name: kafka-container
          imagePullPolicy: "IfNotPresent"
          image:  registry.cn-shenzhen.aliyuncs.com/shuhui/kafka-kraft:3.4.0
          resources:
            requests:
              cpu: "256m"
              memory: "1Gi"
            limits:
              cpu: "2000m"
              memory: "4Gi"
          securityContext:
            allowPrivilegeEscalation: false
            readOnlyRootFilesystem: false
            runAsNonRoot: true
            runAsUser: 1001
          ports:
            - name: kafka-client
              containerPort: 9092
            - name: kafka-internal
              containerPort: 9093
          livenessProbe:
            failureThreshold: 5
            initialDelaySeconds: 10
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 5
            tcpSocket:
              port: kafka-client
          readinessProbe:
            failureThreshold: 3
            initialDelaySeconds: 5
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 5
            tcpSocket:
              port: kafka-client
          env:
            - name: REPLICAS
              value: '3'
            - name: SERVICE
              value: 'kafka-svc'
            - name: DATA_DIR
              value: '/data/kafka'
            - name: CLUSTER_ID
              value: 'Y5SgRE0zp9AusfyPBDNyON'
            - name: DEFAULT_REPLICATION_FACTOR
              value: '3'
            - name: DEFAULT_MIN_INSYNC_REPLICAS
              value: '2'
            - name: NODE_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: KAFKA_HEAP_OPTS
              value: '-Xmx1024M -Xms1024M'
            - name: HOST_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
          volumeMounts:
            - name: data
              mountPath: /data/kafka
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes:
          - "ReadWriteOnce"
        resources:
          requests:
            storage: "5Gi"
EOF
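
The CLUSTER_ID in the manifest is a 22-character URL-safe base64 string, which is the form Kafka uses for a 16-byte UUID. Kafka ships `kafka-storage.sh random-uuid` for generating one. Where the Kafka scripts are not at hand, a value of the same shape can be produced as sketched below; the function name `generate_cluster_id` is hypothetical, and the sketch assumes `head -c`, `base64`, and `/dev/urandom` are available.

```shell
# Hypothetical helper: print a random 22-character URL-safe base64 ID
# (16 random bytes, padding removed), the same shape as CLUSTER_ID above.
generate_cluster_id() (
    while :; do
        id=$(head -c 16 /dev/urandom | base64 | tr '+/' '-_' | tr -d '=')
        case $id in
            -*) continue ;;              # retry: a leading "-" looks like a CLI option
            *)  echo "$id"; return 0 ;;
        esac
    done
)

generate_cluster_id
```

All brokers and controllers of one cluster must be formatted with the same ID, so the value is generated once and then fixed in the manifest rather than computed per pod.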


Check the status

$ kubectl get pod
NAME      READY   STATUS    RESTARTS   AGE
kafka-0   1/1     Running   0          69s
kafka-1   1/1     Running   0          48s
kafka-2   1/1     Running   0          28s



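Testing

The commands below assume a shell inside one of the broker pods (for example via kubectl exec), which is why they connect to localhost:9092.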


Create a topic

$ kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic qiush


List the created topics

$ kafka-topics.sh --list --bootstrap-server localhost:9092
qiush


Send messages

$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic qiush
>Hello world.
>Welcome to guangzhou.


Consume messages

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic qiush --from-beginning
Hello world.
Welcome to guangzhou.
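
Clients running in other pods cannot use localhost and would instead address the brokers by their per-pod DNS names behind the headless service. A minimal sketch of building such a bootstrap list follows; the function name `bootstrap_servers` is hypothetical, the default port 9092 is taken from the PLAINTEXT listener above, and the short names assume the client runs in the same namespace as the service.

```shell
# Hypothetical helper: print a comma-separated bootstrap server list.
# Arguments: <StatefulSet name> <headless service name> <replicas> [port]
bootstrap_servers() (
    name=$1
    service=$2
    replicas=$3
    port=${4:-9092}
    servers=""
    i=0
    while [ "$i" -lt "$replicas" ]; do
        servers="${servers}${name}-${i}.${service}:${port},"
        i=$((i + 1))
    done
    echo "${servers%,}"       # drop the trailing comma
)

bootstrap_servers kafka kafka-svc 3
```

The printed value can be passed to the `--bootstrap-server` option of the commands above in place of localhost:9092.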


