请教, kafka 如何做到一个 topic 分发不同的类型的消息

2023-04-25 11:46:42 +08:00
 NoKey
场景是这样的,上游服务 A ,通过 kafka 发消息个下游服务 B,C,D

为了后续集成方便,A 使用了一个 Topic

这个时候,需要 BCD 接收自己的消息

这种场景下,如何才能控制 BCD 只收到自己的消息,不收别人的消息呢?

考虑了几种方式:
1. 通过 key 。这样下游服务只有收到消息之后才知道 key 是啥,不是自己的丢弃,但是这样必须收消息,也就是 B 会收到 C ,D 的消息,感觉不好。
2. 通过分区。不同下游的消息放到不同的分区,但是这样会造成分区不均衡,部分分区过大。

请问一下大家有没有更好的办法呢?谢谢
2507 次点击
所在节点    程序员
25 条回复
antipro
2023-04-25 11:52:43 +08:00
给 B ,C ,D 各建一个 Kafka
aijam
2023-04-25 11:57:02 +08:00
1F +1
cloudzhou
2023-04-25 12:09:09 +08:00
给 B ,C ,D 各建一个 Topic 就可以
dddd1919
2023-04-25 12:10:29 +08:00
如果只让 BCD 接收到自己的消息,那就在 push 时分三个 topic ,直接把消息隔离开,缺点就是负载可能不均服务利用率降低
如果只让 BCD 处理自己要的消息并忽略掉无意义消息,可以在各自 consumer 加 filterStrategy 过滤掉无关消息
NoKey
2023-04-25 12:18:19 +08:00
建多个 topic 的麻烦点就是,后续要不断的增加 topic ,有没有办法,一个 topic 就可以解决呢?😂
ChaYedan666
2023-04-25 12:37:41 +08:00
@NoKey 不可能吧,不论怎么说,只要是都发一个 topic ,那么 BCD 就得把里面的消息拉过来做过滤,过滤后再消费自己的;或者另一种就是你自己说的第二种,同一个消费者组,监听不同分区,根据 key 发不同的分区,分区不均衡啥的就你得自己控制了
wuYin
2023-04-25 12:59:56 +08:00
也许可以用 2 个 kafka 集群,A 写集群 1 ,自己写个 connector 做消息解析与分发,写到集群 2 的三个 topic ,再由 B C D 各自消费。
这种做法引入了新的集群和组件,成本和维护代价更高。可行但不建议
securityCoding
2023-04-25 13:02:37 +08:00
kafka 不好做,换阿里云 rocketmq 加 tag
kaddusabagei38
2023-04-25 13:39:54 +08:00
建议换队列
urnoob
2023-04-25 13:47:16 +08:00
B C D 各自作为一个消费者组。
waitwait365
2023-04-25 13:51:24 +08:00
用 rabbitmq
zgzhang
2023-04-25 14:04:05 +08:00
kafka stream 做个任务来处理
WhereverYouGo
2023-04-25 14:12:41 +08:00
在消息体里定义 business_type: B 、C 、D ,然后引进一个中间层 X ,X 直接消费 A 发送的消息,并根据 business_type 决定调用( HTTP 或 RPC ) B 、C 、D 。(计算机科学中的每个问题都可以用一间接层解决 doge )
WhereverYouGo
2023-04-25 14:14:48 +08:00
但是上述方案有个问题:B 、C 、D 直接接受流量的冲击,没有 MQ 来缓冲,服务可能会被打爆
fkdog
2023-04-25 14:20:54 +08:00
明明有现成的高速公路,多建两个 topic 的事,你非得要自己单独再修一条路。我不知道怎么评价你这个需求。。
“为了方便”,请问改成 3 个 topic 不方便在哪里?
awinds
2023-04-25 14:51:37 +08:00
除非你真的有需求有另外的 E 同时消费所有数据,不然就多个 topic 吧
lower
2023-04-25 14:53:23 +08:00
@WhereverYouGo 感觉问题不大,X 其实已经在 mq 后面了,慢慢一个一个取消息就行
Super8
2023-04-25 15:14:49 +08:00
可以在消息的 key 或者 value 中添加标识,例如在消息的 key 中添加 B 、C 、D 等标识,表示该消息是发给 B 、C 、D 的,然后在消费者端使用带有过滤条件的消费者来消费消息,只消费自己需要的消息。具体可以使用 Kafka 的 Consumer API 提供的 subscribe 方法中的参数来实现,例如使用 subscribe(Collections.singleton(topic), new MyPartitionAssignor()) 方法,其中 MyPartitionAssignor 实现了 PartitionAssignor 接口,可以根据标识来分配分区。另外,也可以使用 Kafka Streams 来实现消息过滤和分发。
Super8
2023-04-25 15:15:32 +08:00
rocketmq 中 tag 最适合这个场景
zhaoyy0513
2023-04-25 16:12:30 +08:00
@Super8 我创建的 KafkaConsumer 用到的 api 里面没有这两个参数的方法啊老哥,你说的这个 kafka 是哪个版本的啊

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://tanronggui.xyz/t/935312

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX