创建一个新的线程是有代价的,创建大量的线程会消耗许多系统资源,影响系统的性能。这时,我们就需要使用线程池来管理线程,线程池能缓存线程,可用闲置的线程来执行新任务,能有效控制线程并发数,并对线程进行一些简单的管理。

先创建一个线程任务类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class TaskThread implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(TaskThread.class);
private String task;

public TaskThread(String task) {
this.task = task;
}

@Override
public void run() {
task();
}

private void task() {
LOG.info("任务:" + task + " 线程:" + Thread.currentThread().getName());
}
}

按照一般方式我们通过 new Thread(new TaskThread("task")); 的方式来创建一个线程,再调用start方法来启动线程。而线程池为我们提供了另一种创建管理方式,线程池的实现方式也有多种。

使用LinkedList做任务队列来实现一个简单线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WorkQueue{

private static final Logger LOG = LoggerFactory.getLogger(WorkQueue.class);
private static Map<String,WorkQueue> queues = new HashMap<String,WorkQueue>();
private final int nThreads;//线程池大小
private final PoolWorker[] threads;//用数组实现线程池
private final LinkedList<Runnable> queue;//双向链表实现任务队列

/**私有构造**/
private WorkQueue(String name,int nThreads)
{
this.nThreads = nThreads;
queue = new LinkedList<Runnable>();
threads = new PoolWorker[nThreads];
for (int i=0; i<this.nThreads; i++) {
threads[i] = new PoolWorker(name+"_"+(i+1));
threads[i].start();
}
LOG.info("初始化线程池,线程数:"+nThreads);
}

/**静态工厂方法创建线程池**/
public static WorkQueue createWorkQueue(String name,int count){
WorkQueue wq = queues.get(name);
if(wq == null){
wq = new WorkQueue(name,count);
queues.put(name, wq);
}
return wq;
}

/**执行任务**/
public void execute(Runnable r) {
synchronized(queue) {
queue.addLast(r);
queue.notify();
}
}

/**工作线程类**/
private class PoolWorker extends Thread {
PoolWorker(String name){
super(name);
}
public void run() {
Runnable r;
while (true) {
synchronized(queue) {
while (queue.isEmpty()) {
try{
queue.wait();//如果任务队列没有任务,等待
}catch (InterruptedException ignored){
}
}
r = (Runnable) queue.removeFirst();//有任务时,取出任务
}
try {
LOG.info("线程:"+Thread.currentThread().getName()+" 开始执行");
r.run();//执行任务
}
catch (Exception e) {
LOG.info("线程:"+Thread.currentThread().getName()+" 执行出现异常!");
e.printStackTrace();
}
LOG.info("线程:"+Thread.currentThread().getName()+" 执行完成");
r = null;
}
}
}
}

测试例子,大小为10的线程池执行20个任务

1
2
3
4
5
6
7
WorkQueue wq = WorkQueue.createWorkQueue("thread", 10);

TaskThread task[] = new TaskThread[20];
for(int i=0; i< task.length; i++){
task[i] = new TaskThread("task_"+i);
wq.execute(task[i]);
}

使用Executors和ThreadPoolExecutor创建线程池

Java 的 java.util.concurrent 包提供了 Executors 类来创建线程池,Executors 类有四种线程池构造,分别为:

  • newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  • newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  • newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
  • newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

之前在阿里的开发手册中有看到,各种 Executors 返回的线程池对象弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2) CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

一般常用 newFixedThreadPool 构造返回 ExecutorService 对象来创建线程池。 ExecutorService 接口继承了 Executor 接口(Executor是接口,Executors是类),提供了线程池的操作的成员函数。

使用 Executors 创建线程池类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPool {

private static final Logger LOG = LoggerFactory.getLogger(ThreadPool.class);
private static Map<String,ThreadPool> pools = new HashMap<String,ThreadPool>();
private int threadCount = 0;
private String name = null;;
ExecutorService service = null;
private ThreadPool(String name,int count){
this.threadCount = count;
this.name = name;
service = Executors.newFixedThreadPool(this.threadCount);
LOG.info("初始化线程池,线程数["+this.name+"]:"+this.threadCount);
}

public static ThreadPool createThreadPool(String name,int count){
ThreadPool pool = pools.get(name);
if(pool == null){
pool = new ThreadPool(name,count);
pools.put(name, pool);
}
return pool;
}

public void execute(Runnable r) {
service.execute(r);
}
public void destroy() {
service.shutdown();
pools.remove(this.getName());
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

其中,看下 newFixedThreadPool 方法的源码

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

这是 ThreadPoolExecutor 类的一个实现,再看下构造方法的参数

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
  • corePoolSize 核心线程数。核心线程会一直存活,即使没有任务需要处理。即使现有的线程空闲,线程池也会优先创建新线程来处理任务,而不是直接交给现有的线程处理。
    核心线程在allowCoreThreadTimeout被设置为true时会超时退出,默认情况下不会退出。
  • maximumPoolSize 最大的线程数。当线程数大于或等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maximumPoolSize。如果线程数已等于maximumPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会拒绝处理任务而抛出异常。
  • keepAliveTime 存活时间。当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0。

allowCoreThreadTimeout 属性标识默认情况下核心线程不会退出,可通过将该参数设置为true,让核心线程也退出。

测试例子

1
2
3
4
5
6
ThreadPool threadPool = ThreadPool.createThreadPool("testThread", 10);
TaskThread task[] = new TaskThread[20];
for(int i=0; i< task.length; i++){
task[i] = new TaskThread("task_"+i);
threadPool.execute(task[i]);
}

ThreadPoolExecutor 其他相关

ThreadPoolExecutor 本身可以指定创建活跃线程数、限制线程池的大小,还可以创建自己的 RejectedExecutionHandler 来处理不适合放在工作队列里的任务。

终止策略 RejectedExecutionHandler 实现类,当提交的任务无法进入等待队列且线程池中创建的线程数量已经达到了最大线程数量的限制,则会拒绝新提交的任务

1
2
3
4
5
6
7
8
9
10
11
12
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

private static final Logger LOG = LoggerFactory.getLogger(RejectedExecutionHandlerImpl.class);
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
LOG.info(r.toString() + "不能执行。");
}
}

完整测试例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskTest {

private static final Logger LOG = LoggerFactory.getLogger(TaskTest.class);

public static void main(String[] args) {

/*WorkQueue wq = WorkQueue.createWorkQueue("thread", 10);
TaskThread task[] = new TaskThread[20];
for(int i=0; i<task.length; i++){
task[i] = new TaskThread("task_"+i);
wq.execute(task[i]);
}*/

/*ThreadPool threadPool = ThreadPool.createThreadPool("testThread", 10);
TaskThread task[] = new TaskThread[20];
for(int i=0; i<task.length; i++){
task[i] = new TaskThread("task_"+i);
threadPool.execute(task[i]);
}*/

RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
ThreadFactory threadFactory = Executors.defaultThreadFactory();
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 10,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler);
//TaskThread task[] = new TaskThread[4];//少数量任务
TaskThread task[] = new TaskThread[20];
for(int i=0; i<task.length; i++){
task[i] = new TaskThread("task_"+i);
threadPool.execute(task[i]);
}
}
}

线程池线程核心数设置为2,最大线程数设置为4,当执行少数量任务时线程池未饱和不会触发终止策略,而执行大量任务就可能会触发,比如以上测试例子的4个任务和20任务的区别。