ExecutorService invokeAll多线程执行任务

/ 技术 / 无站内评论 / 386浏览

前言

在熟悉一个项目代码的时候,发现一个多线程的应用场景,随后查询了相关资料总结下。

代码:

/**
 * 批量任务的限时 invokeAll(tasks) 批量提交不限时任务
 * 
 * invokeAll(tasks, timeout, unit) 批量提交限时任务
 * 
 * InvokeAll方法处理一个任务的容器(collection),并返回一个Future的容器。两个容器具有相同的结构:
 * invokeAll将Future添加到返回的容器中,这样可以使用任务容器的迭代器,从而调用者可以将它表现的Callable与Future 关联起来。
 * 当所有任务都完成时、调用线程被中断时或者超过时限时,限时版本的invokeAll都会返回结果。 超过时限后,任务尚未完成的任务都会被取消。
 *
 */
//创建特定数量的线程池
 ExecutorService service = Executors.newFixedThreadPool(pageSizeRefundList.size());
        List<OrderRefundTask> queryTasks = Lists.newArrayListWithCapacity(pageSizeRefundList.size());

        try {
            for (OrderIndexForm orderIndexForm : pageSizeRefundList) {
                OrderRefundTask infoTask = (OrderRefundTask) applicationContext.getBean("OrderRefundTask", orderIndexForm.getSite(), String.valueOf(orderIndexForm.getId()));
 //将要处理的任务放入任务池
                queryTasks.add(infoTask);
            }

            // 执行所有任务 必须等待所有的任务执行完成后统一返回
            List<Future<Map<String, Object>>> futures = service.invokeAll(queryTasks);

            //处理返回结果
            if (futures.size() > 0) {
                for (Future<Map<String, Object>> future : futures) {
                    Map<String, Object> temp = future.get();
                    OrderRefundInfoModel order;
                    String site;
                    if (null != temp.get("data") && (temp.get("data") instanceof OrderRefundInfoModel) && null != temp.get("site")) {
                        order = (OrderRefundInfoModel) temp.get("data");
                        site = (String) temp.get("site");
                    } else {
                        continue;
                    }

                }
            }

任务类代码

//实现Callable接口重写call方法
public class OrderRefundTask implements Callable<Map<String, Object>> {

    private final Logger logger = LoggerFactory.getLogger(OrderBasicTask.class);

    private String site;

    private Object orderId;

    @Resource
    private OrderInfoDao orderInfoDao;

    public OrderRefundTask(String site, Object orderId) {
        this.site = site;
        this.orderId = orderId;
    }

    @Override
    public Map<String, Object> call() throws Exception {

        Map<String, Object> res = new HashMap<String, Object>();
        res.put("site", site);
        AppContext.setSite(site);
        try {
            logger.debug("site:{},refund search form:{}",site, JSON.toJSONString(orderId));
            //执行任务
             Object data = orderInfoDao.loadOrderRefundInfoByOrderId((String)orderId);
            logger.debug("site:{},orderInfoDao Result:{}", site, JSON.toJSONString(data));
            res.put("data", data);
        } catch (Exception e) {
            logger.error("get site:{} orderRefundInfo fail", site, e);
        } finally {
            AppContext.clearResource();
        }
        return res;
    }

}

ExecutorService的invokeAll方法有两种用法:

1.exec.invokeAll(tasks)

2.exec.invokeAll(tasks, timeout, unit)

其中tasks是任务集合,timeout是超时时间,unit是时间单位

两者都会堵塞,必须等待所有的任务执行完成后统一返回,一方面内存持有的时间长;另一方面响应性也有一定的影响,毕竟大家都喜欢看看刷刷的执行结果输出,而不是苦苦的等待;

但是方法二增加了超时时间控制,这里的超时时间是针对的所有tasks,而不是单个task的超时时间。如果超时,会取消没有执行完的所有任务,并抛出超时异常。相当于将每一个future的执行情况用一个list集合保存,当调用future.get()方法取值时和设置的timeout比较,是否超时。

InvokeAll方法处理一个任务的容器(collection),并返回一个Future的容器。两个容器具有相同的结构;

这里提交的任务容器列表和返回的Future列表存在顺序对应的关系。

invokeAll将Future添加到返回容器中,这样可以使用任务容器的迭代器,从而调用者可以将它表现的Callable与Future关联起来。

当所有任务都完成时、调用线程被中断时或者超过时限时,限时版本的invokeAll都会返回结果。超过时限后,任何尚未完成的任务都会被取消。

作为invokeAll的返回值,每个任务要么正常地完成,要么被取消。

召唤蕾姆
琼ICP备18000156号

鄂公网安备 42011502000211号