kratos接入kafka

今天开始准备制作聊天功能,希望使用kafka进行消息的读写操作,保障消息的可靠性,以及对mysql的持久化要求
Kafka 基础概念
Producer 和 Consumer
对于kafka来说,有两种基本类型: Producer 和 Consumer
Producer 简单来说就是用来生产消息或者源信息的,而Consumer 则负责消费消息,使用消息
Topic 和 Partition
Topic
在kafka中,消息根据Topic进行分类,每个Topic对应一个消息队列。
从大的来讲,我们可以将每个服务划分为一个Topic,数据按照服务进行分类处理。
从小的来看,我们也能细化每个服务的增删改查操作,每个操作维护一个Topic,进行更细致的划分
Partition
Partition是为了解决Topic内数据传输方式不同,导致单个Topic出现吞吐量不足的问题
他将单个Topic分化为多个块,对内部以不同方式传输的数据进行分块处理,例如将发送消息与删除消息分为两块进行,提高了未分块时串行输出的效率
Broker 和 Cluster
一个kafka服务器称为一个Broker,多个Broker组成一个Cluster集群
其中集群内某个 Broker 会成为集群控制器(Cluster Controller),它负责管理集群,包括分配分区到 Broker、监控 Broker 故障等。在集群内,一个分区由一个 Broker 负责,这个 Broker 也称为这个分区的 Leader;当然一个分区可以被复制到多个 Broker 上来实现冗余,这样当存在 Broker 故障时可以将其分区重新分配到其他 Broker 来负责
Docker-Compose 安装 Kafka
这里我们直接使用docker-compose安装kafka,简便快捷
当然,kafka依赖于zookeeper,虽然已经有版本不需要依赖,但我还是使用一般模式,目前仅使用单机,也就不再提及集群
配置docker-compose文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.2.0
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
volumes:
- ../../../data/zookeeper/data:/data # 挂载到项目data目录
kafka:
image: confluentinc/cp-kafka:6.2.0
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:${ZOOKEEPER_PORT}
volumes:
- ../../../data/kafka/data:/data # 挂载到项目data使用命令安装(Linux)
1
2# 下载docker-compose文件内依赖容器,并后台运行
sudo docker-compose up -d
使用Kafka-go在Kratos中操作Kafka
因为我们的服务框架为Kratos,所以就在Kratos中进行操作‘
对于Kafka-go,是我个人喜欢使用的一个操作kafka的三方
在聊天服务配置文件中添加kafka配置
在聊天服务中,我使用kafka-go的Writer,Reader Api进行写入与读取
易于使用:
Writer
和Reader
提供了简单的接口和方法,使得发送和接收消息变得非常直观和易于理解。它们封装了 Kafka 的复杂性,提供了一种更简单的方式来与 Kafka 进行交互。高级功能:
kafka-go
库的Writer
和Reader
支持许多高级功能,例如批量写入消息、异步写入、读取消息的偏移量控制等。这些功能使得处理和管理消息变得更加灵活和高效。性能优化:
kafka-go
库的Writer
和Reader
实现了一些性能优化策略,例如批量发送和异步处理,以提高消息的吞吐量和性能。它们经过了优化和测试,可以在高负载的情况下提供良好的性能表现。
我们看看官方给我们的实例
Reader
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22// make a new reader that consumes from topic-A,
// partition 0, at offset 42
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "topic-A",
Partition: 0,
MaxBytes: 10e6, // 10MB
})
r.SetOffset(42)
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("message at offset %d: %s = %s\n", m.Offset,
string(m.Key), string(m.Value))
}
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}Writer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29// make a writer that produces to topic-A, using the
// least-bytes distribution
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "topic-A",
Balancer: &kafka.LeastBytes{},
}
err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
kafka.Message{
Key: []byte("Key-B"),
Value: []byte("One!"),
},
kafka.Message{
Key: []byte("Key-C"),
Value: []byte("Two!"),
},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
if err := w.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}我们看到这两个配置使用的方法并不相同(NewReader和Writer{}),因为官方把NewWriter方法废弃了,由于某些安全原因
在这个两个连接实例中,我们不难得出我们的配置必备的是哪些东西
localhost:9092 broker地址
topic 分类名
partition 分块号
我们可以添加读取与写入超时时间,提高连接使用效率
configs/config.yaml
1 | kafka: |
conf/conf.proto
1 | message Bootstrap { |
设置Kafka连接
因为是在Kratos中使用,我们将Kafka连接建立写在服务data层内,并合并至NewData中,为data层提供操作
我们首先整合一下Writer和Reader类型,创建一个新的Conn结构体
1 | type KafkaConn struct { |
添加一个KafkaConn的New方法
1 | func NewKafkaConn(c *conf.Data) *KafkaConn { |
kratos集成了wire依赖注入工具,直接生成代码确实好用
在服务入口文件中使用
wire
完成操作
接下来,你就可以在外部文件中引入KafkaConn进行生产与消费
wait… 还有一小步,初始化开启一个goroutine进行消费者循环
初始化消费者循环读取及生产者
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// InitStoreMessageQueue 初始化聊天记录存储队列
func (r *messageRepo) InitStoreMessageQueue() {
ctx := context.Background()
for {
msg, err := r.data.kfk.reader.ReadMessage(ctx)
if err != nil {
r.log.Errorf("read message error, err: %v", err)
}
value := msg.Value
var mg *Message
err = json.Unmarshal(value, mg)
if err != nil {
r.log.Errorf("json unmarshal error, err: %v", err)
}
err = r.InsertMessage(ctx, mg.FromUserId, mg.ToUserId, mg.Content)
if err != nil {
r.log.Errorf("insert message error, err: %v", err)
}
r.log.Infof("message: UserId-%v to UserId-%v: %v ", mg.FromUserId, mg.ToUserId, mg.Content)
}
}在biz层使用接口,并创建
1
2
3
4
5
6
7
8func NewMessageUsecase(
repo MessageRepo, logger log.Logger) *MessageUsecase {
go repo.InitStoreMessageQueue()
return &MessageUsecase{
repo: repo,
log: log.NewHelper(log.With(logger)),
}
}生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20// PublishMessage 生产者发送消息
func (r *messageRepo) PublishMessage(ctx context.Context, userId, toUSerId uint32, content string) error {
mg := &Message{
FromUserId: userId,
ToUserId: toUSerId,
Content: content,
CreatedAt: time.Now().UnixMilli(),
}
byteValue, err := json.Marshal(mg)
if err != nil {
return fmt.Errorf("json marshal error, err: %w", err)
}
err = r.data.kfk.writer.WriteMessages(ctx, kafka.Message{
Value: byteValue,
})
if err != nil {
return fmt.Errorf("write message error, err: %w", err)
}
return nil
}
到此为止,我们就建立了一个可以持续接收消息,并存储到数据库的kafka消息队列啦
- 标题: kratos接入kafka
- 作者: IntYou
- 创建于: 2023-08-22 08:33:21
- 更新于: 2023-08-22 16:04:17
- 链接: https://intyou.netlify.app/2023/08/22/kratos接入kafka/
- 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。