likes
comments
collection
share

java线程池ThreadPoolExecutor类使用详解

作者站长头像
站长
· 阅读数 6

一、ThreadPoolExecutor类使用详解

扩展说明

在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了资源的开销。 池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式,这一方面是由于jdk中Executor框架虽然提供了如newFixedThreadPool()newSingleThreadExecutor()newCachedThreadPool()等创建线程池的方法,但都有其局限性,不够灵活;另外由于前面几种方法内部也是通过ThreadPoolExecutor方式实现,使用ThreadPoolExecutor有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。

ThreadPoolExecutor构造函数

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

构造函数的参数含义如下:

  1. corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去;
  2. maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量
  3. keepAliveTime:当线程池中的空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁;
  4. unit:keepAliveTime的单位
  5. workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种;
  6. threadFactory:线程工厂,用于创建线程,一般用默认即可;
  7. handler:拒绝策略,当任务太多来不及处理时,如何拒绝任务。

二、类的重要参数详解

接下来我们对其中比较重要的参数做进一步的了解(配合实例)

2.1、Queue任务队列

上面已经介绍过了,这里有几种队列类型,提交队列、有界任务队列、无界任务队列、优先任务队列

  • 提交队列SynchronousQueue队列,SynchronousQueue是一个特殊的BlockingQueue,它没有容量,每执行一个插入操作就会阻塞,需要在执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。
public class ThreadPool {
    private static ExecutorService pool;
    public static void main(String[] args) {
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
	for (int i = 0; i < 3; i++) {
            pool.execute(new ThreadTask());	
        }
    }
}

public class ThreadTask implements Runnable {	
    public ThreadTask() {}
    public void run() {	
	System.out.println(Thread.currentThread().getName());		
    }
}

输出结果为

pool-1-thread-1
Exception in thread "main" pool-1-thread-2
java.util.concurrent.RejectedExecutionException: Task com.xxx.threadtest.test1.ThreadTask@5c647e05 rejected from java.util.concurrent.ThreadPoolExecutor@33909752[Running, pool size = 2, active threads = 1, queued tasks = 0, completed tasks = 1]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at com.xxx.threadtest.test1.ThreadPool.main(ThreadPool.java:16)

可以看到,当任务队列为SynchronousQueue,创建的线程数大于maximumPoolSize时,直接执行了拒绝策略抛出异常。 使用SynchronousQueue队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于等于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize设置的最大值,则根据你设置的handler执行拒绝策略。因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易会执行拒绝策略。

  • 有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue实现,如下所示:
public class ThreadPool2 {
    private static ExecutorService pool;
    public static void main(String[] args) {
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < 3; i++) {
            pool.execute(new ThreadTask());
	}
    }
}

输出结果如下:

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1

使用ArrayBlockingQueue有界任务队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于maximumPoolSize则执行拒绝策略。在这种情况下,线程数量的上限与有界任务队列的状态有直接关系。如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限。(这里可以多尝试几组数据) 如果队列容量设置为1,其它不变的时候

new ArrayBlockingQueue<Runnable>(1)

输出结果:

pool-1-thread-1
pool-1-thread-1
pool-1-thread-2

可以看到输出结果是创建了2个线程的。 如果队列容量设置为2,线程数设置为10,就会看到因为线程池不够大,等待队列的容量也不够而出现拒绝策略的输出结果。

pool-1-thread-1
pool-1-thread-1
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.xxx.threadtest.test1.ThreadTask@70dea4e rejected from java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 1]
pool-1-thread-2
pool-1-thread-1
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at com.xxx.threadtest.test2.ThreadPool2.main(ThreadPool2.java:18)
  • 无界的任务队列:无界任务队列可以使用LinkedBlockingQueue实现,如下所示:
public class ThreadPool3 {
    private static ExecutorService pool;
    public static void main(String[] args) {
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < 3; i++) {
            pool.execute(new ThreadTask());
        }
    }
}

输出结果如下:

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1

使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接金融队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理直接的协调控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。

  • 优先任务队列:优先任务队列通过PriorityBlockingQueue实现,下面我们通过一个例子演示一下
public class ThreadPool4 {
    private static ExecutorService pool;
    public static void main(String[] args) {
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < 20; i++) {
            pool.execute(new ThreadTask1(i));
        }
    }
}

public class ThreadTask1 implements Runnable, Comparable<ThreadTask1> {
    private int priority;
    public int getPriority() {
	return priority;
    }
    public void setPriority(int priority) {
	this.priority = priority;
    }
    public ThreadTask1() {
    }
    public ThreadTask1(int priority) {
	this.priority = priority;
    }
    public int compareTo(ThreadTask o) {
	// TODO Auto-generated method stub
	return 0;
    }
    // 当前对象和其它对象做比较,当前优先级大就返回-1,优先级小就返回1,值越小优先级越高
    public int compareTo(ThreadTask1 o) {	
	return this.priority > o.priority?-1:1;	
    }
    public void run() {	
	try {	
            Thread.sleep(1000);	
            System.out.println("priority: " + this.priority + ", ThreadName: " + Thread.currentThread().getName());		
	} catch (InterruptedException e) {		
            e.printStackTrace();	
	}
    }
}

执行结果

priority: 0, ThreadName: pool-1-thread-1
priority: 19, ThreadName: pool-1-thread-1
priority: 18, ThreadName: pool-1-thread-1
priority: 17, ThreadName: pool-1-thread-1
priority: 16, ThreadName: pool-1-thread-1
priority: 15, ThreadName: pool-1-thread-1
priority: 14, ThreadName: pool-1-thread-1
priority: 13, ThreadName: pool-1-thread-1
priority: 12, ThreadName: pool-1-thread-1
priority: 11, ThreadName: pool-1-thread-1
priority: 10, ThreadName: pool-1-thread-1
priority: 9, ThreadName: pool-1-thread-1
priority: 8, ThreadName: pool-1-thread-1
priority: 7, ThreadName: pool-1-thread-1
priority: 6, ThreadName: pool-1-thread-1
priority: 5, ThreadName: pool-1-thread-1
priority: 4, ThreadName: pool-1-thread-1
priority: 3, ThreadName: pool-1-thread-1
priority: 2, ThreadName: pool-1-thread-1
priority: 1, ThreadName: pool-1-thread-1

大家可以看到除了第一个任务直接创建线程执行外,其他的任务都被放入了优先任务队列,按优先级进行了重新排列执行,且线程池的线程数一直为corePoolSize,也就是只有一个。 通过运行的代码我们可以看出PriorityBlockingQueue它其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。

2.2、拒绝策略

一般我们创建线程池时,为防止资源耗尽,任务队列都会选择创建有界任务队列。但这种模式下如果出现任务队列已满,且线程池创建的线程数达到你设置的最大线程数时,就需要指定ThreadPoolExecutorRejectedExecutionHandler参数合理的拒绝策略,来处理线程池"超载"的情况。ThreadPoolExecutor自带的拒绝策略有以下几种:

  • AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作;
  • CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中执行; 修改上面的有界队列拒绝策略为本策略,当队列容量为2,线程数为10的时候,输出结果如下:
main
main
main
main
main
main
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-1

可以看到没有异常输出,并且输出有main线程。

  • DiscardOldestPolicy策略:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交; 仍然是上面的例子,改完本策略,结果输出4个线程,但是服务一直等待不完成。这里具体会造成怎么样的结果,还不是很确定,如果说尝试再次提交,不应该只输出4个线程。
  • DiscardPolicy策略:该策略会默默丢弃无法处理的任务,不予任何处理。当然使用此策略,业务场景中需允许任务的丢失;
  • 自定义拒绝策略:以上策略均实现了RejectedExecutionHandler接口,当然我们也可以自己扩展RejectedExecutionHandler接口,定义自己的拒绝策略。示例代码:
public class ThreadPool5 {
    private static ExecutorService pool;
    public static void main(String[] args) {
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2),Executors.defaultThreadFactory(), new TestRejected());
        for (int i = 0; i < 10; i++) {
            pool.execute(new ThreadTask());
        }
    }
}

public class TestRejected implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
	System.out.println(r.toString() + "执行了拒绝策略");	
    }	
}

public class ThreadTask implements Runnable {	
    public ThreadTask() {}
    public void run() {	
	try {		
            Thread.sleep(1000); //让线程阻塞,使后续任务进入缓存队列	
            System.out.println(Thread.currentThread().getName());	
	} catch (InterruptedException e) {		
            e.printStackTrace();	
	}		
    }
}

输出结果如下:

com.xxx.threadtest.test1.ThreadTask@70dea4e执行了拒绝策略
com.xxx.threadtest.test1.ThreadTask@5c647e05执行了拒绝策略
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
com.xxx.threadtest.test1.ThreadTask@33909752执行了拒绝策略
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2

可以看到由于任务加了休眠阻塞,执行需要花费一定时间,导致会有一定的任务被丢弃,从而执行自定义的拒绝策略;

2.3、ThradFactory自定义线程创建

线程池中线程就是通过ThreadPoolExecutor中的ThreadFactory创建的。那么通过自定义ThreadFactory,可以按需要对线程池中创建的线程进行一些特殊的设置,如命名、优先级等,下面代码我们通过ThreadFactory对线程池中创建的线程进行记录与命名

public class ThreadPool6 {
    private static ExecutorService pool;
    public static void main(String[] args) {
        pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),new TestFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
	for (int i = 0; i < 10; i++) {
            pool.execute(new ThreadTask());
	}
    }
}

public class TestFactory implements ThreadFactory {
    public Thread newThread(Runnable r) {
	System.out.println("线程" + r.hashCode() + "创建");
	// 线程命名
	Thread th = new Thread(r, "threadPool" + r.hashCode());
	return th;
    }
}

输出结果:

线程2018699554创建
线程1311053135创建
线程118352462创建
线程1550089733创建
threadPool2018699554
threadPool118352462
threadPool1311053135
main
threadPool1550089733
threadPool2018699554
threadPool1550089733
threadPool1311053135
threadPool118352462
threadPool2018699554

可以看到线程池中,每个线程的创建我们都进行了记录输出与命名。

2.4、ThreadPoolExecutor扩展

ThreadPoolExecutor扩展主要是围绕beforeExecutor()afterExecutor()terminated()三个接口实现的。

  • beforeExecute:线程池中任务运行前执行
  • afterExecute:线程池中任务运行完毕后执行
  • terminated:线程池退出后执行 通过这三个接口,我们可以监控每个任务的开始和结束时间,或者其他一些功能。示例代码:
public class ThreadPool7 {
    private static ExecutorService pool;
    public static void main(String[] args) {
        pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),new TestFactory(), new ThreadPoolExecutor.CallerRunsPolicy()) {
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行:" + ((ThreadTask7) r).getTaskName());
            }
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("准备完毕:" + ((ThreadTask7) r).getTaskName());
            }
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };
        for (int i = 0; i < 10; i++) {
            pool.execute(new ThreadTask7("Task" + i));
        }
        pool.shutdown();
    }
}

public class ThreadTask7 implements Runnable {
    private String taskName;
    public String getTaskName() {
	return taskName;
    }
    public void setTaskName(String taskName) {
	this.taskName = taskName;
    }
    public ThreadTask7(String name) {	
	this.setTaskName(name);	
    }
    public void run() {
	System.out.println("TaskName " + this.getTaskName() + "---ThreadName: " + Thread.currentThread().getName());	
    }
}

输出结果:

准备执行:Task0
准备执行:Task1
线程1550089733创建
TaskName Task0---ThreadName: threadPool2018699554
准备执行:Task7
TaskName Task7---ThreadName: threadPool118352462
准备完毕:Task7
TaskName Task1---ThreadName: threadPool1311053135
准备完毕:Task1
TaskName Task9---ThreadName: main
准备完毕:Task0
准备执行:Task3
准备执行:Task8
准备执行:Task2
TaskName Task2---ThreadName: threadPool118352462
准备完毕:Task2
TaskName Task8---ThreadName: threadPool1550089733
准备完毕:Task8
TaskName Task3---ThreadName: threadPool1311053135
准备完毕:Task3
准备执行:Task4
准备执行:Task6
TaskName Task6---ThreadName: threadPool1550089733
准备完毕:Task6
准备执行:Task5
TaskName Task5---ThreadName: threadPool118352462
TaskName Task4---ThreadName: threadPool2018699554
准备完毕:Task5
准备完毕:Task4
线程池退出

可以看到beforExecutor()afterExecutor()terminated()的实现,我们对线程池中线程的运行状态进行了监控,在其执行前后输出了相关打印信息。另外使用shutdown方法可以比较安全的关闭线程池,当线程池调用该方法后,线程池不再接受后续添加的任务。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。

2.5、线程池线程数量

线程吃线程数量的设置没有一个明确的指标,根据实际情况,只要不是设置的偏大和偏小都问题不大,结合下面这个公式即可

/**
 * Nthreads=CPU数量
 * Ucpu=目标CPU的使用率,0<=Ucpu<=1
 * W/C=任务等待时间与任务计算时间的比率
 */
Nthreads = Ncpu*Ucpu*(1+W/C)

参考:

www.cnblogs.com/dafanjoy/p/… docs.oracle.com/javase/8/do…