请教一个 Java 多线程嵌套使用的问题

2020-11-05 00:26:47 +08:00
 season8

同事碰到一个问题,我写了个 demo 复现,研究了好几天还是没头绪,多线程程场景也没有调试思路,干脆发个帖,想看看有没有大佬可以指点一二。

模拟场景:三个消费组消费异步消费,每组有三个任务,任务之间异步执行,但必须都执行完毕后消费组才算结束。 设计上,消费组线程池给 3 个线程,控制每次只有三个组能消费。 任务线程池给的是大于 3*3,按我的理解是,外层 3 个消费组,每组三个任务,实时任务应该不会超过 9 个。

但程序执行一会儿就发现会有消费组批量涌入,导致里层线程池触发 reject 。

Demo 如下:

public static void main(String[] args) throws InterruptedException {

		ThreadPoolExecutor outter = new ThreadPoolExecutor(3, 3, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(197));
		ThreadPoolExecutor inner = new ThreadPoolExecutor(9, 12, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));

		for (int i = 0; i < 200; i++) {
			int group = i;
			outter.execute(() -> {
				System.out.println("开始第  " +group+"  组消费");
				CountDownLatch countDownLatch = new CountDownLatch(3);
				for (int j = 0; j < 3; j++) {
					int task = j;
					inner.execute(() -> {
						System.out.println(group + "--消费数据:" + task);
						countDownLatch.countDown();
					});
				}
				try {
					countDownLatch.await();
					System.out.println(group + "--消费完成");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			});
		}
	}
3887 次点击
所在节点    问与答
13 条回复
micean
2020-11-05 02:21:44 +08:00
个人猜测
countDownLatch.countDown() 之后,outter 完成了这次任务并开始下一个了,但是 inner 还没有完成,队列塞不下
把 inner 的队列从 1 调高一些就行了
season8
2020-11-05 09:23:24 +08:00
@micean
countDownLatch 不是最后一个 inner 线程执行完成后唤醒 outter 线程吗?那 outter 线程结束应该就意味着有三个 inner 结束。

而且,尝试过,countDownLatch.await();之后 sleep,也是存在这个问题
Vedar
2020-11-05 09:27:07 +08:00
你这个 outer 不停的在刷,第五组消费的时候就已经超过 inner 的容纳能力了 肯定会 reject 掉,主要原因还是你 outer 没有阻塞
Vedar
2020-11-05 09:29:19 +08:00
@season8 你是在 outer 线程池里面开一个线程去 await 的 这根本没阻塞 outer
micean
2020-11-05 10:31:38 +08:00
@season8
outter 线程结束并不意味着有三个 inner 结束,countDownLatch 释放之后,outter 和 3 个 inner 可没有先后执行顺序
lancelee01
2020-11-05 10:33:24 +08:00
3L 正解,countDownLatch 没什么用,感觉你的场景需要的是限流器,全局限流即可。同时 LinkedBlockingQueue 这个队列的可能和你想的不太一样,网上的八股文不对,你试试-_-!
wysnylc
2020-11-05 10:42:27 +08:00
别用 countDownLatch,换成 Completablefuture
1194129822
2020-11-05 12:00:48 +08:00
建议创建线程池时传 ThreadFactory 参数,打印不要用 System.out.println,请换成 log 可以查看是具体那条线程。inner 线程池最大任务数 = 12 + 1 (建议不要设置非核心线程)。出现这个问题并没有什么高深的原理,仅仅是线程运行的不确定性,第一轮 outter 给 inner 提交了 9 个 task,此时 inner 正常,outter 三个线程被正确的阻塞。当 inner 运行所有 countDown 后,第一轮 inner 执行还没完全结束,outter 三个线程被唤醒,**注意**,此时线程执行没有了先后顺序和逻辑关系,完全靠 os 调度器调度,如果第二轮 outter 线程三个线程先提交任务,此时 inner 线程池最多可以接受 4 个任务,就是说这一轮已经可能出现错误了。而且一旦触发 RejectedExecutionException,try-catch 没有捕获这个异常,则直接杀死 outter 的核心线程,造成 outter 线程池,execute->Rejected->kill thread->create thread->execute 的恶性循环。代码根本没走到 await,所以一旦 Rejected 就不再阻塞了。
micean
2020-11-05 12:51:15 +08:00
源码里是这样的:
final void runWorker(Worker w) {
Runnable task = w.firstTask
...
try {
while (task != null || (task = getTask()) != null) { //queueSize-1 (返回 null 时 workerSize-1 )
...
task.run(); // 任务跑完了
...
}
} finally {
processWorkerExit(w, completedAbruptly); // workerSize-1
}
}

而 reject 的条件是 [queue 满] 或者 [worker 满] ,你觉得 countDown 结束了,其实只是跑完了 run()而已
zoharSoul
2020-11-05 13:37:58 +08:00
这直接用 rxjava 多方便啊...
yexiangyang
2020-11-05 14:16:03 +08:00
@micean 这个源码分析很有道理啊!
season8
2020-11-06 00:09:19 +08:00
@Vedar @1194129822 @lancelee01 @micean 感谢各位的热情解答,我很受启发。再结合朋友给的例子,我仔细读了下源码,已经大致能复盘这个错误了。

**inner 线程池 reject 的原因:**

1. 主要原因:队列太小,这里给的是 1,实际每个 outer 线程要产生 3 个任务
2. 次要原因:outter 线程里面使用 countdownlatch 确实不能起到很好的限流作用,

**次要原因分析:**
如 runWorker()源码所示,run 执行完毕并不能代表线程任务执行完毕。这意味着 outter 线程与 inner 线程的空闲线程数可能不是 1:3 的关系。但这里可以通过让 outter 线程 sleep 等待 inner 先执行完成,规避这个因素的影响。规避后,问题还是会存在,说明不是主要原因。

**主要原因分析:**
先来看个案例
```
static class MyLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
public MyLinkedBlockingQueue(int capacity) {
super(capacity);
}

@Override
public boolean offer(E o) {
System.out.println("任务加入,当前队列数:" + this.size());
return super.offer(o);
}
}

public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new MyLinkedBlockingQueue<>(1);

// 3 个线程的线程池
ThreadPoolExecutor taskPoolExecutor = new ThreadPoolExecutor(3, 3, 30, TimeUnit.SECONDS, queue);

// 先将线程池拉满
for (int i = 0; i < 3; i++) {
final int finalI = i;
taskPoolExecutor.execute(() -> {
logger.info("{}", finalI);
});
}

// 等待全部任务执行完
Thread.sleep(1000);

// 再次执行任务,发现每一个任务都触发加入队列操作。
for (int i = 10; i < 12; i++) {
final int finalI = i;
// 多线程更容易触发 reject
// new Thread(()-> taskPoolExecutor.execute(() -> logger.info("{}", finalI))).start();
taskPoolExecutor.execute(() -> logger.info("{}", finalI));
}
}
```

执行结果:

> 23:12:39.988 [pool-1-thread-3] INFO c.r.s.Demo8.lambda$main$0:34 - 2
23:12:39.988 [pool-1-thread-2] INFO c.r.s.Demo8.lambda$main$0:34 - 1
23:12:39.988 [pool-1-thread-1] INFO c.r.s.Demo8.lambda$main$0:34 - 0
任务加入,当前队列数:0
23:12:40.997 [pool-1-thread-3] INFO c.r.s.Demo8.lambda$null$1:46 - 10
任务加入,当前队列数:0
23:12:41.000 [pool-1-thread-2] INFO c.r.s.Demo8.lambda$null$1:46 - 11

跑完这个案例我感觉我根本不懂线程池,我翻了下源码:
```
public void execute(Runnable command) {
...
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}

// 线程池满了后,直接不创建核心线程了
// 这里 isRunning 看的我懵逼,明明任务都执行完了,为啥还是 isRunning,先接受,后面再研究 [1]
// 然后就触发入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
```

我以为的线程池是:只要有空闲线程,任务是直接丢给线程去执行的。
**实际情况是:当核心线程数满,不管已有线程是否空闲,任务是先丢到队列,然后空闲线程从队列里面自取。**

案例中,我给的队列大小是 1,当队列满的时候,会扩容线程池到最大线程池大小到 12,此时如果队列是满的(不管线程是否空闲),继续添加就会 reject 。案例中每组有三个任务,只要线程从队列 take 任务不及时,队列很容易满,从而触发 reject 。

**验证:**
1. countDownLatch.await(); 后面加上 sleep,让 outter 线程等 inner 线程结束,排除最开始说的第二个因素的影响。
2. 将队列改成 3,适当调整线程执行时间(也可以不调),reject 很少触发或不触发。
3. 将队列改成 9,没有触发 reject

**总结:**
1. 这个任务表面是多线程嵌套调用,内外线程调度不确定性导致的线程池问题,其实本质是对线程池理解不对导致线程池滥用的问题。
2. 任务是添加到队列,空闲线程调用 take()获取,而不是有空闲线程就直接丢到空闲线程(实际任务也难以主动去找空闲线程,还容易造成等待,让线程自取则是生产消费的模式。)
3. isRunning(c) 这个方法以及相关机制,还要再研究一下。


再次感谢各位,如有不对的地方,还请指出。。
season8
2020-11-06 00:11:04 +08:00
啊。。评论不支持 md,排版好丑,又有点长,各位见谅。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://tanronggui.xyz/t/721868

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX