likes
comments
collection
share

深入浅出--Rxjava源码分析<一>

作者站长头像
站长
· 阅读数 7

在之前的博客中简单介绍了Rxjava的使用和与Retrofit的API配合使用;那么在这里我们来看下Rxjava的源码;

1.Observable–>(subscribe)Subscriber

首先来看完整代码:

Observable.create(new Observable.OnSubscribe<Drawable>() {


            @Override
            public void call(Subscriber<? super Drawable> subscriber) {
                Drawable drawable = getResources().getDrawable(R.drawable.t2);
                subscriber.onNext(drawable);
                subscriber.onCompleted();
            }
        })
                .subscribeOn(Schedulers.io()) //Observable.OnSubscribe中Call方法所在线程
                .observeOn(AndroidSchedulers.mainThread())  //Subscribe中回调所在的线程
                .subscribe(new Subscriber<Drawable>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Drawable drawable) {

                        mBg.setImageDrawable(drawable);

                    }

1.1.首先来看被观察者的创建:create //Observable.create

    //参数onSubscribe的实例化 作为参数传入create中
    //由于继承自Action1所以会有call方法需要实现
     public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
        // cover for generics insanity
    }

    public interface Action1<T> extends Action {
        void call(T t);
    }



-----------------------------------------------------------------

    @Deprecated
    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

从上面代码可以看到 Observable.create其实进行两步操作: 1.将参数传入Observable的构造,实例化呗观察者 2.在Observable类中传入的Observable.OnSubscribe赋值给Observable的一个成员属性onSubscribe 在后面会有用到

而RxJavaHooks.onCreate(f)其实就是对Observable.OnSubscribe的一些封装,在这里不是很重要,所以没有必要关心:

public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
        Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
        if (f != null) {
            return f.call(onSubscribe);
        }
        return onSubscribe;
    }

看到没有其实返回的还是onSubscribe;

1.2被观察者 已经创建完成 然后开始订阅观察者:

 .subscribe(new Subscriber<Drawable>()

ok 首先来看subscribe的源码:

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }


static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }

        // new Subscriber so onStart it
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r; // NOPMD
                }
            }
            return Subscriptions.unsubscribed();
        }
    }

在上面代码中 第二个subscribe方法有点长 我们来看其核心代码: //首先来看

if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

其实这个方法是对参数的一个检验吧 SafeSubscriber是Subscriber的子类 其中也复写了onError onNext onComplete方法

然侯看订阅的核心代码:

RxJavaHooks.onObservableStart(
        observable,observable.onSubscribe).call(subscriber);         
return RxJavaHooks.onObservableReturn(subscriber);

上面的代码中 :

 public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
    Func2<Observable, 
Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
        if (f != null) {
            return f.call(instance, onSubscribe);
        }
        return onSubscribe;
    }


RxJavaHooks.onObservableStart(observable,observable.onSubscribe)返回被观察者创建时传入的onSubscribe(可以看前面代码 有提到过); 然后.call(subscriber)将初始化的观察者传入OnSubscribe的call方法参数中;调用call方法 在被观察者创建时候在OnNext中传入参数 然后在订阅观察者的时候重写OnNext方法 对传入的参数进行操作 即如下代码:

 Observable.create(new Observable.OnSubscribe<Drawable>() {


            @Override
            public void call(Subscriber<? super Drawable> subscriber) {
                Drawable drawable = getResources().getDrawable(R.drawable.t2);
                subscriber.onNext(drawable);
                subscriber.onCompleted();
            }
        })
                .subscribeOn(Schedulers.io()) //Observable.OnSubscribe中Call方法所在线程
                .observeOn(AndroidSchedulers.mainThread())  //Subscribe中回调所在的线程
                .subscribe(new Subscriber<Drawable>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Drawable drawable) {

                        mBg.setImageDrawable(drawable);

                    }

其中:

            .subscribeOn(Schedulers.io()) 
            .observeOn(AndroidSchedulers.mainThread())  

是对Observable.Onsubscribe—>call()和Subscribe中复写方法的线程指定,在前面博文 Rxjava从入门到熟练中有讲过;

这样到这里 罪常用的Rxjava调用就介绍完了;

2.Rxjava的不完全调用

    Observable.create(new Observable.OnSubscribe<Drawable>() {


            @Override
            public void call(Subscriber<? super Drawable> subscriber) {
                Drawable drawable = getResources().getDrawable(R.drawable.t2);
                subscriber.onNext(drawable);
                subscriber.onCompleted();
            }
        })
                .subscribeOn(Schedulers.io()) //Observable.OnSubscribeCall方法所在线程
                .observeOn(AndroidSchedulers.mainThread())  //Subscribe中回调所在的线程
                .subscribe(new Action1<Drawable>() {

                    @Override
                    public void call(Drawable drawable) {
                        mBg.setImageDrawable(drawable);
                    }
                });

和之前的代码差不多 唯一的不能是 subscribe实例化 换成Action1

ActionX是Rxjava对外开放的一部分接口: ok~ 直接来看源码:

    if (onNext == null) {
            throw new IllegalArgumentException("onNext can not be null");
        }

        Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
        Action0 onCompleted = Actions.empty();
        return subscribe(new ActionSubscriber<T>(onNext,
         onError, onCompleted));
    }
```在RXjava中ActionX  会根据传入的参数  返回对应的方法:





<div class="se-preview-section-delimiter"></div>

这里写代码片 “`

Action1<? super T> onNext, Action1<Throwable> onError, Action0 onCompleted

传入Throwable 就是onError(xxx) 传入空参 就是onComplete(Action0) 传入其他为onNext(xxx)

所以:

    .subscribe(new Action1<Drawable>() {

                    @Override
                    public void call(Drawable drawable) {
                        mBg.setImageDrawable(drawable);
                    }
                });

就是 直接会调onNext();

Rxjava的源码 先介绍到这里 之后的博文会对flmap map 等其他API的源码 进行分析 感谢阅读 期待高效的建议 谢谢~