跳至主要內容

使用ThreadPoolExecutor解决并发问题

ZnyoungJava多线程并发

背景

业务功能需要对多条信息数据校验其合法性,该校验非HTTP协议的接口校验,需要通过主机发送whois命令,根据返回的结果处理信息。而该命令的从请求到结果的返回需要5-12s。若业务侧需要同一时间对大量数据进行校验,若不进行优化,则会导致接口执行时间过长。

线程池

thread pool,线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。

  • 降低线程创建和销毁过程的资源消耗
  • 提高响应速度
  • 提高线程的可管理型

ThreadPoolExecutor分析

构造器各个参数的含义

  • int corePoolSize:核心线程数,当执行任务个数 < corePoolSize,就会启动对应的个数线程去执行任务。
  • int maximumPoolSize:线程池中允许的最大线程数,BlockingQueue满了,执行任务数 < maximumPoolSize的时候,才会再次创建新的线程来执行任务。
  • long keepAliveTime:线程空闲下来之后,存货的时间;只在活动线程数量 > corePoolSize的时候才有用。
  • TimeUnit unit,空闲线程存活的时间单位。
  • BlockingQueue workQueue:要保存任务的阻塞队列
  • ThreadFactory threadFactory:创建线程的工厂,给新建线程的名称赋值
  • RejectedExecutionHandler handler:饱和策略/拒绝策略/丢弃策略
    • AbortPolicy:直接抛出异常,默认的情况
  • CallerRunsPolicy:用调用者所在的线程去执行任务
  • DiscardOldestPolicy:丢弃阻塞队列中最老的任务,也就是队列中最靠前的任务
  • DiscardPolicy:直接丢弃当前任务
  • 其他:自己新建一个类,实现RejectedExecutionHandler接口,自定义饱和策略。

提交任务

void execute(Runnable command) 不需要返回

Future submit(Callable task) 需要返回

关闭线程池

shutdownNow():设置线程池的状态,还会尝试停止正在运行或者暂停任务的线程

shutdown():设置线程池的状态,只会中断所有没有执行任务的线程

线程池的工作机制

  • 优先使用核心线程处理任务。

  • 如果核心线程都已忙碌,则将任务压入阻塞队列,等待线程池有空闲线程,从队列中获取任务执行。

  • 当阻塞队列已满,启动新的线程执行任务。前提是线程池支持的线程数充足。

  • 直到线程池所有的线程都在执行任务,且无法再启动新的线程,此时执行饱和策略(拒绝策略)

合理配置线程池

  • 计算密集型

    • 加密、大数分解、正则
  • 线程数小,推荐:CPU核心数+1

  • IO密集型

    • 读取文件、数据库连接、网络通讯
  • 线程数大,推荐:CPU核心数*2

  • 混合型

    • 尽量拆分
  • IO密集型>>计算密集型:拆分意义不大

  • IO密集型≈计算密集型,建议使用有界队列

常见系统线程池

  • FixedThreadPool

    • 固定线程数量,适用负载较重的服务器,使用了无界队列LInkedBlockingQueue
  • SingleThreadExecutor

    • 创建单个线程,需要保证顺序执行任务,不会有多个线程活动,使用了无界队列
  • CachedThreadPool

    • 根据需要创建新线程,执行很多短期异步任务的程序SynchronousQueue
  • WorkStealingPool

  • ScheduledThreadPoolExecutor

    • 需要定期、周期地执行任务

Executor框架使用流程

  • 一般都不能直接使用Executors.new***ThreadPool()的情况

    • 使用了无界队列,系统几乎没有使用有界队列,OOM异常
  • 系统提供的饱和策略:直接抛出异常,不合适。

  • 饱和策略可以自己去实现,通过写日志、存入本地文件、DB、生成一个新的队列等都是有可能的。

结合线程池与CountDownLatch解决并发同步问题

结合需求背景:

  1. 网络通信
  2. whois调用频率问题,同一时间批量调用会导致请求被拒绝而返回空值的问题
  • 结合实际业务场景,选择核心线程数量为CPU核心数*2:32,BlockingQueue队列长度根据具体业务场景决定。
  • 结合对端的请求频率限制策略,循环获取需要校验的信息数据时,主线程往BlockingQueue队列塞任务前,先随机睡眠100-500ms,避免被限制请求。
  • 设置CountDownLatch数量为需要校验的信息数据的数量,每个线程任务完全结束后执行countDownLatch.countdown()
  • 主线程往BlockingQueue插完任务后,在主线程调用countDownLatch.await()等待线程执行完毕,未避免线程任务出问题。可设置等待的时长boolean b = countDownLatch.await(num, TimeUnit),返回执行结果。
  • 线程之间使用共享变量,将结果存到List中。
public void checkPrefixInfos () throws IOException {
		ThreadPoolExecutor executorService = new ThreadPoolExecutor(32, 32,
				0L, TimeUnit.MILLISECONDS,
				new LinkedBlockingQueue<Runnable>(500));
		final long MIN_MILLISECONDS = 100L;
		final long MAX_MILLISECONDS = 500L;
		try {
            //业务逻辑省略……
			final CountDownLatch latch = new CountDownLatch(prefixModuleArr.size());
			for (Object o : prefixModuleArr) {
				JSONObject prefixModuleObject = (JSONObject) o;
				PrefixValid prefixValid = (PrefixValid) JSONObject.toBean(prefixModuleObject, PrefixValid.class);
				// 随机休眠,减少因并发请求导致对端系统触发安全策略的可能
				Thread.sleep(MIN_MILLISECONDS + (int) (Math.random() * (MAX_MILLISECONDS - MIN_MILLISECONDS)));
				CheckPrefixThread checkPrefixThread = new CheckPrefixThread(prefixValid, successList, failList, latch);
				executorService.execute(checkPrefixThread);
			}
			// 主线程等待五分钟,超时则返回失败
			boolean isSuccess = latch.await(300, TimeUnit.SECONDS);
			//业务逻辑省略……
		} catch (RejectedExecutionException e) {
			/*
			线程池默认饱和策略:AbortPolicy,出现饱和策略抛出的异常一般是因为
			校验数量过多 & 核心线程中的prefix一直处于重试状态,导致无空闲线程释放,
			导致任务队列饱和,这里直接捕获异常处理
			 */
            // 也可以自己实现RejectedExecutionHandler自定义饱和策略
			prefixResult.setRetCode(PrefixResult._TIMEOUT);
			prefixResult.setRetMsg("线程池饱和!");
			JSONObject jSONObject = JSONObject.fromObject(prefixResult);
			ResponseUtil.print(ServletActionContext.getResponse(), jSONObject.toString());
		} catch (Exception e) {
			throw new RuntimeException(e);
		} finally {
			executorService.shutdown();
		}
public class CheckPrefixThread implements Runnable {
    // 重试次数
    final int RETRY_TIMES = 2;
    private int RUN_TIME = 1;
    private PrefixValid prefixValid;
    private List<PrefixValid> successList;
    private List<PrefixValid> failList;
    private CountDownLatch countDownLatch;
    private String apnicResult = "UnChecked";
    private String radbResult = "UnChecked";
    private Boolean originFlag = false;
    private String prefix;
    private String asPath;

    /**
     * Whois校验Prefix信息
     */
    @Override
    public void run() {
        checkPrefix();
        PrefixValid resValid = new PrefixValid();
        resValid.setPrefix(prefix);
        resValid.setAsPath(asPath);
        resValid.setIndex(prefixValid.getIndex());
        resValid.setApnicResult(apnicResult);
        resValid.setRadbResult(radbResult);
        if (originFlag) {
            successList.add(resValid);
        } else {
            failList.add(resValid);
        }
        countDownLatch.countDown();
    }

    private void checkPrefix() {

        // 业务逻辑省略……
    }

    //Constructor
    public CheckPrefixThread(PrefixValid prefixValid, List<PrefixValid> successList, List<PrefixValid> failList, CountDownLatch countDownLatch) {
        this.prefixValid = prefixValid;
        this.successList = successList;
        this.failList = failList;
        this.countDownLatch = countDownLatch;
        // 避免可预见的OS命令注入漏洞
        this.prefix = prefixValid.getPrefix().replaceAll("&", "").replaceAll("\\|", "");
        this.asPath = prefixValid.getAsPath();
    }
}
上次编辑于:
贡献者: 麦正阳