Skip to content

java 线程池,Future,ThreadLocal

Updated: at 04:12 PM

自定义线程池

    /**
     * 用给定的初始参数创建一个新的ThreadPoolExecutor。
     */
    public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
                              int maximumPoolSize,//线程池的最大线程数
                              long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
                              TimeUnit unit,//时间单位
                              BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
                              ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
                              RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
                               ) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

参数介绍

1.corePoolSize:任务队列没有达到队列容量的时候,最大可以同时运行的线程数量

2.maximumPoolSize:任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数

3.workQueue:新任务来的时候会先判断当前线程数量是否达到核心线程数,或者核心线程数是否还有空闲,如果没有空闲了则加入到workQueue队列中

4.keepAliveTime:线程中的线程数量大于corePoolSize的时候,如果没有新的任务提交,多余的空闲线程会等到keepAliveTime之后才会回收销毁。线程池收回线程时,对核心线程和非核心线程一视同仁,直到线程池的数量等于corePoolSize才停止。

5.unit:keepAliveTime的时间单位

6.threadFactory:创建新线程的创建工厂

7.handler:拒绝策略,待会介绍

总结

一开始线程池线程数为corePoolSize个,提交的任务如果核心线程处理不过来会加入到阻塞队列中,如果阻塞队列满了则会创建非核心线程来处理,非核心线程如果空闲了会在keepAliveTime之后被回收。非核心线程如果再处理不过来则通过拒绝策略处理。

拒绝策略

  1. ThreadPoolExecutor.AbortPolicy : 抛出RejectExecutionException拒绝新的任务处理

  2. ThreadPoolExecutor.CallerRunsPolicy : 调用执行自己的线程运行任务,也就时直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行线程已关闭,则会丢弃该任务。因此这种策略会降低对于新任务的提交速度,影响程序的整体性能。

  3. ThreadPoolExecutor.DiscardPolicy : 不处理新任务,直接丢弃掉

  4. ThreadPoolExecutor.DiscardOldestPolicy : 此策略将丢弃最早的未处理的任务请求

阻塞队列

  1. LinkedBlockingQueue(无界队列):容量为Integer.MAX_VALUE

  2. SynchronousQueue(同步队列):没有容量,即不存储元素。(一手交钱,一手交货)

  3. DelayWorkQueue(延迟阻塞队列):内部的元素并不是按照放入的时间顺序排序,而是会按照延迟的时间长短对任务进行排序,内部是堆。保证每次出任务的都是当前队列中马上要被执行的。

  4. PriorityBlockingQueue(优先级阻塞队列):可以看作是线程安全的PriorityQueue,实现Comparable接口或者传入Comparator对象来实现排序,无界。

可以看到上面的队列除了无界就是无空间的。

常用线程池

Executor框架工具类Executors来创建:

  1. FixedThreadPool:该方法返回一个固定线程数量的线程池。核心线程数=最大线程数,使用LinkedBlockingQueue,若有空余线程则执行任务,没有线程则在任务队列里等待,直到有线程空余出来执行(不会创建多余的线程)。

  2. SingleThreadExecutor:该方法返回一个只有一个线程的线程池。与FixedThreadPool本质一样,只是线程数是1。若多余的一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行任务。

  3. CachedThreadPool:该方法返回一个可根据实际情况调整线程数量的线程池。初始大小为0,队列是synchronousQueue,即队列不可以放任务,如果线程不够用就创建,线程空闲了超过一定时间就销毁。本质是corePoolSize = 0,maxPoolSize = Integer.MAX_VALUE,超时时间60s

  4. ScheduledThreadPool:定期执行任务,使用的队列是DelayedWorkQueue,corePoolSize给定,最大线程数Integer.MAX_VALUE

Future

主要用于异步执行任务

// V 代表了Future执行的任务返回值的类型
public interface Future<V> {
    // 取消任务执行
    // 成功取消返回 true,否则返回 false
    boolean cancel(boolean mayInterruptIfRunning);
    // 判断任务是否被取消
    boolean isCancelled();
    // 判断任务是否已经执行完成
    boolean isDone();
    // 获取任务执行结果
    V get() throws InterruptedException, ExecutionException;
    // 指定时间内没有返回计算结果就抛出 TimeOutException 异常
    V get(long timeout, TimeUnit unit)

        throws InterruptedException, ExecutionException, TimeoutExceptio

}

可以看到,任务可以取消,可以获取结果

Callable、Runnable和Future的关系

Runnable、Callable都代表了一段可执行的任务

(1) Runnable需要实现run()方法,且没有返回值

(2) callable需要实现call()方法,但是有返回值T

线程池可以接受Runnable也可以接受Callable

submit方法的定义:

(1) Future submit(Callable task); 对于Callable任务,submit会返回一个Future对象,用于获取任务的返回值,线程池会调用call()方法

(2) Future<?> submit(Runnable task); 对于Runnable任务,submit也会返回一个Future对象,但是没有返回值,Future.get()为null,线程池会调用run()方法

其实runnable和callable可以通过Executors.callable(runnable, result)转换的

FutureTask是Future接口的一个具体实现类(Executors.submit()返回的其实是Future的具体实现类FutureTask)

FutureTask有两个构造函数,即可以接受Callable,也可以接受Runnable。其实接受Runnable参数本质也会通过Executors.callable()方法转化为Callable接口

ThreadLocal

ThreadLocal里面有一个关键的变量ThreadLocalMap(一个HashMap)

你可以会任务所有线程的ThreadLocal共用一个ThreadLocalMap,key是Thread,value是值,但这是老版本的实现了

新的版本一个Thread的一个成员变量是ThreadLocalMap,key对应具体的ThreadLocal变量。

下面给张图解释:

img

ThreadLocal的set()方法

public void set(T value) {
    //获取当前请求的线程
    Thread t = Thread.currentThread();
    //取出 Thread 类内部的 threadLocals 变量(哈希表结构)
    ThreadLocalMap map = getMap(t);
    if (map != null)
        // 将需要存储的值放入到这个哈希表中
        map.set(this, value);
    else
        createMap(t, value);
}
ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

可以看到是先获得当前线程,然后得到map,其中的map.set(this,value) 的 this是ThreadLocal对象。

内存泄漏问题

ThreadLocalMap的key是弱引用,原因是:

如果key是强引用,那么只要线程存在,ThreadLocal对象就无法被回收。

设置弱引用后,可以gc清除掉key。ThreadLocal的get,set,remove方法都会显式的判断key是否为null,如果key为null了则value也会被回收(value依然是强引用)。

img