likes
comments
collection
share

一款简单高效的Android异步框架

作者站长头像
站长
· 阅读数 3

当我们在用java进行Android开发时,经常会涉及到线程间的切换操作。由子线程执行耗时任务,执行完毕后再切到主线程。对于这种需求在Rxjava出现之前不可避免采用handler.post(runnable)来实现。后来Rxjava出现了,我们便可以借助于RxJava强大的操作符subscribeOn, observeOn来实现线程的切换。在梳理以前用Rxjava开发的项目,发现其实很多时候都是用它来进行线程切换。而对于开发来讲,Rxjava框架比较重,如果仅仅是想方便的进行线程切换,完全没有必要引入Rxjava。为此,我们可以自己封装一套框架,实现这个功能。

期望

我们期望框架能够支持如下功能:

  1. 简单方便进行线程的切换(必须的)
  2. 最好能像Rxjava那样使用,因为大多数人已经用惯了
  3. 线程池也可以提供给外部使用
  4. 功能尽量单一,代码简洁

框架的设计:

一款简单高效的Android异步框架

Async类

框架的入口,相关方法命名和Rxjava类似,用途也一致,不做过多讲述。新增了一个 observableOnMain 方法,是针对线程执行耗时操作后将结果返回给主线程的场景。使用如下:

Async.create(()-> 1 + 1).observableOnMain(new Subscriber<Integer>() {
     @Override
     public void onError(Throwable th) {

     }

     @Override
     public void onSuccess(Integer data) {
         printData(data);
     }
 })
public class Async<T> {
    private final Callable<T> callable;
    private Scheduler subscribeOnExecutor;
    private Scheduler observeOnExecutor;
    private Async(Callable<T> callable) {
        this.callable = callable;
    }

    public static<T> Async<T> create(Callable<T> callable) {
        return new Async<>(callable);
    }

    /**
     * 执行线程
     */
    public Async<T> subscribeOn(Scheduler executor) {
        this.subscribeOnExecutor = executor;
        return this;
    }

    /**
     * 结果返回的线程
     */
    public Async<T> observeOn(Scheduler executor) {
        this.observeOnExecutor = executor;
        return this;
    }

    /**
     * 异步框架大部份的使用场景都是在子线程执行耗时任务,将任务结果返回给主线程, 所以直接抽出一个方案
     */
    public void observableOnMain(Subscriber<T> subscriber) {
        subscribeOnExecutor = Schedulers.io();
        observeOnExecutor = Schedulers.main();
        subscribe(subscriber);
    }
    public void subscribe(Subscriber<T> subscriber) {
        if (subscribeOnExecutor != null) {
            subscribeOnExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        T result = callable.call();
                        if (observeOnExecutor != null) {
                            observeOnExecutor.execute(() -> subscriber.onSuccess(result));
                        } else {
                            subscriber.onSuccess(result);
                        }
                    } catch (Exception e) {
                        if (observeOnExecutor != null) {
                            observeOnExecutor.execute(() -> subscriber.onError(e));
                        } else {
                            subscriber.onError(e);
                        }
                    }
                }
            });
        }
    }
}

Subscribe

这是一个接口,其实现也非常简单

public interface Subscriber<T> {
    void onError(Throwable th);

    void onSuccess(T t);
}

SubscriberForSuccess

这个类存在的目的是为了针对这种场景:我们在子线程中执行了任务,但只关注任务执行成功的情况,不想搭理失败。

public abstract class SubscriberForSuccess<T> implements Subscriber<T> {

    @Override
    public void onError(Throwable th) {
        th.printStackTrace();
    }
}
Async.create(()-> 1 + 1).observableOnMain(new SubscriberForSuccess<Integer>() {
    @Override
    public void onSuccess(Integer data) {
        System.out.println("data: " + data);
    }
});

Scheduler

线程调度器接口

public interface Scheduler {
    void execute(Runnable runnable);
     ExecutorService getExecutor();
}

Schedulers

相当于线程调度器的 provider, 里面提供了 io, computation, main 三种类型的调度器

public class Schedulers {
    static final int CPU = Runtime.getRuntime().availableProcessors();
    public static final class IOScheduler implements Scheduler {
        /**
         * 核心线程数:通常可以将核心线程数设置为0, IO线程池不需要响应的及时性,所以将常驻线程设置为0,可以减少应用的线程数量
         * 最大线程数:通常中小型,业务比较简单设置成64即可。
         */
        ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 64, 1,
                TimeUnit.MINUTES, new SynchronousQueue<>(), new MobileThreadFactory("IOScheduler"));
        @Override
        public void execute(Runnable runnable) {
            executor.execute(runnable);
        }
        public ExecutorService getExecutor() {
            return executor;
        }
    }

    private static final class ComputationScheduler implements Scheduler {
        /**
         * 核心线程:将核心线程数设置为该手机的 CPU 核数,理想状态下每一个核可以运行一个线程,这样能减少 CPU 线程池的调度损耗又能充分发挥 CPU 性能。
         * 最大线程数:和核心线程保持一致,因为当最大线程数超过了核心线程数时,反倒会降低 CPU 的利用率,因为此时会把更多的 CPU 资源用于线程调度上,
         */
        ExecutorService executor = new ThreadPoolExecutor(Schedulers.CPU, Schedulers.CPU, 1,
                TimeUnit.MINUTES, new LinkedBlockingDeque<Runnable>(),  new MobileThreadFactory("ComputationScheduler"));

        @Override
        public void execute(Runnable runnable) {
            executor.execute(runnable);
        }

        @Override
        public ExecutorService getExecutor() {
            return executor;
        }
    }
    private static class IOSchedulerHolder {
        private static final IOScheduler INSTANCE = new IOScheduler();
    }
    public static Scheduler io() {
        return IOSchedulerHolder.INSTANCE;
    }
    private static class ComputationSchedulerHolder {
        private static final ComputationScheduler INSTANCE = new ComputationScheduler();
    }
    public static Scheduler computation() {
        return ComputationSchedulerHolder.INSTANCE;
    }

    private static class MainSchedulerHolder {
        private static final Main INSTANCE = new Main();
    }
    public static Scheduler main() { return MainSchedulerHolder.INSTANCE;}

    private static class Main implements Scheduler {
        @Override
        public void execute(Runnable runnable) {
             Platform.get().execute(runnable);
        }

        @Override
        public ExecutorService getExecutor() {
            return null;
        }
    }
}

MobileThreadFactory

自定义 ThreadFactory, 做了如下配置:

  1. 对线程名字进行标识,方便跟踪。
  2. 修改线程栈,可一定程度节省虚拟内存
  3. 对线程的状态进行重置
public class MobileThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    public MobileThreadFactory(String name) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
        namePrefix = name + "-pool-" +
                poolNumber.getAndIncrement() +
                "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                namePrefix + threadNumber.getAndIncrement(),
                -512 * 1024);
        if (t.isDaemon()) {
            t.setDaemon(false);
        }
        if (t.getPriority() != Thread.NORM_PRIORITY) {
            t.setPriority(Thread.NORM_PRIORITY);
        }
        return t;
    }
}

使用


public class AsyncActivity extends AppCompatActivity {

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_async);
        initView();
    }

    private void initView() {
        findViewById(R.id.btn_async).setOnClickListener(v -> Async.create(() -> 1 + 1).subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {
            @Override
            public void onError(Throwable th) {

            }

            @Override
            public void onSuccess(Integer data) {
                printData(data);
            }
        }));
        findViewById(R.id.btn_async_main).setOnClickListener(v -> Async.create(()-> 1 + 1).observableOnMain(new Subscriber<Integer>() {
            @Override
            public void onError(Throwable th) {

            }

            @Override
            public void onSuccess(Integer data) {
                printData(data);
            }
        }));
        findViewById(R.id.btn_async_success).setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                Async.create(()-> 1 + 1).observableOnMain(new SubscriberForSuccess<Integer>() {
                    @Override
                    public void onSuccess(Integer data) {
                     printData(data);
                    }
                });
            }
        });
        findViewById(R.id.btn_thread_pool).setOnClickListener(v -> {
            for (int i = 0; i < 10; i ++) {
                Schedulers.io().execute(() -> {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    System.out.println("execute on:" + Thread.currentThread().getName());
                });
            }
        });

    }
    private void printData(Integer data) {
        System.out.println("data: " + data + "   threadName:" + Thread.currentThread().getName());
    }
}