使用ThreadPoolExecutor解决并发问题
背景
业务功能需要对多条信息数据校验其合法性,该校验非HTTP协议的接口校验,需要通过主机发送whois命令,根据返回的结果处理信息。而该命令的从请求到结果的返回需要5-12s。若业务侧需要同一时间对大量数据进行校验,若不进行优化,则会导致接口执行时间过长。
线程池
thread pool,线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。
- 降低线程创建和销毁过程的资源消耗
- 提高响应速度
- 提高线程的可管理型
ThreadPoolExecutor分析
构造器各个参数的含义
- int corePoolSize:核心线程数,当执行任务个数 < corePoolSize,就会启动对应的个数线程去执行任务。
- int maximumPoolSize:线程池中允许的最大线程数,BlockingQueue满了,执行任务数 < maximumPoolSize的时候,才会再次创建新的线程来执行任务。
- long keepAliveTime:线程空闲下来之后,存货的时间;只在活动线程数量 > corePoolSize的时候才有用。
- TimeUnit unit,空闲线程存活的时间单位。
- BlockingQueue workQueue:要保存任务的阻塞队列
- ThreadFactory threadFactory:创建线程的工厂,给新建线程的名称赋值
- RejectedExecutionHandler handler:饱和策略/拒绝策略/丢弃策略
- AbortPolicy:直接抛出异常,默认的情况
- CallerRunsPolicy:用调用者所在的线程去执行任务
- DiscardOldestPolicy:丢弃阻塞队列中最老的任务,也就是队列中最靠前的任务
- DiscardPolicy:直接丢弃当前任务
- 其他:自己新建一个类,实现RejectedExecutionHandler接口,自定义饱和策略。
提交任务
void execute(Runnable command) 不需要返回
Future submit(Callable task) 需要返回
关闭线程池
shutdownNow():设置线程池的状态,还会尝试停止正在运行或者暂停任务的线程
shutdown():设置线程池的状态,只会中断所有没有执行任务的线程
线程池的工作机制
优先使用核心线程处理任务。
如果核心线程都已忙碌,则将任务压入阻塞队列,等待线程池有空闲线程,从队列中获取任务执行。
当阻塞队列已满,启动新的线程执行任务。前提是线程池支持的线程数充足。
直到线程池所有的线程都在执行任务,且无法再启动新的线程,此时执行饱和策略(拒绝策略)
合理配置线程池
计算密集型
- 加密、大数分解、正则
线程数小,推荐:CPU核心数+1
IO密集型
- 读取文件、数据库连接、网络通讯
线程数大,推荐:CPU核心数*2
混合型
- 尽量拆分
IO密集型>>计算密集型:拆分意义不大
IO密集型≈计算密集型,建议使用有界队列
常见系统线程池
FixedThreadPool
- 固定线程数量,适用负载较重的服务器,使用了无界队列LInkedBlockingQueue
SingleThreadExecutor
- 创建单个线程,需要保证顺序执行任务,不会有多个线程活动,使用了无界队列
CachedThreadPool
- 根据需要创建新线程,执行很多短期异步任务的程序SynchronousQueue
WorkStealingPool
ScheduledThreadPoolExecutor
- 需要定期、周期地执行任务
Executor框架使用流程
一般都不能直接使用Executors.new***ThreadPool()的情况
- 使用了无界队列,系统几乎没有使用有界队列,OOM异常
系统提供的饱和策略:直接抛出异常,不合适。
饱和策略可以自己去实现,通过写日志、存入本地文件、DB、生成一个新的队列等都是有可能的。
结合线程池与CountDownLatch解决并发同步问题
结合需求背景:
- 网络通信
- whois调用频率问题,同一时间批量调用会导致请求被拒绝而返回空值的问题
- 结合实际业务场景,选择核心线程数量为CPU核心数*2:32,BlockingQueue队列长度根据具体业务场景决定。
- 结合对端的请求频率限制策略,循环获取需要校验的信息数据时,主线程往BlockingQueue队列塞任务前,先随机睡眠100-500ms,避免被限制请求。
- 设置CountDownLatch数量为需要校验的信息数据的数量,每个线程任务完全结束后执行countDownLatch.countdown()
- 主线程往BlockingQueue插完任务后,在主线程调用countDownLatch.await()等待线程执行完毕,未避免线程任务出问题。可设置等待的时长boolean b = countDownLatch.await(num, TimeUnit),返回执行结果。
- 线程之间使用共享变量,将结果存到List中。
public void checkPrefixInfos () throws IOException {
ThreadPoolExecutor executorService = new ThreadPoolExecutor(32, 32,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(500));
final long MIN_MILLISECONDS = 100L;
final long MAX_MILLISECONDS = 500L;
try {
//业务逻辑省略……
final CountDownLatch latch = new CountDownLatch(prefixModuleArr.size());
for (Object o : prefixModuleArr) {
JSONObject prefixModuleObject = (JSONObject) o;
PrefixValid prefixValid = (PrefixValid) JSONObject.toBean(prefixModuleObject, PrefixValid.class);
// 随机休眠,减少因并发请求导致对端系统触发安全策略的可能
Thread.sleep(MIN_MILLISECONDS + (int) (Math.random() * (MAX_MILLISECONDS - MIN_MILLISECONDS)));
CheckPrefixThread checkPrefixThread = new CheckPrefixThread(prefixValid, successList, failList, latch);
executorService.execute(checkPrefixThread);
}
// 主线程等待五分钟,超时则返回失败
boolean isSuccess = latch.await(300, TimeUnit.SECONDS);
//业务逻辑省略……
} catch (RejectedExecutionException e) {
/*
线程池默认饱和策略:AbortPolicy,出现饱和策略抛出的异常一般是因为
校验数量过多 & 核心线程中的prefix一直处于重试状态,导致无空闲线程释放,
导致任务队列饱和,这里直接捕获异常处理
*/
// 也可以自己实现RejectedExecutionHandler自定义饱和策略
prefixResult.setRetCode(PrefixResult._TIMEOUT);
prefixResult.setRetMsg("线程池饱和!");
JSONObject jSONObject = JSONObject.fromObject(prefixResult);
ResponseUtil.print(ServletActionContext.getResponse(), jSONObject.toString());
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
executorService.shutdown();
}
public class CheckPrefixThread implements Runnable {
// 重试次数
final int RETRY_TIMES = 2;
private int RUN_TIME = 1;
private PrefixValid prefixValid;
private List<PrefixValid> successList;
private List<PrefixValid> failList;
private CountDownLatch countDownLatch;
private String apnicResult = "UnChecked";
private String radbResult = "UnChecked";
private Boolean originFlag = false;
private String prefix;
private String asPath;
/**
* Whois校验Prefix信息
*/
@Override
public void run() {
checkPrefix();
PrefixValid resValid = new PrefixValid();
resValid.setPrefix(prefix);
resValid.setAsPath(asPath);
resValid.setIndex(prefixValid.getIndex());
resValid.setApnicResult(apnicResult);
resValid.setRadbResult(radbResult);
if (originFlag) {
successList.add(resValid);
} else {
failList.add(resValid);
}
countDownLatch.countDown();
}
private void checkPrefix() {
// 业务逻辑省略……
}
//Constructor
public CheckPrefixThread(PrefixValid prefixValid, List<PrefixValid> successList, List<PrefixValid> failList, CountDownLatch countDownLatch) {
this.prefixValid = prefixValid;
this.successList = successList;
this.failList = failList;
this.countDownLatch = countDownLatch;
// 避免可预见的OS命令注入漏洞
this.prefix = prefixValid.getPrefix().replaceAll("&", "").replaceAll("\\|", "");
this.asPath = prefixValid.getAsPath();
}
}