如何借助Kafka持久化存储K8S事件数据?

发布时间:2025-05-21 18:30:59 作者:益华网络 来源:undefined 浏览量(2) 点赞(3)
摘要:大家应该对 Kubernetes Events 并不陌生,特别是当你使用 kubectl describe 命令或 Event API 资源来了解集群中的故障时。 $kubectlgetevents15mWarning

大家应该对 Kubernetes Events 并不陌生,特别是当你使用 kubectl describe 命令或 Event API 资源来了解集群中的故障时。

  $ kubectl get events15m         Warning   FailedCreate                                                                                                      replicaset/ml-pipeline-visualizationserver-865c7865bc     Error creating: pods "ml-pipeline-visualizationserver-865c7865bc-" is forbidden: error looking up service account default/default-editor: serviceaccount "default-editor" not found

尽管这些信息十分有用,但它只是临时的,保留时间最长为30天。如果出于审计或是故障诊断等目的,你可能想要把这些信息保留得更久,比如保存在像 Kafka 这样更持久、高效的存储中。然后你可以借助其他工具(如 Argo Events)或自己的应用程序订阅 Kafka 主题来对某些事件做出响应。

 

构建K8s事件处理链路

我们将构建一整套 Kubernetes 事件处理链路,其主要构成为:

Eventrouter,开源的 Kubernetes event 处理器,它可以将所有集群事件整合汇总到某个 Kafka 主题中。 Strimzi Operator,在 Kubernetes 中轻松管理 Kafka broker。 自定义 Go 二进制文件以将事件分发到相应的 Kafka 主题中。

 

为什么要把事件分发到不同的主题中?比方说,在集群的每个命名空间中存在与特定客户相关的 Kubernetes 资产,那么在使用这些资产之前你当然希望将相关事件隔离开。

创建 Kafka broker 和主题

我选择使用 Strimzi 将 Kafka 部署到 Kubernetes 中。简而言之,它是用于创建和更新 Kafka broker 和主题的。你可以在官方文档中找到如何安装该 Operator 的详细说明。

 

首先,创建一个新的 Kafka 集群:

apiVersion: kafka.strimzi.io/v1beta1kind: Kafkametadata:   name: kube-eventsspec:   entityOperator:     topicOperator: {}    userOperator: {}  kafka:     config:       default.replication.factor: 3       log.message.format.version: "2.6"       offsets.topic.replication.factor: 3       transaction.state.log.min.isr: 2       transaction.state.log.replication.factor: 3     listeners:     - name: plain       port: 9092       tls: false       type: internal     - name: tls       port: 9093       tls: true       type: internal     replicas: 3     storage:       type: jbod       volumes:       - deleteClaim: false         id: 0         size: 10Gi         type: persistent-claim     version: 2.6.0   zookeeper:     replicas: 3     storage:       deleteClaim: false       size: 10Gi       type: persistent-claim

然后创建 Kafka 主题来接收我们的事件:

apiVersion: kafka.strimzi.io/v1beta1kind: KafkaTopicmetadata:   name: cluster-eventsspec:   config:     retention.ms: 7200000     segment.bytes: 1073741824   partitions: 1   replicas: 1

设置 EventRouter

在本教程中使用 kubectl apply 命令即可,我们需要编辑 router 的配置,以指明我们的 Kafka 端点和要使用的主题:

apiVersion: v1data:   config.json: |-     {      "sink": "kafka",      "kafkaBrokers": "kube-events-kafka-bootstrap.kube-events.svc.cluster.local:9092",      "kafkaTopic": "cluster-events"     }kind: ConfigMapmetadata:   name: eventrouter-cm

验证设置是否正常工作

我们的 cluster-events Kafka 的主题现在应该收到所有的事件。最简单的方法是在主题上运行一个 consumer 来检验是否如此。为了方便期间,我们使用我们的一个 Kafka broker pods,它已经有了所有必要的工具,你可以看到事件流:

kubectl -n kube-events exec kube-events-kafka-0 -- bin/kafka-console-consumer.sh \   --bootstrap-server kube-events-kafka-bootstrap:9092 \   --topic kube-events \   --from-beginning{"verb":"ADDED","event":{...}} {"verb":"ADDED","event":{...}} ...

编写 Golang 消费者

现在我们想将我们的 Kubernetes 事件依据其所在的命名空间分发到多个主题中。我们将编写一个 Golang 消费者和生产者来实现这一逻辑:

消费者部分在 cluster-events 主题上监听传入的集群事件 生产者部分写入与事件的命名空间相匹配的 Kafka 主题中

 

如果为Kafka配置了适当的选项(默认情况),就不需要特地创建新的主题,因为 Kafka 会默认为你创建主题。这是 Kafka 客户端 API 的一个非常酷的功能。

p, err := kafka.NewProducer(cfg.Endpoint)if err != nil {         sugar.Fatal("cannot create producer") }defer p.Close() c, err := kafka.NewConsumer(cfg.Endpoint, cfg.Topic)if err != nil {         sugar.Fatal("cannot create consumer") }defer c.Close() run := truesigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)go func() {         sig := <-sigs         sugar.Infof("signal %s received, terminating", sig)         run = false}()var wg sync.WaitGroupgo func() {         wg.Add(1)        for run {                 data, err := c.Read()                if err != nil {                         sugar.Errorf("read event error: %v", err)                         time.Sleep(5 * time.Second)                        continue                 }                if data == nil {                        continue                 }                 msg, err := event.CreateDestinationMessage(data)                if err != nil {                         sugar.Errorf("cannot create destination event: %v", err)                 }                 p.Write(msg.Topic, msg.Message)         }         sugar.Info("worker thread done")         wg.Done() }() wg.Wait()

当然还有更高性能的选择,这取决于预计的事件量和扇出(fanout)逻辑的复杂性。对于一个更强大的实现,使用 Spark Structured Streaming 的消费者将是一个很好的选择。

 

部署消费者

构建并将二进制文件推送到 Docker 镜像之后,我们将它封装为 Kubernetes deployment:

apiVersion: apps/v1kind: Deploymentmetadata:   labels:     app: events-fanout   name: events-fanoutspec:   replicas: 1   selector:     matchLabels:       app: events-fanout   template:     metadata:       labels:         app: events-fanout     spec:       containers:         - image: emmsys/events-fanout:latest           name: events-fanout           command: [ "./events-fanout"]          args:             - -logLevel=info           env:             - name: ENDPOINT               value: kube-events-kafka-bootstrap:9092             - name: TOPIC               value: cluster-events

检查目标主题是否创建

现在,新的主题已经创建完成:

kubectl -n kube-events get kafkatopics.kafka.strimzi.io -o name kafkatopic.kafka.strimzi.io/cluster-events kafkatopic.kafka.strimzi.io/kube-system kafkatopic.kafka.strimzi.io/default kafkatopic.kafka.strimzi.io/kafka kafkatopic.kafka.strimzi.io/kube-events

你会发现你的事件根据其命名空间整齐地存储在这些主题中。

 

总结

访问 Kubernetes 历史事件日志可以使你对 Kubernetes 系统的状态有了更好的了解,但这单靠 kubectl 比较难做到。更重要的是,它可以通过对事件做出反应来实现集群或应用运维自动化,并以此来构建可靠、反应灵敏的软件。

 

原文链接:

https://hackernoon.com/monitor-your-kubernetes-cluster-events-with-eventrouter-golang-and-kafka-wh2a35l0

二维码

扫一扫,关注我们

声明:本文由【益华网络】编辑上传发布,转载此文章须经作者同意,并请附上出处【益华网络】及本页链接。如内容、图片有任何版权问题,请联系我们进行处理。

感兴趣吗?

欢迎联系我们,我们愿意为您解答任何有关网站疑难问题!

您身边的【网站建设专家】

搜索千万次不如咨询1次

主营项目:网站建设,手机网站,响应式网站,SEO优化,小程序开发,公众号系统,软件开发等

立即咨询 15368564009
在线客服
嘿,我来帮您!