likes
comments
collection
share

Java小技巧:利用RxJava打造可观测数据RxLiveData

作者站长头像
站长
· 阅读数 25

1. 问题场景

在实际工作中,我们经常需要在不同类对象之间、不同模块之间共享数据,而这些数据通常是可改动的,那么就可能发生一个问题:当数据变动时,相关对象或模块并不知道,没有及时更新数据。这时候,我们希望数据改变时可以通知其他模块同步更新,实现一个类似数据之间联动的效果。最容易想到的应该就是监听回调的观察者模式,下面给出一种以前见过的、不太优雅的实现:

class User {
    //...... Java Bean 的字段略
}

interface Listener {
    void onUserUpdated(User user);
}

class UserManager {
    private static UserManager manager = new UserManager();

    private UserManager() {
    }

    public static UserManager getInstance() {
        return manager;
    }

    private User user;
    private List<Listener> listeners = new LinkedList<>();

    public void addUserListener(Listener listener) {
        listeners.add(listener);
    }

    public void removeUserListener(Listener listener) {
        listeners.remove(listener);
    }

    public User getUser() {
        return user;
    }

    public void setUser(User user) {
        this.user = user;
        for (Listener listener : listeners) {
            listener.onUserUpdated(this.user);
        }
    }
}

这种方式有以下缺点:

  1. 不具备复用性(每次添加新的数据都要把回调监听重新实现一遍);
  2. 增加内存溢出的风险(调用addUserListener的人可能忘记调用removeUserListener);
  3. setter方法的污染(做了多余的事情)。

面对这样的问题,RxJava、JDK中的Observable和Flow API还有Android里的LiveData都给出了可用的实现方式,在实际开发中,感觉并不是那么方便。而本文要介绍的是我利用RxJava打造一个更加方便的可观测对象工具类--RxLiveData(代码见最底部)。

2. 使用示例

先来看一个比较短的完整示例:

/* 测试用的 Java Bean 数据类*/
class User {
    //...... Java Bean 的字段略
}
/* 一个单例 */
class UserManager {
    private static final UserManager manager = new UserManager();

    private UserManager() {
    }

    public static UserManager getInstance() {
        return manager;
    }

    private final RxLiveData<User> userData = new RxLiveData<>();

    public RxLiveData<User> getUserData() {
        return userData;
    }
}

class A {
    public void init() {
        //订阅可观测对象,使得数据发生改变时可以被回调
        UserManager.getInstance().getUserData().getObservable()
                .subscribe((User user) -> {//使用lambda版
                    update(user);// 每次用户信息改变,这里会被调用
                });

        update(UserManager.getInstance().getUserData().getValue());
    }

    private void update(User user) {
        System.out.println("user changed");
    }
}

class B{
    public B() {
        UserManager.getInstance().getUserData().getObservable().subscribe(this::update);//方法引用版
    }

    private void update(User user) {
        System.out.println("user changed");
    }
}

public class Main {
    public static void main(String[] args) throws InterruptedException {
        A a = new A();
        a.init();
        B b = new B();
        //更新UserManager中的数据,这时候A和B中的对应方法会被调用
        UserManager.getInstance().getUserData().postData(new User());
    }
}

这里模拟UserManager的数据在AB类的对象之间共享,当UserManager的内容发生改变时,可以通知到AB,执行相应操作。

这时如果还想给UserManager增加一个数据,例如一个long类型的time,只需要按照下面这样添加一个属性和一个getter方法就可以了:

private final RxLiveData<Long> timeData = new RxLiveData<>();

public RxLiveData<Long> getTimeData() {
    return timeData;
}

如果是在Android应用开发中,还可以借助RxAndroid和RxLifecycle的功能,来控制回调的执行线程并在界面销毁时取消订阅,例如:

userManager.getUserData().getObservable()
        .compose(bindUntilEvent(ActivityEvent.DESTROY))//指定在onDestroy回调时取消订阅
        .observeOn(AndroidSchedulers.mainThread())//指定主线程
        .subscribe(user -> {
            
        }, Throwable::printStackTrace);

3. 主要方法介绍

3.1 getObservable 方法

方法签名public Observable<T> getObservable()

这个方法用于获取RxJava的Observable,进而对数据进行订阅,还可以得到RxJava相关功能的支持(例如,Stream 操作,指定线程,控制生命周期等等)。

3.2 postData 方法

方法签名public void postData(T value)

这个方法用于更新数据。它会更新存在在当前RxLiveData对象中的数据,并通过RxJava的ObservableEmitter触发观察者的回调。

注意:当参数为null时,由于RxJava会对null抛出异常,所以这里的实现方式是在判断为null的时候只存储数据,不触发观察者的回调。

3.3 getValue 方法

方法签名public T getValue()

这个方法仅用于获得存在在当前RxLiveData中的数据。

3.4 optValue 方法

方法签名public Optional<T> optValue()

getValue方法的Optional版本。

4. 完整实现

import io.reactivex.rxjava3.core.Observable;//如果用的是RxJava2的请改为该版本的包名
import io.reactivex.rxjava3.core.ObservableEmitter;
import io.reactivex.rxjava3.disposables.Disposable;

import java.util.Optional;

public class RxLiveData<T> {
    private final Observable<T> observable;
    private Disposable disposable;
    private T value;
    private ObservableEmitter<T> emitter;

    public RxLiveData() {
        observable = Observable
                .create((ObservableEmitter<T> emitter) -> this.emitter = emitter)
                .publish()
                .autoConnect(0, disposable -> this.disposable = disposable);
    }

    public Observable<T> getObservable() {
        return observable;
    }

    public void postData(T value) {
        this.value = value;
        if (emitter != null && value != null) {
            emitter.onNext(value);
        }
    }

    public T getValue() {
        return value;
    }

    public Optional<T> optValue() {
        return Optional.ofNullable(value);
    }
}