Using CompletableFuture

Author: 站长

Preface

CompletableFuture is a class introduced in JDK 8 for writing asynchronous logic and improving execution efficiency. It offers a rich set of methods.

A caveat with CompletableFuture

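By default, CompletableFuture tasks run on daemon threads (the workers of ForkJoinPool.commonPool()). If the main method finishes first, the JVM exits without waiting for them. For example: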


import java.util.concurrent.CompletableFuture;

public class CompletableFuture1Demo {

    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("hello world");
            return "result";
        });
    }
}

Run it several times and you may find that this code prints nothing at all.
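
To see why the output can go missing, it helps to check which thread actually runs the task. The sketch below is only an illustration (the class and method names are made up for this example): it reports the worker thread's name and daemon flag, and calls join() so that the caller waits for the task. On a typical multi-core machine the worker should report daemon=true, although the exact thread name depends on the JVM.

```java
import java.util.concurrent.CompletableFuture;

public class DaemonCheckDemo {

    // Returns a short description of the thread that ran the async task.
    static String describeWorker() {
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            Thread t = Thread.currentThread();
            return t.getName() + " daemon=" + t.isDaemon();
        });
        // join() blocks the caller until the task has finished
        return cf.join();
    }

    public static void main(String[] args) {
        System.out.println(describeWorker());
    }
}
```

Because main blocks in join(), the line is printed on every run.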

Using CompletableFuture methods

supplyAsync: with a return value

With plain threads, a task's return value is usually obtained through Future. CompletableFuture can do the same:


import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("hello world");
            return "success";
        });

        // get() blocks until the task completes and returns its result
        String result = completableFuture.get();
        System.out.println(result);
    }

}


This gives you the task's return value. Note that get() also accepts a timeout, for example:

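import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CompletableFuture4Demo {

    public static void main(String[] args) {
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            System.out.println("ha ha ha ========");
            return "result";
        });

        try {
            // 1000 ns is only 1 microsecond, so this call will often time out
            String result = cf.get(1000, TimeUnit.NANOSECONDS);
            System.out.println(result);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
        }
    }

}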
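
A timed get() is usually paired with some fallback for the timeout case. The following is a minimal sketch under that assumption; the class name, the getOrFallback helper, and the 50 ms timeout are hypothetical choices for illustration. It uses a future that is never completed so that the timeout path is taken deterministically.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class GetTimeoutDemo {

    // Waits up to timeoutMillis for the future; returns fallback if it is not done in time.
    static String getOrFallback(CompletableFuture<String> cf, long timeoutMillis, String fallback) {
        try {
            return cf.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return fallback;
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // A future that nobody completes, so the timed get() always times out.
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(getOrFallback(pending, 50, "fallback")); // fallback

        // An already completed future returns its value immediately.
        CompletableFuture<String> done = CompletableFuture.completedFuture("result");
        System.out.println(getOrFallback(done, 50, "fallback")); // result
    }
}
```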

runAsync: no return value


import java.util.concurrent.CompletableFuture;

public class CompletableFuture2Demo {

    public static void main(String[] args) {
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("hello world");
        });
        // join() waits for the task without throwing checked exceptions
        completableFuture.join();
    }

}

runAsync() returns no value. Because the default pool's worker threads are daemon threads, call join() to wait for the task to finish before main returns.
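
Both join() and get() wait for completion; they differ in how a failure is reported. The sketch below (class and method names are illustrative only) runs a task that always fails and shows that join() throws an unchecked CompletionException, while get() throws a checked ExecutionException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class JoinVsGetDemo {

    // Builds a future whose task always fails.
    static CompletableFuture<Void> failing() {
        return CompletableFuture.runAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // join() reports failures as an unchecked CompletionException.
    static String viaJoin() {
        try {
            failing().join();
            return "no exception";
        } catch (CompletionException e) {
            return "join: " + e.getCause().getMessage();
        }
    }

    // get() reports failures as a checked ExecutionException.
    static String viaGet() {
        try {
            failing().get();
            return "no exception";
        } catch (ExecutionException e) {
            return "get: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(viaJoin()); // join: boom
        System.out.println(viaGet());  // get: boom
    }
}
```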

runAsync with a custom thread pool

runAsync has an overload that accepts a custom thread pool. If none is passed, ForkJoinPool.commonPool() is used by default.

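import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFuture3Demo {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("hello world");
        }, executorService);
        completableFuture.join();
        // Shut the pool down, otherwise its non-daemon threads keep the JVM alive
        executorService.shutdown();
    }

}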


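Relying on the JDK's default thread pool is not recommended: ForkJoinPool.commonPool() is shared by the whole JVM (parallel streams use it too), so slow or blocking tasks can starve unrelated work.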
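
One way to avoid the shared default pool is a small dedicated executor. The sketch below is only an illustration: the class name, the pool size of 2, and the "io-worker-" thread-name prefix are assumptions made for this example. Naming the threads makes it easy to confirm where a task ran.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedPoolDemo {

    // Runs one task on a dedicated pool and returns the name of the thread that ran it.
    static String runOnDedicatedPool() {
        AtomicInteger counter = new AtomicInteger();
        // "io-worker-" is a made-up prefix; pick a name that fits your application.
        ExecutorService pool = Executors.newFixedThreadPool(2,
                r -> new Thread(r, "io-worker-" + counter.incrementAndGet()));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            // Always shut the pool down so its threads do not keep the JVM alive.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnDedicatedPool());
    }
}
```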

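Using thenApply

thenApply transforms the result of the previous stage and passes the new value down the chain.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenApplyDemo {

    public static void main(String[] args) {
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            System.out.println("test");
            return "hello";
        }).thenApply((x) -> {
            System.out.println(x);
            x = x + " world";
            return x;
        });
        try {
            String result = cf.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }

    }
}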

Using thenAcceptAsync

thenAcceptAsync consumes the previous result and returns no value.


import java.util.concurrent.CompletableFuture;

public class ThenApplyAsyncDemo {

    public static void main(String[] args) {
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("test");
            return "hello";
        }).thenAcceptAsync((x) -> {
            x = x + " world";
            System.out.println(x);
        });
        completableFuture.join();


    }
}


Passing a thread pool to thenAcceptAsync



import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class ThenCompose1Demo {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("test");
            return "hello";
        }).thenAcceptAsync(x -> {
            x = x + ":aaa";
            System.out.println(x);
        }, executorService);

        completableFuture.join();
        // Shut the pool down so its idle threads do not delay JVM exit
        executorService.shutdown();
    }
}


Using thenCompose

thenCompose chains two CompletableFutures: the result of the first is fed into a function that returns the second. For example:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenComposeDemo {

    public static void main(String[] args) {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("test");
            return "hello";
        }).thenCompose(x -> CompletableFuture.supplyAsync(() -> {
            System.out.println(x);
            return "world";
        }));
        try {
            String result = completableFuture.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }


    }
}
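
It can help to contrast thenCompose with thenApply when the chained function itself returns a future. In the sketch below, greet is a hypothetical async helper introduced only for this illustration: with thenApply the result is a nested future, while thenCompose flattens it into a single one.

```java
import java.util.concurrent.CompletableFuture;

public class ComposeVsApplyDemo {

    // Hypothetical async helper used only for illustration.
    static CompletableFuture<String> greet(String name) {
        return CompletableFuture.supplyAsync(() -> "hello " + name);
    }

    // thenApply wraps the returned future, producing a nested type.
    static CompletableFuture<CompletableFuture<String>> nested() {
        return CompletableFuture.supplyAsync(() -> "world")
                .thenApply(ComposeVsApplyDemo::greet);
    }

    // thenCompose flattens the result into a single future.
    static CompletableFuture<String> flat() {
        return CompletableFuture.supplyAsync(() -> "world")
                .thenCompose(ComposeVsApplyDemo::greet);
    }

    public static void main(String[] args) {
        System.out.println(nested().join().join()); // hello world (two joins needed)
        System.out.println(flat().join());          // hello world
    }
}
```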

Using thenRun

thenRun specifies an action to run after the CompletableFuture completes. It takes no argument and returns no value.


import java.util.concurrent.CompletableFuture;

public class ThenRunDemo {

    public static void main(String[] args) {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task 1 running");
            return 1;
        });

        CompletableFuture<Void> completableFuture2 = completableFuture1.thenRun(() -> {
            System.out.println("task 2 running");
        });

        completableFuture2.join();
    }
}

Using thenRunAsync

Likewise, when no thread pool is passed in, thenRunAsync uses the default thread pool.

import java.util.concurrent.CompletableFuture;

public class ThenRunAsyncDemo {

    public static void main(String[] args) {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(1);
            return 1;
        });

        CompletableFuture<Void> cf2 = cf1.thenRunAsync(() -> {
            System.out.println(2);
        });
        cf2.join();

    }
}

Passing a custom thread pool to thenRunAsync

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class ThenRunAsync1Demo {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(1);
            return 1;
        });

        CompletableFuture<Void> completableFuture2 = completableFuture.thenRunAsync(() -> {
            System.out.println(2);
        }, executorService);
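        completableFuture2.join();
        // Shut the pool down, otherwise its non-daemon thread keeps the JVM alive
        executorService.shutdown();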

    }
}

Using exceptionally

exceptionally handles an exception thrown by an earlier stage and supplies a fallback value. For example:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ExceptionDemo {

    public static void main(String[] args) {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("ha ha ha ========");
            int a = 1 / 0;
            return "result";
        }).exceptionally(e -> {
            System.out.println(e.getMessage());
            return "xxx";
        });

        try {
            String result = completableFuture.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
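
When the callback is attached directly to a supplyAsync stage, the Throwable it receives is normally a CompletionException that wraps the original exception, which is why getMessage() above includes the exception class name. The following sketch (class and method names are illustrative) unwraps the cause defensively before reporting it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ExceptionallyCauseDemo {

    // Recovers from a failed task and reports the underlying exception.
    static String recover() {
        return CompletableFuture.<String>supplyAsync(() -> {
            throw new IllegalArgumentException("bad input");
        }).exceptionally(e -> {
            // Unwrap the CompletionException, if present, to reach the original exception.
            Throwable root = (e instanceof CompletionException && e.getCause() != null)
                    ? e.getCause() : e;
            return root.getClass().getSimpleName() + ": " + root.getMessage();
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(recover()); // IllegalArgumentException: bad input
    }
}
```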


Using whenComplete

Whether the task completes normally or exceptionally, whenComplete is invoked.

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class WhenCompleteDemo {

    public static void main(String[] args) {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("======111===========");
            return "success";
        }).whenComplete((s, throwable) -> {
            System.out.println("result: " + s);
            if (Objects.nonNull(throwable)) {
                System.out.println(throwable.getMessage());
            }
        });
        try {
            String result = completableFuture.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}


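With an exception, whenComplete still runs (the result is null and the throwable is set), but the exception is not swallowed, so get() throws ExecutionException: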



import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class WhenCompleteDemo {

    public static void main(String[] args) {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("======111===========");
            int a = 1 / 0;
            return "success";
        }).whenComplete((s, throwable) -> {
            System.out.println("result: " + s);
            if (Objects.nonNull(throwable)) {
                System.out.println(throwable.getMessage());
            }
        });
        try {
            String result = completableFuture.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}


Using whenCompleteAsync

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;



public class WhenCompleteAsyncDemo {

    public static void main(String[] args) {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("======111===========");
            int a = 1 / 0;
            return "success";
        }).whenCompleteAsync((s, throwable) -> {
            System.out.println("result: " + s);
            if (Objects.nonNull(throwable)) {
                System.out.println(throwable.getMessage());
            }
        });
        try {
            String result = completableFuture.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}


A custom thread pool can be passed in as well:

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;



public class WhenCompleteAsync1Demo {

    public static void main(String[] args) {
        ExecutorService executorService= Executors.newSingleThreadExecutor();
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("======111===========");
            int a = 1 / 0;
            return "success";
        }).whenCompleteAsync((s, throwable) -> {
            System.out.println("result: " + s);
            if (Objects.nonNull(throwable)) {
                System.out.println(throwable.getMessage());
            }
        }, executorService);
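        try {
            String result = completableFuture.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            // Shut the pool down, otherwise its non-daemon thread keeps the JVM alive
            executorService.shutdown();
        }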
    }
}


Using handle()

handle() processes the outcome of a task once it completes, whether it succeeded or failed, and returns a new result.


import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class HandleDemo {

    public static void main(String[] args) {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("============1111============");
            return "aaaa";
        }).handle((s, throwable) -> {
            System.out.println(s);
            if (Objects.nonNull(throwable)) {
                System.out.println(throwable.getMessage());
            }
            return "bbb";
        });
        try {
            String result = completableFuture.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}

With an exception:

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class HandleDemo {

    public static void main(String[] args) {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("============1111============");
            int c = 1 / 0;
            return "aaaa";
        }).handle((s, throwable) -> {
            System.out.println(s);
            if (Objects.nonNull(throwable)) {
                System.out.println(throwable.getMessage());
            }
            return "bbb";
        });
        try {
            String result = completableFuture.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}


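handle() is also invoked when the task throws. Unlike whenComplete, its return value replaces the outcome, so get() returns "bbb" here instead of throwing.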
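
The difference between the two callbacks can be checked side by side. This is a minimal sketch with made-up class and method names: both chains start from the same failing task, but only the handle chain turns the failure into a normal value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class HandleVsWhenCompleteDemo {

    // Builds a future whose task always fails.
    static CompletableFuture<String> failing() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // whenComplete observes the outcome but leaves the failure in place.
    static String withWhenComplete() {
        try {
            return failing().whenComplete((value, error) -> { /* observe only */ }).join();
        } catch (CompletionException e) {
            return "still failed";
        }
    }

    // handle replaces the outcome, so the failure becomes a normal value.
    static String withHandle() {
        return failing().handle((value, error) -> error == null ? value : "recovered").join();
    }

    public static void main(String[] args) {
        System.out.println(withWhenComplete()); // still failed
        System.out.println(withHandle());       // recovered
    }
}
```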

Combining multiple CompletableFuture tasks with thenAcceptBoth


import java.util.concurrent.CompletableFuture;

public class ThenAcceptBothDemo {

    public static void main(String[] args) {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("completableFuture1 running");
            return 1;
        });

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("completableFuture2 running");
            return 2;
        });

        CompletableFuture<Void> completableFuture3 = completableFuture1.thenAcceptBoth(completableFuture2, (a, b) -> {
            System.out.println("completableFuture3 running");
            System.out.println(a + b);
        });

        completableFuture3.join();
    }
}


The allOf() method

allOf() waits for all of the given CompletableFutures to complete before execution continues. The future it returns carries no result value.


import java.util.concurrent.CompletableFuture;

public class AllOfDemo {

    public static void main(String[] args) {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("completableFuture1 running");
            return 1;
        });

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("completableFuture2 running");
            return 2;
        });
        CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("completableFuture3 running");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 3;
        });
        CompletableFuture<Void> completableFuture = CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3);
        completableFuture.join();
        System.out.println("main thread finished");
    }
}

allOf() takes a varargs parameter, so an array of futures can be passed in as well.
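
Since allOf() itself yields no values, a common pattern is to pass the futures as an array and then read each result from its own future once everything is done. The sketch below assumes a list of Integer futures; the class name and the collect and demo helpers are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllOfResultsDemo {

    // Waits for every future in the list and returns their results in list order.
    static List<Integer> collect(List<CompletableFuture<Integer>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        // allOf() carries no values, so read each result from its own future afterwards.
        return all.thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    // Builds three small tasks and gathers their results.
    static List<Integer> demo() {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3));
        return collect(futures);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1, 2, 3]
    }
}
```

The results come back in list order regardless of which task finishes first, because each value is read from its own future.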

Using anyOf()

anyOf() completes as soon as any one of the given CompletableFutures completes, and returns that future's result (typed as Object).


import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AnyOfDemo {

    public static void main(String[] args) {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("completableFuture1 running");
            return 1;
        });

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("completableFuture2 running");
            return 2;
        });
        CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("completableFuture3 running");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("completableFuture3 finished");
            return 3;
        });
        CompletableFuture<Object> completableFuture = CompletableFuture.anyOf(completableFuture1, completableFuture2, completableFuture3);
        try {
            Object result = completableFuture.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
        System.out.println("main thread finished");
    }
}
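
Because anyOf() returns CompletableFuture<Object>, the winning value usually has to be cast back to its real type. The sketch below makes the winner deterministic by pairing an already completed future with one that is never completed; the class and method names are illustrative only.

```java
import java.util.concurrent.CompletableFuture;

public class AnyOfFirstDemo {

    // Returns the value of whichever future completes first.
    static Integer firstOf(CompletableFuture<Integer> a, CompletableFuture<Integer> b) {
        // anyOf() yields CompletableFuture<Object>, so the value has to be cast back.
        Object first = CompletableFuture.anyOf(a, b).join();
        return (Integer) first;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> pending = new CompletableFuture<>(); // never completed here
        CompletableFuture<Integer> ready = CompletableFuture.completedFuture(2);
        System.out.println(firstOf(pending, ready)); // 2
    }
}
```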

Using applyToEither()

Put simply, applyToEither() applies the function to the result of whichever future completes first.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ApplyToEitherDemo1 {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(111);
            // try {
            //     Thread.sleep(500);
            // } catch (InterruptedException e) {
            //     throw new RuntimeException(e);
            // }
            return "hello1";
        });

        CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(222);
            return "hello2";
        });

        CompletableFuture<String> future = completableFuture1.applyToEither(completableFuture2, x -> {
            System.out.println(x);
            return x;
        });

        System.out.println(future.get());
    }
}

Try uncommenting the sleep block and running it again: the result comes from whichever future completes first.
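
For a version that does not depend on timing, the "first one wins" behavior can be sketched with a future that is left incomplete and one that is already complete. The class name and the upperOfFirst helper are assumptions made for this illustration.

```java
import java.util.concurrent.CompletableFuture;

public class EitherFirstDemo {

    // Applies the function to whichever of the two futures completes first.
    static String upperOfFirst(CompletableFuture<String> a, CompletableFuture<String> b) {
        return a.applyToEither(b, String::toUpperCase).join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> slow = new CompletableFuture<>(); // stays incomplete for now
        CompletableFuture<String> fast = CompletableFuture.completedFuture("hello2");
        System.out.println(upperOfFirst(slow, fast)); // HELLO2
        slow.complete("hello1"); // completing the loser later does not change the result
    }
}
```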

Using acceptEither()

acceptEither() is much like applyToEither(), except that it returns no value.

import java.util.concurrent.CompletableFuture;

public class AcceptEitherDemo {

    public static void main(String[] args) {
        CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(111);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "hello1";
        });

        CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(222);
            return "hello2";
        });

        CompletableFuture<Void> future = completableFuture1.acceptEither(completableFuture2, x -> {
            x = x + ":" + "aa";
            System.out.println(x);
        });

        future.join();
    }
}


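Summary

CompletableFuture offers a rich API that can improve development efficiency. That said, there is no need to force it into every project; plenty of other asynchronous approaches are available.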