kratos接入kafka

IntYou (^_^)

今天开始准备制作聊天功能,希望使用kafka进行消息的读写操作,保障消息的可靠性,以及对mysql的持久化要求

Kafka 基础概念

Producer 和 Consumer

对于kafka来说,有两种基本类型: Producer 和 Consumer

Producer 简单来说就是用来生产消息或者源信息的,而Consumer 则负责消费消息,使用消息

Topic 和 Partition

  1. Topic

    在kafka中,消息根据Topic进行分类,每个Topic对应一个消息队列。

    从大的来讲,我们可以将每个服务划分为一个Topic,数据按照服务进行分类处理。

    从小的来看,我们也能细化每个服务的增删改查操作,每个操作维护一个Topic,进行更细致的划分

  2. Partition

    Partition是为了解决Topic内数据传输方式不同,导致单个Topic出现吞吐量不足的问题

    他将单个Topic分化为多个块,对内部以不同方式传输的数据进行分块处理,例如将发送消息与删除消息分为两块进行,提高了未分块时串行输出的效率

Broker 和 Cluster

一个kafka服务器称为一个Broker,多个Broker组成一个Cluster集群

其中集群内某个 Broker 会成为集群控制器(Cluster Controller),它负责管理集群,包括分配分区到 Broker、监控 Broker 故障等。在集群内,一个分区由一个 Broker 负责,这个 Broker 也称为这个分区的 Leader;当然一个分区可以被复制到多个 Broker 上来实现冗余,这样当存在 Broker 故障时可以将其分区重新分配到其他 Broker 来负责

Docker-Compose 安装 Kafka

这里我们直接使用docker-compose安装kafka,简便快捷

当然,kafka依赖于zookeeper,虽然已经有版本不需要依赖,但我还是使用一般模式,目前仅使用单机,也就不再提及集群

  1. 配置docker-compose文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    version: '3'
    services:
    zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    container_name: zookeeper
    ports:
    - "2181:2181"
    environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    volumes:
    - ../../../data/zookeeper/data:/data # 挂载到项目data目录

    kafka:
    image: confluentinc/cp-kafka:6.2.0
    container_name: kafka
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    environment:
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:${ZOOKEEPER_PORT}
    volumes:
    - ../../../data/kafka/data:/data # 挂载到项目data
  2. 使用命令安装(Linux)

    1
    2
    # 下载docker-compose文件内依赖容器,并后台运行
    sudo docker-compose up -d

使用Kafka-go在Kratos中操作Kafka

因为我们的服务框架为Kratos,所以就在Kratos中进行操作‘

对于Kafka-go,是我个人喜欢使用的一个操作kafka的三方

在聊天服务配置文件中添加kafka配置

在聊天服务中,我使用kafka-go的Writer,Reader Api进行写入与读取

  1. 易于使用Writer 和 Reader 提供了简单的接口和方法,使得发送和接收消息变得非常直观和易于理解。它们封装了 Kafka 的复杂性,提供了一种更简单的方式来与 Kafka 进行交互。

  2. 高级功能kafka-go 库的 Writer 和 Reader 支持许多高级功能,例如批量写入消息、异步写入、读取消息的偏移量控制等。这些功能使得处理和管理消息变得更加灵活和高效。

  3. 性能优化kafka-go 库的 Writer 和 Reader 实现了一些性能优化策略,例如批量发送和异步处理,以提高消息的吞吐量和性能。它们经过了优化和测试,可以在高负载的情况下提供良好的性能表现。

我们看看官方给我们的实例

  • Reader

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // make a new reader that consumes from topic-A, 
    // partition 0, at offset 42
    r := kafka.NewReader(kafka.ReaderConfig{
    Brokers: []string{"localhost:9092"},
    Topic: "topic-A",
    Partition: 0,
    MaxBytes: 10e6, // 10MB
    })
    r.SetOffset(42)

    for {
    m, err := r.ReadMessage(context.Background())
    if err != nil {
    break
    }
    fmt.Printf("message at offset %d: %s = %s\n", m.Offset,
    string(m.Key), string(m.Value))
    }

    if err := r.Close(); err != nil {
    log.Fatal("failed to close reader:", err)
    }
  • Writer

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    // make a writer that produces to topic-A, using the 
    // least-bytes distribution
    w := &kafka.Writer{
    Addr: kafka.TCP("localhost:9092"),
    Topic: "topic-A",
    Balancer: &kafka.LeastBytes{},
    }

    err := w.WriteMessages(context.Background(),
    kafka.Message{
    Key: []byte("Key-A"),
    Value: []byte("Hello World!"),
    },
    kafka.Message{
    Key: []byte("Key-B"),
    Value: []byte("One!"),
    },
    kafka.Message{
    Key: []byte("Key-C"),
    Value: []byte("Two!"),
    },
    )
    if err != nil {
    log.Fatal("failed to write messages:", err)
    }

    if err := w.Close(); err != nil {
    log.Fatal("failed to close writer:", err)
    }

    我们看到这两个配置使用的方法并不相同(NewReader和Writer{}),因为官方把NewWriter方法废弃了,由于某些安全原因

在这个两个连接实例中,我们不难得出我们的配置必备的是哪些东西

localhost:9092 broker地址

topic 分类名

partition 分块号

我们可以添加读取与写入超时时间,提高连接使用效率

configs/config.yaml

1
2
3
4
5
6
kafka:
addr: 127.0.0.1:9092
topic: "message"
partition: 0
read_timeout: 0.2s
write_timeout: 0.2s

conf/conf.proto

1
2
3
4
5
6
7
8
9
10
11
 message Bootstrap {
Kafka kafka = 1;
}

message Kafka {
string addr = 1;
string topic = 2;
string partition = 3;
google.protobuf.Duration read_timeout = 4;
google.protobuf.Duration write_timeout = 5;
}

设置Kafka连接

因为是在Kratos中使用,我们将Kafka连接建立写在服务data层内,并合并至NewData中,为data层提供操作

我们首先整合一下Writer和Reader类型,创建一个新的Conn结构体

1
2
3
4
5
6
7
8
9
10
11
type KafkaConn struct {
writer *kafka.Writer
reader *kafka.Reader
}

// 在Data中引入,并小写属性隐藏连接,为提供给外部文件做准备
type Data struct {
db *gorm.DB
kfk *KafkaConn
log *log.Helper
}

添加一个KafkaConn的New方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func NewKafkaConn(c *conf.Data) *KafkaConn {
writer := kafka.Writer{
Addr: kafka.TCP(c.Kafka.Addr),
Topic: c.Kafka.Topic,
Balancer: &kafka.LeastBytes{},
WriteTimeout: c.Kafka.WriteTimeout.AsDuration(),
ReadTimeout: c.Kafka.ReadTimeout.AsDuration(),
}
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{c.Kafka.Addr},
Topic: c.Kafka.Topic,
MaxBytes: 10e6, // 10MB
})
return &KafkaConn{
writer: &writer,
reader: reader,
}
}

// 在wire中添加New方法,使用kratos自带的依赖注入工具进行处理
var ProviderSet = wire.NewSet(NewData, NewKafkaConn)

kratos集成了wire依赖注入工具,直接生成代码确实好用

在服务入口文件中使用 wire 完成操作

接下来,你就可以在外部文件中引入KafkaConn进行生产与消费

wait… 还有一小步,初始化开启一个goroutine进行消费者循环

初始化消费者循环读取及生产者

  • 消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    // InitStoreMessageQueue 初始化聊天记录存储队列
    func (r *messageRepo) InitStoreMessageQueue() {
    ctx := context.Background()
    for {
    msg, err := r.data.kfk.reader.ReadMessage(ctx)
    if err != nil {
    r.log.Errorf("read message error, err: %v", err)
    }
    value := msg.Value
    var mg *Message
    err = json.Unmarshal(value, mg)
    if err != nil {
    r.log.Errorf("json unmarshal error, err: %v", err)
    }
    err = r.InsertMessage(ctx, mg.FromUserId, mg.ToUserId, mg.Content)
    if err != nil {
    r.log.Errorf("insert message error, err: %v", err)
    }
    r.log.Infof("message: UserId-%v to UserId-%v: %v ", mg.FromUserId, mg.ToUserId, mg.Content)
    }
    }

    在biz层使用接口,并创建

    1
    2
    3
    4
    5
    6
    7
    8
    func NewMessageUsecase(
        repo MessageRepo, logger log.Logger) *MessageUsecase {
    go repo.InitStoreMessageQueue()
    return &MessageUsecase{
    repo: repo,
    log: log.NewHelper(log.With(logger)),
    }
    }
  • 生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // PublishMessage 生产者发送消息
    func (r *messageRepo) PublishMessage(ctx context.Context, userId, toUSerId uint32, content string) error {
    mg := &Message{
    FromUserId: userId,
    ToUserId: toUSerId,
    Content: content,
    CreatedAt: time.Now().UnixMilli(),
    }
    byteValue, err := json.Marshal(mg)
    if err != nil {
    return fmt.Errorf("json marshal error, err: %w", err)
    }
    err = r.data.kfk.writer.WriteMessages(ctx, kafka.Message{
    Value: byteValue,
    })
    if err != nil {
    return fmt.Errorf("write message error, err: %w", err)
    }
    return nil
    }

到此为止,我们就建立了一个可以持续接收消息,并存储到数据库的kafka消息队列啦

  • 标题: kratos接入kafka
  • 作者: IntYou
  • 创建于: 2023-08-22 08:33:21
  • 更新于: 2023-08-22 16:04:17
  • 链接: https://intyou.netlify.app/2023/08/22/kratos接入kafka/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。