likes
comments
collection
share

Java线程详解:线程模型,Thread类,异常处理器,异步执行结果Future

作者站长头像
站长
· 阅读数 12

认识Java世界的线程

Java线程模型

线程是在进程中执行的一个任务。

JVM规范里是没有规定的——具体实现用1:1(内核线程)、N:1(用户线程)、M:N(混合)模型的任何一种都完全OK。Java并不暴露出不同线程模型的区别,上层应用是感知不到差异的。

HotSpot VM。在这个JVM的较新版本所支持的所有平台上,它都是使用1:1线程模型的

Solaris之外,它是个特例,支持M:N模型。Solaris是SUN公司研制的类Unix操作系统

因此,一般情况下,Java的线程模型是一对一模型,Java的线程是内核线程

Java线程详解:线程模型,Thread类,异常处理器,异步执行结果Future

glibc中的pthread_create方法主要是创建一个OS内核级线程,我们不深入细节,主要是为该线程分配了栈资源;需要注意的是这个栈资源对于JVM而言是堆外内存,因此堆外内存的大小会影响JVM可以创建的线程数。

Thread类

核心字段

线程ID

private long tid;
public long getId() {  return tid;  }

线程组

private ThreadGroup group;

ThreadGroup管理着它下面的Thread,ThreadGroup是一个标准的向下引用的树状结构,这样设计的原因是防止"上级"线程被"下级"线程引用而无法有效地被GC回收。我们可以使用线程组对线程进行批量控制。每个Thread必然存在于一个ThreadGroup中,Thread不能独立于ThreadGroup存在。默认将父线程(当前执行new Thread的线程)线程组设置为自己的线程组。

线程的优先级

private int priority;

默认的线程优先级为5。Java程序中对线程所设置的优先级只是给操作系统一个建议,操作系统不一定会采纳。而真正的调用顺序,是由操作系统的线程调度算法决定的。只能说高优先级的线程将会比低优先级的线程有更高的几率得到执行。我们使用方法Thread类的setPriority()实例方法来设定线程的优先级。

    // 默认5 最小1 最大10 
     public final static int MIN_PRIORITY = 1;
     public final static int NORM_PRIORITY = 5;
     public final static int MAX_PRIORITY = 10;

如果某个线程优先级大于线程所在线程组的最大优先级,那么该线程的优先级将会失效,取而代之的是线程组的最大优先级。

守护线程(Daemon)

    private boolean daemon = false;

守护线程默认的优先级比较低。如果所有的非守护线程结束,这个守护线程也会自动结束。应用场景是:当所有非守护线程结束时,结束其余的子线程(守护线程)自动关闭,就免去了还要继续关闭子线程的麻烦。

线程执行的方法体:target

  private Runnable target;

线程状态:threadStatus

    private volatile int threadStatus = 0;

    // 一个thread不能调用两次start方法,start方法会先判断State是否为NEW。而执行完了是TERMINATED状态
    public enum State {
        NEW,         // 还未调用start方法
        RUNNABLE,    // 运行  
        BLOCKED,     // 阻塞  等待前面的线程执行完才轮到你,至少你还在排队
        WAITING,     // 等待  调用wait或者join进入,你都不在排队,只有其他人notify你,你才能去排队
        TIMED_WAITING,//超时等待状态  等待一个具体的时间,时间到后会被自动唤醒,不需要别人notify
        TERMINATED;   //执行完毕
    }
    // threadStatus 的值 对应的 State,也是利用了位运算
        public static State toThreadState(int var0) {
            if ((var0 & 4) != 0) {
                return State.RUNNABLE;
            } else if ((var0 & 1024) != 0) {
                return State.BLOCKED;
            } else if ((var0 & 16) != 0) {
                return State.WAITING;
            } else if ((var0 & 32) != 0) {
                return State.TIMED_WAITING;
            } else if ((var0 & 2) != 0) {
                return State.TERMINATED;
            } else {
                return (var0 & 1) == 0 ? State.NEW : State.RUNNABLE;
            }
        }

线程状态转换图(来自《Java 并发编程 78 讲》)

Java线程详解:线程模型,Thread类,异常处理器,异步执行结果Future

ThreadLocalMap:本地变量表

Thread内部有两个ThreadLocalMap,一个是inheritable的

    ThreadLocal.ThreadLocalMap threadLocals = null;
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

关于ThreadLocal后面会单独开一章讲,这里有个印象:ThreadLocalMap是存储在Thread内部的

线程上下文类加载器

private ClassLoader contextClassLoader;

首先,主线程的contextClassLoaderAppClassLoader

其次,所有创建的线程的contextClassLoader,都被init为父线程的contextClassLoader

因此,如果不手动set,所有线程的contextClassLoader都为AppClassLoader

上下文类加载器有啥用呢?

不了解SPI机制的可以参考:Java SPI 机制详解

比如SPI机制,bootstrap类加载器会负责加载ServiceLoader类,ServiceLoader又要负责加载SPI的实现类,而SPI的实现类不应该由bootstrap类加载器来加载,此时就需要打破双亲委派机制,交给线程上下文类加载器(也就是App)来加载这些实现类。


    public static <S> ServiceLoader<S> load(Class<S> service) {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        return ServiceLoader.load(service, cl);
    }

希望更深入理解「上下文类加载器」可以参考:

异常处理器

Thread内部定义了UncaughtExceptionHandler接口,用来处理线程发生意外可能抛出的异常,是个函数式接口

    @FunctionalInterface
    public interface UncaughtExceptionHandler {
        // 当线程t抛出异常e,会调用uncaughtException方法来回调处理
        void uncaughtException(Thread t, Throwable e);
    }

Thread内持有两个UncaughtExceptionHandler的引用

    // null unless explicitly set 除非明确指定否则为null,并提供了对应的set/get方法
    private volatile UncaughtExceptionHandler uncaughtExceptionHandler;
    private static volatile UncaughtExceptionHandler defaultUncaughtExceptionHandler;

异常处理器是如何生效的

    // Dispatch an uncaught exception to the handler.
    // This method is intended to be called only by the JVM.
    // 仅被JVM调用,分发一个UncaughtException
    private void dispatchUncaughtException(Throwable e) {
        getUncaughtExceptionHandler().uncaughtException(this, e);
    }


    1. getUncaughtExceptionHandler: 如果没有手动设置,返回线程组ThreadGroup
        // class ThreadGroup implements Thread.UncaughtExceptionHandler
    2. uncaughtException 处理异常
         2.1 线程组让他的parent父递归调用uncaughtException方法,然后获取该线程的                          DefaultUncaughtExceptionHandler来uncaughtException
            如果DefaultUncaughtExceptionHandler为null 就会报一个我们很熟悉的错:
                 System.err.print("Exception in thread ""
                                     + t.getName() + "" ");
                    e.printStackTrace(System.err);
线程异常处理小结

在线程发生异常时,一般来说无法被外部线程(调用该线程start方法的线程)捕获,必须由线程自己处理。

当不得不向方法调用者抛异常时,JVM会回调dispatchUncaughtException方法:

  • 如果有UncaughtExceptionHandler,就让他处理;
  • 如果没有,先找到父线程组,再交给defaultUncaughtExceptionHandler处理;
  • 如果两个NEH都没有,就只能抛出异常了。

核心方法

静态方法

1、获取当前线程:currentThread
public static native Thread currentThread();
2、暂停线程:sleep

支持mills,nanos

3、让出时间片:yield

当前线程让出对cpu的占用,进入可运行状态,不是阻塞状态。 此时其他相同优先级或更高的线程有可能获得运行的机会。那当然也有可能当前线程继续占用CPU。

public static native void yield();

yield不会释放当前线程持有的锁资源

实例方法

4、启动线程:start

本地方法start0,会调用pthread_create(...)这个内核函数,从而创建出一条内核线程,然后让Java线程,和内核线程产生映射关系,让内核线程去执行run方法

在「Java线程模型」中我们已经分析过Java的线程与内核线程是一对一对应的。

一个线程不能两次start

线程调用start()之后再次调用start(),会抛出IllegalThreadStateException

    // start会对线程状态做一个判断,不是NEW,就抛异常
    if (threadStatus != 0)
        throw new IllegalThreadStateException();
5、中断线程:interrupt

这里的中断线程并不会立即停止线程,而是设置线程的中断状态为true。具体被要求中断的线程要怎么处理,完全由被中断线程自己而定,可以在合适的时机处理中断请求,也可以完全不处理继续执行下去。

    public void interrupt() {
        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                b.interrupt(this);
                return;
            }
        }
        interrupt0();//看注释理解为 仅仅是设置一下interrupt flag
    }

可以看到,这里是保障了线程的安全的。

6、提前唤醒sleep的线程

有时,我们Thread.sleep()过久,需要提前唤醒它,仍然是通过interrupt方法,抛出InterruptedException,从而在catch代码块内继续执行。

                    try{
                        Thread.sleep(1000 * 60 * 60);//睡眠   
                    }catch(InterruptedException e){// 捕获中断异常
                        e.printStackTrace();
                    }
    //此时另一个线程执行thread.interrupt(); 即可唤醒   
7、等待:join

使当前线程等待另一个线程执行完毕之后再继续执行,内部调用的是Object类的wait方法实现的

因为用了wait/notify 会释放锁

    // 锁的是线程对象
    public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;
        // 0 直到调用join的线程执行完毕,当前线程才可以执行
        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

线程的创建方式

这是个被问烂了的面试题,但实际上网上的标准答案大都混淆了一些概念

先说结论:只有new Thread().start()方法这一种创建线程的方式

start方法才会通过操作系统真实创建出一个内核线程

传统的面试答案:

  1. 继承 Thread类
  2. 实现Runnable
  3. 实现Callable
  4. 线程池

Runnable创建

    public class ImplementsRunnable implements Runnable {
        @Override
        public void run() {  System.out.println("2......"); }
        // implements Runnable 实现run方法
        public static void main(String[] args) {
            ImplementsRunnable runnable = new ImplementsRunnable();
            // 还是Thread.start();
            new Thread(runnable).start();
            // ------还可以lambda,本质也是runnable-------------    
            new Thread(() -> {
                System.out.println(111);
            }).start();
            // 因为Runnable是个函数式接口
            // 构造方法:public Thread(Runnable target) {} 
        }
        
    }

Callable创建

关于Callable我们后面会具体分析,这里只需要知道可以用来封装线程体即可

    public class ImplementsCallable implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("3......");
            return "";
        }
        public static void main(String[] args) throws Exception {
            ImplementsCallable callable = new ImplementsCallable();
            FutureTask<String> futureTask = new FutureTask<>(callable);
            new Thread(futureTask).start();
            System.out.println(futureTask.get());
        }
    }

但实际上,实现Runnable接口,重写了run方法,这是一个线程体,就是线程要执行的任务,并不是真正的线程,所有这些方法本质都是利用new Thread().start()方法,才成为一个线程

线程体的概念

线程体也是一段普通的代码,像我们实现了Runnable接口,然后直接调用run方法,就和调用普通方法没有区别,但是,线程体是逻辑上的,专门交给线程去执行的代码,而不是当作普通方法去调用的(尽管JVM不会阻止你这样做)。因此,实现Runnable接口,本质是对线程体的封装,并不是创建线程。其他方式也是类似,或者封装Thread.start方法。

因此,本质上只有一种线程的创建方式。

线程的初始化

前面废了很多笔墨,说明了线程的创建只有new Thread().start()一种,而start方法在「Java线程模型」中已经分析过,因此我们来关注Thread的构造方法。

所有构造方法转化为对init方法的调用:


    private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {
        //  默认线程名为:"Thread-" + nextThreadNum()
        // 父线程 就是创建该线程的线程
        Thread parent = currentThread();
        SecurityManager security = System.getSecurityManager();
        // g就是线程组
        if (g == null) {
            if (security != null) {
                g = security.getThreadGroup();
            }
            // 父线程的线程组
            if (g == null) {
                g = parent.getThreadGroup();
            }
        }
        // 初始化线程的一些字段
        this.group = g;
        this.daemon = parent.isDaemon();
        this.priority = parent.getPriority();
        if (security == null || isCCLOverridden(parent.getClass()))
            this.contextClassLoader = parent.getContextClassLoader();
        else
            this.contextClassLoader = parent.contextClassLoader;
        this.inheritedAccessControlContext =
                acc != null ? acc : AccessController.getContext();
        this.target = target;
        setPriority(priority);
        // 初始化inheritableThreadLocals,具体讲到ThreadLocal会在分析
        if (inheritThreadLocals && parent.inheritableThreadLocals != null)
            this.inheritableThreadLocals =
                ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
        // 线程栈的大小
        this.stackSize = stackSize;
        // 貌似stackSize的值是没法根据单个线程自定义的,可以参考R大的回答:                            // https://www.zhihu.com/question/21776758/answer/19285132
        // 线程id会不断原子地++
        tid = nextThreadID();
    }

Java异步模型

前面用runnable封装线程体并执行,有一个缺点是:我们不知道线程异步执行的执行结果。比如MQ:

  • 我们需要知道,向MQ发消息的返回值,成功 or 失败,这个结果在调用方法时未知

因此,我们需要一个线程的执行是有返回值的,并且返回值代表一个未来才可以知道的结果。

Callable接口:有返回值的Runnable

直接看源码,发现Callable接口同样是个函数式接口,提供了call方法,有返回值。

    @FunctionalInterface
    public interface Runnable {
        public abstract void run();
    }
    @FunctionalInterface //函数式接口
    public interface Callable<V> {
        V call() throws Exception;
    }

我们知道Thread类实现了Runnable接口,也有基于Runnable的构造函数,但是没发现Callable,那么我们该如何利用Callable创建线程呢?

答案是:结合Future,这就是所谓的“异步”模型。

Future:异步执行的结果

Future接口代表异步计算的结果。

    public interface Future<V> {
        // 尝试取消异步任务的执行(不是一定取消),参数表示是否用中断的方式取消,
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
        // 获取异步执行的结果,支持超时时间
        V get() throws InterruptedException, ExecutionException;
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }

我们异步地让一个线程执行子任务,在拿到它的执行结果时,我们并不能确定该线程已经执行完毕返回结果,因此Future就是获取执行结果的手段,提供了两种get方法,支持超时。

RunnableFuture

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        void run();
    }
    // 同时继承了Runnable, Future<V>

这个接口就是关键了:

  • 既需要实现Runnable,代表可以被线程执行
  • 又实现了Future,即可以获取线程的异步执行结果。

而RunnableFuture最关键的实现类之一就是FutureTask

FutureTask:有返回值的任务

FutureTask为 Future 提供了基础实现。因为FutureTask实现了Runnable接口,因此可以被线程直接执行。

因此:FutureTask将线程体(callable或runnable)封装成「任务task」,并且支持获取任务的执行结果

核心字段

    // 任务执行的状态
    private volatile int state;
    // callable接口
    private Callable<V> callable;
    // 线程的运行结果
    private Object outcome; 
    // 执行callable接口的call方法的线程
    private volatile Thread runner;
    // 因调用get没有获取到结果而阻塞等待的线程
    private volatile WaitNode waiters;

任务执行状态

有如下7种。

    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

中间状态:

  • NEW:任务执行前或执行中
  • COMPLETING:任务执行完毕,但还没有将结果保存到outcome

最终状态:

  • NORMAL:正常执行完毕
  • EXCEPTIONAL:执行完毕,但抛出异常
  • CANCELLED:任务被取消
  • INTERRUPTED:线程被中断(线程中断会导致状态先变为5再变为6)

构造方法:支持Callable

有两个构造方法

    public FutureTask(Callable<V> callable) {
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

会把Runnable封装成一个Callable

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

实现原理

核心方法:run

run方法是对callable接口的call方法的封装

    public void run() {
        try {
            // c就是callable接口
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    // 异常处理 
                    result = null;
                    ran = false;
                    // 异常也被封装在outcome字段里
                    setException(ex);
                }
            // 设置异步的执行结果,用CAS保证线程安全
                if (ran)
                    set(result);
        } finally {
            runner = null;
            if (state >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

set/setException方法内部会执行finishCompletion();唤醒阻塞的线程

获取执行结果:get

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // 无止境地阻塞下去
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        // report方法,如果outcome是异常会抛出
        return report(s);
    }

取消任务:cancel

    // 参数是是否允许中断的意思
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {        finishCompletion();    }
        return true;
    }

首先,cancel的前提必须是线程状态为NEW。(NEW表示执行前或执行中)

  • 如果允许中断,CAS修改状态为INTERRUPTING,中断线程,再CAS修改为INTERRUPTED
  • 如果不允许,CAS修改状态为CANCELLED

FutureTask的局限性

要想拿到FutureTask的执行结果,线程要么轮询isDone方法,要么调用FutureTask.get()方法,这不是真正意义上的异步,这会导致线程要么阻塞,要么白白消耗CPU资源。

我们希望真正的异步是:线程可以正常做别的工作,等到拿到异步任务的执行结果,自动回调处理函数。

最后挖个坑,有空来讲讲JDK8引入的真正的异步:CompletableFuture,Fork/Join

参考文档

深入浅出Java多线程

《Java并发编程的艺术》

JVM中的线程模型是用户级的么?- RednaxelaFX

转载自:https://juejin.cn/post/7281459744602587190
评论
请登录