请教各位 Golang 大神一段代码

2024-01-04 10:40:59 +08:00
 fruitmonster

背景:

这是一个批量接口,目的是接口接收数据,数据交给协程存入 Kafka ,接口立即响应成功,越快越好,请求频率每秒 70 次,一次请求数组携带 1000 条数据。

压测的时候发现这个接口使用协程会导致服务内存暴涨,昨天查了一下午也没有头绪,即使什么也不做,只打印个数组元素的长度,发现如果不使用协程直接 for 循环处理 requestData 数据,内存就不会上涨,但这样会影响接口的响应速度,使用协程处理的话内存会立即上涨,这是为什么呢? 如果开启协程占用了内存,可是协程只有几 KB 呀,若协程内的数据处理占用,那不用协程也占用了内存啊,我的理解用了协程处理能力应该会提高才对啊,为什么反倒下降了呢?附上 pprof 内存图


func PushBatch(c *gin.Context) {
	appGin := app.Gin{C: c}

	gameId := c.Query("game_id")
	if gameId == "" {
		// 记录错误日志···
		return
	}

	// 读取请求体
	bodyBytes, err := io.ReadAll(c.Request.Body)
	if err != nil {
		// 记录错误日志···
		return
	}

	// JSON 反序列化
	var requestData []map[string]interface{}
	if err := json.Unmarshal(bodyBytes, &requestData); err != nil {
		// 记录错误日志···
		return
	}


	// 1.没有使用协程
	for _, entry := range requestData {
		fmt.Println(len(entry))
        
                // Todo 数据写入 Kafaka
	}

	// 2.数据放入协程
	//go func(requestData []map[string]interface{}) {
	//	
	//	for _, entry := range requestData {
	//		//handleEntry(gameId, entry)
	//		fmt.Println(len(entry))
	//	}
	//}(requestData)

	// 返回状态 200
	appGin.Response( http.StatusOK, e.SUCCESS, nil)
}

1793 次点击
所在节点    问与答
20 条回复
lifei6671
2024-01-04 11:24:57 +08:00
使用了协程后,HTTP 请求会立即返回,如果 qps 很大或持续时间很长,会导致你的协程数量暴增,且每个协程都会保持一个 map 数据,当然会导致内存暴增了。
建议使用协程池,保持一定的协程数量,每个协程用来处理写 kafka 的数据,这样可以控制协程数量不会暴涨。还有你这个 json 序列化也有问题,建议直接序列化为结构体,可以使用流式 json 解析,而不是 read 所有数据。
GooMS
2024-01-04 12:50:17 +08:00
其次携程也是要回收的,处理好错误和超时
zsj1029
2024-01-04 13:18:20 +08:00
都已经用 gin 框架了,自带协程池处理请求,没必要自己写协程处理了,func 里面写正常的 Kafka insert 逻辑即可,一秒上千条的处理很轻松的
zdt3476
2024-01-04 13:47:40 +08:00
你有看过请求测试的时候 goroutine 的数量吗?大概率就是开了太多 goroutine 了。你固定开几个 goroutine 执行 kafka 写入的操作。PushBatch 这个接口通过写入一个带 buffer 的 channel 把数据投递到前面的 goroutine 中就好了。
iyaozhen
2024-01-04 14:23:13 +08:00
你不要说多少条,一条多大呢

你说的暴涨是多大呢,这个预期就是会暴涨的,不要说 2G 到 4G 。

kafka 的吞吐到了多少呢,这里卡住也会影响性能的

我猜你这个服务是接收日志或者打点上报的。我之前拿 php-Swoole 做过,3-4w qps 没有问题

你给的信息太少了,大概猜测下你说的问题:
1. 发现如果不使用协程直接 for 循环处理 requestData 数据,内存就不会上涨
那是因为这样接口耗时会增加,这样发压端压力上不来。不知道你是什么发压模型
2. 使用协程处理的话内存会立即上涨,这是为什么呢?
同 1 ,耗时下降,这样单位时间内,你收到的请求更多了,请求 data 都在内存里,可不就上涨了

我的建议是,发送到 kafka 就不要协程了,让这点消耗体现在接口耗时上
sujin190
2024-01-04 14:34:54 +08:00
好像 golang 的 goroutine 调度并不是平衡调度,所以并发很高接口提交速度这么快的话,消费不足肯定导致大量数据拥塞在内存中内存使用量肯定高了,不过你可以试试看其实应该没有一直涨,看你这量不平衡导致内存使用估计得接近 10G 级别了吧,线程的调度平衡性要好的多,但是量特别大也是要考虑线程调度的平衡性影响的

但是换个方向如果你生成业务提交数据不是一直这么高的话,内存充足无所谓的吧,否则如果一直都这么高,或者对内存使用有极度需求,那么你使用 goroutine 提交到 Kafaka 并不能提高你整个系统的吞吐,整体来看对接口延时也帮助不大,整体还是受限于内存大小和 Kafaka 写入速度,异步 goroutine 提交没啥用吧,多余了
fruitmonster
2024-01-04 14:47:33 +08:00
确实,找到问题了,大量且时间长的请求,协程中的 for 循环会把 CPU 跑满,导致一直积压,不能及时处理,但请求依旧,依旧在启动新的协程,导致内存暴涨。因为接口的请求体字段是不固定的,不知道数量,不知道类型,所以我就想的是使用了 map

```go

// 2.数据放入协程
go func(requestData []map[string]interface{}) {

for _, entry := range requestData {
//handleEntry(gameId, entry)
fmt.Println(len(entry))
}
}(requestData)

````
fruitmonster
2024-01-04 14:48:38 +08:00
@GooMS 确实,协程一直在启动 CPU 跑满,协程不能被正常释放,一直在处理 for 循环的数据
FreeEx
2024-01-04 14:49:03 +08:00
1. 删除协程。
2. 发送 kafka 改成异步批量发送。
fruitmonster
2024-01-04 15:00:42 +08:00
@iyaozhen
一次请求,数据体的大小大概 500kb
暴涨就是从服务启动的几十兆,到 7 G 到 10G ,只要给压力会持续暴涨,无限上涨,若服务不停,内存用完为止

您猜测的没错,之前我的目的是为了接口尽可能快的返回数据,我就把数据丢给了协程,让携程去处理,并且在协程中愚蠢的增加了验证、解析的逻辑,等把这部分数据整理完之后再写入 kafka ,问题就出在整理这里,目前发现的问题是:当请求量非常大,原有代码的 for 循环,本身会占用 CPU 的能力,大量请求把 CPU 全部跑满,CPU 处理不了,数据就会在内存中越积越多,所以内存就会上涨
fruitmonster
2024-01-04 15:02:30 +08:00
@sujin190 是的,您分析的没错,我之前把数据交给协程去处理,因为我想在协程里加一些验证,能过通过验证才会写入 Kafka ,现在看来我完全是错的
fruitmonster
2024-01-04 15:15:10 +08:00
@zsj1029 大哥,请明示,没找到 Gin 自带协程池处理请求的相关内容哇
kkbblzq
2024-01-04 15:33:29 +08:00
你这写的有点无力吐槽。。
1. 你这样开协程只是提前返回罢了,对处理效率并没有改进,完全没有必要开,你开了无非是多了一堆挂在后台跑的协程,内存不大才怪了。真开协程应该是按 kafka 单包大小进行分片,比如 100 条数据一个协程发送,且应该开 wg 等协程跑完再返回的。
2. 既然你不需要了解结构,单纯解数组的话,[]json.RawMessage 就可以了,何必解成 map ,你发 kafka 又得序列化回去,反复的消耗 cpu 何必呢。
...
fruitmonster
2024-01-04 15:37:15 +08:00
@kkbblzq 条条在理,正在改
leyoumake1997
2024-01-04 15:40:08 +08:00
将请求数据写入到队列,然后另外一个服务去消费到 kafka 里面
leonshaw
2024-01-04 15:50:33 +08:00
读 request body 时就已经占了一部分内存了,不开新协程时是因为没有提前返回导致并发数受到了限制,确认一下这种情况下并发瓶颈是在客户端还是服务端。你需要的是在服务端读 request body 之前限制并发。
lsk569937453
2024-01-05 08:54:58 +08:00
这是一个批量接口,目的是接口接收数据,数据交给协程存入 Kafka ,接口立即响应成功,越快越好,请求频率每秒 70 次,一次请求数组携带 1000 条数据。单次请求数据大小 500kb 。
============================
先说最简答的方案,加机器。后端服务本来就是无状态的,kafka 也绝对不是瓶颈。而且你自己也说了后端服务内存压力很大,那直接加机器就好了。

其次,在不加机器的情况下,你的代码还有可以继续优化的空间。后端收到请求后不校验,直接完整的将 http 请求写入 kafka ,省去序列化( http 请求到 struct)和反序列化(struct 到 kafka 的 body)的 cpu 。由后续的消费者去一次消费 1000 条消息做处理/校验。

技术没有银弹,完全看你怎么取舍。以前我可能采取第二种方案,现在年龄大了,不想折腾了,我只想会加机器。
PiersSoCool
2024-01-05 11:22:01 +08:00
我处理每秒最高十几万 QPS ,这种不用开协程,直接用 gin 就好;这里直接用 k8s 扩容即可,甚至可以压测出 QPS 按照 QPS 自动扩容

confluent kafka golang 那边,以及 kafka 公有云上会有 bug ,小心踩坑
fruitmonster
2024-01-05 14:42:45 +08:00
@PiersSoCool 请问怎么直接用 Gin 啊,我新手,没听太明白,谢谢啦~
PiersSoCool
2024-01-08 10:13:23 +08:00
@fruitmonster gin 直接处理请求,和上面类似的不开协程,gin 每个 http 请求本身就是个协程,直接往 kafka 写数据

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://tanronggui.xyz/t/1005712

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX