周其仁:停止改革,我们将面临三大麻烦

抛开立场观点不谈,且看周小平写一句话能犯多少语病

罗马尼亚的声明:小事件隐藏着大趋势——黑暗中的风:坚持做对的事相信未来的结果

布林肯突访乌克兰,为何选择去吃麦当劳?

中国不再是美国第一大进口国,贸易战殃及纺织业? 美国进一步延长352项中国商品的关税豁免期

生成图片,分享到微信朋友圈

自由微信安卓APP发布,立即下载! | 提交文章网址
查看原文

使用Java8的CompletableFuture功能(一)

Honwhy 朴素读物 2022-12-27

在Java8环境下,如果需要多线程方式执行多个子任务才能完成一项任务的话,那么可以考虑下使用CompletableFuture这个API。以下内容构思一种多线程查询聚合的场景,看使用CompletableFuture是否合适,以及还有哪些问题存在。

设调用play方法,它需要去调用外部10个接口,这10个接口全部完成后才算play方法完成,接受内容可损。如果全部都顺利完成,那么我们可以得到[0,1,2,...10]这样的结果;如果其中某些接口超时了,结果可能是[2, 3, 4, 6, 7]。

使用Random随机数方式,小于0.5的计为正常,否则就认为不正常,通过Sleep方式模拟超时情况。

if (random.nextDouble() < 0.5) { map.put(finalI, PRESENT);} else { try { TimeUnit.MILLISECONDS.sleep(1_500 + Math.round(random.nextDouble() * 500)); System.err.println(String.format("%d round %d step, after sleep", j, finalI)); } catch (InterruptedException ignore) { // }}

然后就是

- 将每次请求使用CompletableFuture.supplyAsync构造成异步的方式

- 设定一个核心=4,最大=8,队列长度=200的线程池,用来接收异步请求(设定线程池这也是线上环境比较推荐的做法)

- 使用CompletableFuture.allOfCompletableFuture.get+timeout形式来收集结果

但是由于CompletableFuture.allOf后的所有future结果通过get方法并不能拿到,返回的结果是Void类型,它只代表在一定时间内容收集完毕,并不能拿到所有Future的结果。所以,我们想办法用线程安全容器在Future内部的实际逻辑中收集。

static Object PRESENT = new Object();Map<Integer, Object> map = new ConcurrentHashMap<>();// 模仿调用外部10个接口for (int i = 1; i <= 10; i++) { if (random.nextDouble() < 0.5) { map.put(finalI, PRESENT); }}

常常整体接口(宏观角度)会设定自己的超时时间,而且不受内部每次调用其他接口的超时时间影响(微观角度),况且内部中的每次调用接口实际上可能与外部调用接口的时间是有差距的。

比如说外部调用play方法,时间是T1,超时时间设定为1000ms,
内部执行方法调用其他接口,开始第一个调用记为T2, 开始第五个调用记为T3, 开始第十个调用记为T4,
那么有可能:T1 < T2 < T2 + 1000 < T3 < T4;
更夸张的可能:T1 < T3 < T3 + 1000 < T2 < T4,T2和T3之间的顺序是打乱的。

这都是由于线程池核心已满,总之是任务要进入队列排队的情况。可以看到T2 + 1000或者T3 + 1000后面的任务是多余的,已经没有必要执行,而且也不应该执行,应该退出释放资源。

因此完整的代码如下,play方法

static void play(int j) { Random random = new Random(); Map<Integer, Object> map = new ConcurrentHashMap<>(); List<CompletableFuture<Void>> all = new ArrayList<>(); for (int i = 1; i <= 10; i++) { int finalI = i; CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { if (random.nextDouble() < 0.5) { map.put(finalI, PRESENT); } else { try { TimeUnit.MILLISECONDS.sleep(1_500 + Math.round(random.nextDouble() * 500)); System.err.println(String.format("%d round %d step, after sleep", j, finalI)); } catch (InterruptedException ignore) { // } } return null; }, threadPoolExecutor); all.add(future); } try { CompletableFuture.allOf(all.toArray(new CompletableFuture<?>[0])).get(1000, TimeUnit.MILLISECONDS); } catch (Exception e) { System.out.println(j + " round, before clean: " + threadPoolExecutor.getQueue().size()); all.forEach(f -> { if (f.isDone() || f.isCancelled() || f.isCompletedExceptionally()) {
} else { f.cancel(false); } }); System.out.println(j + " round, after clean: " + threadPoolExecutor.getQueue().size()); } Set<Integer> set = map.keySet(); System.out.println(j + " round, play result: " + set);}

在main方法中调用,

static BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(200);static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS, blockingQueue);static Object PRESENT = new Object();public static void main(String[] args) { for (int j = 1; j <= 10; j++) { play(j); } threadPoolExecutor.shutdown();}

可惜结果不是预料中的那样,任务并不能取消掉。

1 round, before clean: 5
1 round, after clean: 5
1 round, play result: [2]
1 round 5 step, after sleep
1 round 4 step, after sleep
1 round 3 step, after sleep
1 round 1 step, after sleep

2 round, before clean: 4
2 round, after clean: 4
2 round, play result: [1, 4]
3 round, before clean: 14
3 round, after clean: 14
3 round, play result: []
2 round 2 step, after sleep
2 round 3 step, after sleep
2 round 5 step, after sleep
2 round 6 step, after sleep

可见第二轮的sleep 在第三轮之后才结束,我们的代码中执行的

try { CompletableFuture.allOf(all.toArray(new CompletableFuture<?>[0])).get(1000, TimeUnit.MILLISECONDS);} catch (Exception e) { System.out.println(j + " round, before clean: " + threadPoolExecutor.getQueue().size()); all.forEach(f -> { if (f.isDone() || f.isCancelled() || f.isCompletedExceptionally()) {
} else { f.cancel(false); } }); System.out.println(j + " round, after clean: " + threadPoolExecutor.getQueue().size());}

并没有效果。

待后续



文章有问题?点此查看未经处理的缓存