kratos接入kafka
 
                    今天开始准备制作聊天功能,希望使用kafka进行消息的读写操作,保障消息的可靠性,以及对mysql的持久化要求
Kafka 基础概念
Producer 和 Consumer
对于kafka来说,有两种基本类型: Producer 和 Consumer
Producer 简单来说就是用来生产消息或者源信息的,而Consumer 则负责消费消息,使用消息
Topic 和 Partition
- Topic - 在kafka中,消息根据Topic进行分类,每个Topic对应一个消息队列。 - 从大的来讲,我们可以将每个服务划分为一个Topic,数据按照服务进行分类处理。 - 从小的来看,我们也能细化每个服务的增删改查操作,每个操作维护一个Topic,进行更细致的划分 
- Partition - Partition是为了解决Topic内数据传输方式不同,导致单个Topic出现吞吐量不足的问题 - 他将单个Topic分化为多个块,对内部以不同方式传输的数据进行分块处理,例如将发送消息与删除消息分为两块进行,提高了未分块时串行输出的效率 
Broker 和 Cluster
一个kafka服务器称为一个Broker,多个Broker组成一个Cluster集群
其中集群内某个 Broker 会成为集群控制器(Cluster Controller),它负责管理集群,包括分配分区到 Broker、监控 Broker 故障等。在集群内,一个分区由一个 Broker 负责,这个 Broker 也称为这个分区的 Leader;当然一个分区可以被复制到多个 Broker 上来实现冗余,这样当存在 Broker 故障时可以将其分区重新分配到其他 Broker 来负责
Docker-Compose 安装 Kafka
这里我们直接使用docker-compose安装kafka,简便快捷
当然,kafka依赖于zookeeper,虽然已经有版本不需要依赖,但我还是使用一般模式,目前仅使用单机,也就不再提及集群
- 配置docker-compose文件 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24- version: '3' 
 services:
 zookeeper:
 image: confluentinc/cp-zookeeper:6.2.0
 container_name: zookeeper
 ports:
 - "2181:2181"
 environment:
 ZOOKEEPER_CLIENT_PORT: 2181
 volumes:
 - ../../../data/zookeeper/data:/data # 挂载到项目data目录
 kafka:
 image: confluentinc/cp-kafka:6.2.0
 container_name: kafka
 depends_on:
 - zookeeper
 ports:
 - "9092:9092"
 environment:
 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
 KAFKA_ZOOKEEPER_CONNECT: zookeeper:${ZOOKEEPER_PORT}
 volumes:
 - ../../../data/kafka/data:/data # 挂载到项目data
- 使用命令安装(Linux) - 1 
 2- # 下载docker-compose文件内依赖容器,并后台运行 
 sudo docker-compose up -d
使用Kafka-go在Kratos中操作Kafka
因为我们的服务框架为Kratos,所以就在Kratos中进行操作‘
对于Kafka-go,是我个人喜欢使用的一个操作kafka的三方
在聊天服务配置文件中添加kafka配置
在聊天服务中,我使用kafka-go的Writer,Reader Api进行写入与读取
易于使用:
Writer和Reader提供了简单的接口和方法,使得发送和接收消息变得非常直观和易于理解。它们封装了 Kafka 的复杂性,提供了一种更简单的方式来与 Kafka 进行交互。
高级功能:
kafka-go库的Writer和Reader支持许多高级功能,例如批量写入消息、异步写入、读取消息的偏移量控制等。这些功能使得处理和管理消息变得更加灵活和高效。
性能优化:
kafka-go库的Writer和Reader实现了一些性能优化策略,例如批量发送和异步处理,以提高消息的吞吐量和性能。它们经过了优化和测试,可以在高负载的情况下提供良好的性能表现。
我们看看官方给我们的实例
- Reader - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22- // make a new reader that consumes from topic-A, 
 // partition 0, at offset 42
 r := kafka.NewReader(kafka.ReaderConfig{
 Brokers: []string{"localhost:9092"},
 Topic: "topic-A",
 Partition: 0,
 MaxBytes: 10e6, // 10MB
 })
 r.SetOffset(42)
 for {
 m, err := r.ReadMessage(context.Background())
 if err != nil {
 break
 }
 fmt.Printf("message at offset %d: %s = %s\n", m.Offset,
 string(m.Key), string(m.Value))
 }
 if err := r.Close(); err != nil {
 log.Fatal("failed to close reader:", err)
 }
- Writer - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29- // make a writer that produces to topic-A, using the 
 // least-bytes distribution
 w := &kafka.Writer{
 Addr: kafka.TCP("localhost:9092"),
 Topic: "topic-A",
 Balancer: &kafka.LeastBytes{},
 }
 err := w.WriteMessages(context.Background(),
 kafka.Message{
 Key: []byte("Key-A"),
 Value: []byte("Hello World!"),
 },
 kafka.Message{
 Key: []byte("Key-B"),
 Value: []byte("One!"),
 },
 kafka.Message{
 Key: []byte("Key-C"),
 Value: []byte("Two!"),
 },
 )
 if err != nil {
 log.Fatal("failed to write messages:", err)
 }
 if err := w.Close(); err != nil {
 log.Fatal("failed to close writer:", err)
 }- 我们看到这两个配置使用的方法并不相同(NewReader和Writer{}),因为官方把NewWriter方法废弃了,由于某些安全原因 
在这个两个连接实例中,我们不难得出我们的配置必备的是哪些东西
localhost:9092 broker地址
topic 分类名
partition 分块号
我们可以添加读取与写入超时时间,提高连接使用效率
configs/config.yaml
| 1 | kafka: | 
conf/conf.proto
| 1 | message Bootstrap { | 
设置Kafka连接
因为是在Kratos中使用,我们将Kafka连接建立写在服务data层内,并合并至NewData中,为data层提供操作
我们首先整合一下Writer和Reader类型,创建一个新的Conn结构体
| 1 | type KafkaConn struct { | 
添加一个KafkaConn的New方法
| 1 | func NewKafkaConn(c *conf.Data) *KafkaConn { | 
kratos集成了wire依赖注入工具,直接生成代码确实好用
在服务入口文件中使用
wire完成操作
接下来,你就可以在外部文件中引入KafkaConn进行生产与消费
wait… 还有一小步,初始化开启一个goroutine进行消费者循环
初始化消费者循环读取及生产者
- 消费者 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21- // InitStoreMessageQueue 初始化聊天记录存储队列 
 func (r *messageRepo) InitStoreMessageQueue() {
 ctx := context.Background()
 for {
 msg, err := r.data.kfk.reader.ReadMessage(ctx)
 if err != nil {
 r.log.Errorf("read message error, err: %v", err)
 }
 value := msg.Value
 var mg *Message
 err = json.Unmarshal(value, mg)
 if err != nil {
 r.log.Errorf("json unmarshal error, err: %v", err)
 }
 err = r.InsertMessage(ctx, mg.FromUserId, mg.ToUserId, mg.Content)
 if err != nil {
 r.log.Errorf("insert message error, err: %v", err)
 }
 r.log.Infof("message: UserId-%v to UserId-%v: %v ", mg.FromUserId, mg.ToUserId, mg.Content)
 }
 }- 在biz层使用接口,并创建 - 1 
 2
 3
 4
 5
 6
 7
 8- func NewMessageUsecase( 
 repo MessageRepo, logger log.Logger) *MessageUsecase {
 go repo.InitStoreMessageQueue()
 return &MessageUsecase{
 repo: repo,
 log: log.NewHelper(log.With(logger)),
 }
 }
- 生产者 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20- // PublishMessage 生产者发送消息 
 func (r *messageRepo) PublishMessage(ctx context.Context, userId, toUSerId uint32, content string) error {
 mg := &Message{
 FromUserId: userId,
 ToUserId: toUSerId,
 Content: content,
 CreatedAt: time.Now().UnixMilli(),
 }
 byteValue, err := json.Marshal(mg)
 if err != nil {
 return fmt.Errorf("json marshal error, err: %w", err)
 }
 err = r.data.kfk.writer.WriteMessages(ctx, kafka.Message{
 Value: byteValue,
 })
 if err != nil {
 return fmt.Errorf("write message error, err: %w", err)
 }
 return nil
 }
到此为止,我们就建立了一个可以持续接收消息,并存储到数据库的kafka消息队列啦
- 标题: kratos接入kafka
- 作者: IntYou
- 创建于: 2023-08-22 08:33:21
- 更新于: 2023-08-22 16:04:17
- 链接: https://intyou.netlify.app/2023/08/22/kratos接入kafka/
- 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。