《 从没用过的有趣 rxjs》
前言
一直听过 rxjs 的大名, 但是真没用过,今天看了「珠峰架构」 的关于 rxjs 的公开课,发现 rxjs 还挺有趣的😁
使用
1. 观测普通数据
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
// 接收一个对象
observable.subscribe({
next: value => console.log('next value:', value),
complete: () => {
console.log('complete')
}
})
// 接受一个函数
observable.subscribe(value => console.log('next value:', value))

2. 数组管道操作
let r = [1, 2, 3];
const subscriber = from(r)
.pipe(map(val => val * 2)) // [2,4,6]
.pipe(filter(val => val > 3)) //[4,6]
.pipe(map(data => data + 1)); //[5,7]
// subscriber.subscribe(val => console.log(val, "abcd"));
subscriber.subscribe({
next: val => console.log(val),
});

3. 异步调用
function task(state) {
console.log("state: ", state);
if (state < 5) {
this.schedule(state + 1, 2000);
}
}
// 最后一个参数从哪开始
asyncScheduler.schedule(task, 10000, 2);

4. 定时任务
interval(500).pipe(take(3)).subscribe(console.log);

5. 发布订阅数据
const subject = new Subject();
subject.subscribe({ next: data => console.log("observerA: ", data) });
subject.subscribe({ next: data => console.log("observerB: ", data) });
subject.next(1); // observerA:1 observerA: 2
subject.next(2); // observerB:1 observerA: 2

感觉还是方便的,如果想要了解更多 Api 可以点击这里 👉 rxjs.dev/guide/overv…
Api
简单的源码实现
1. Observable
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
observable.subscribe(value => console.log("next value:", value));
observable.subscribe({
next: value => console.log("next value:", value),
complete: () => {
console.log("complete");
},
});
可以看出 Observable 是一个 类,接收一个 函数,同时它的实例上有一个叫做 subscribe 的方法,subscribe 可以接收 一个 函数 / 一个对象
所以我们可以写出这样的代码
class Observable {
constructor(subscribe) {
if (subscribe) {
this._subscribe = subscribe;
}
}
subscribe(observerOrNext){
let subscriber = null;
// 构造出一个对象出来
if (isFunction(observerOrNext)) {
subscriber = {
next: observerOrNext,
};
} else {
subscriber = observerOrNext;
}
// 执行 complate
subscriber.complete ??= () => {};
this._subscribe(subscriber);
return subscriber;
}
}

虽然可以完成基本需要,但是有些丑陋,而且功能杂糅到一起,让我们来进行一些抽离
通过 Subscriber类 对传入的数据进行包装,增加对应的功能
subscribe(observerOrNext) {
const subscriber = new Subscriber(observerOrNext);
this._subscribe(subscriber);
return subscriber;
}
在 Subscriber类 中增加 next / complate 方法,通过一个标志位 isStopped 来控制函数的执行
class Subscriber {
isStopped = false;
constructor(observerOrNext) {
let observer = null;
if (isFunction(observerOrNext)) {
observer = {
next: observerOrNext
}
} else {
observer = observerOrNext
}
//把观察者对象存到了订阅者对象的destination属性上
this.destination = observer;
}
next(value) {
if (!this.isStopped) {
this.destination.next(value);
}
}
//如果调用了complete方法,就表示生产完毕了
complete() {
if (!this.isStopped) {
this.isStopped = true;
this.destination.complete?.();
}
}
}
在 Subscriber类 的初始化中,我们格式化用户传递的数据,无论传入的是对象还是函数,都使其变成一个对象形式;用户在Observable传入的回调函数中传递的 next中传入的值,都被 Subscriber的 next 收到
那么在 new Observable 接受的函数 就是 一个 Subscriber 实例
2. 数组调用
使用 from 接收一个数组,返回值可以使用 pipe 管道函数接收 map 或者 filter 对数组进行处理,并且可以监听转化过程
let r = [1, 2, 3];
const subscriber = from(r)
.pipe(map(val => val * 2)) // [2,4,6]
.pipe(filter(val => val > 3)) //[4,6]
.pipe(map(data => data + 1)); //[5,7]
// subscriber.subscribe(val => console.log(val, "abcd"));
subscriber.subscribe({
next: val => console.log(val),
});
from 的这个函数接收一个数组
function from(arrayLike) {
return new Observable(subscriber => {
for (let i = 0; i < arrayLike.length; i++) {
subscriber.next(arrayLike[i]);
}
subscriber.complete();
});
}
哦,那么它又用到了 Observable,返回一个 Observable 实例,遍历执行 next 方法;
Observable 需要加上方法 pipe
由于 pipe 可以进行链式调用,并且可以进行值传递,所以简单来说,可以写成下面这种,把当前实例当做参数传递
class Observable{
// ...
pipe(operations){
return operations(this)
}
}
那么 map 和 filter 应该满足什么条件呢?
首先能够返回值能够调用 Observable类 中的 pipe 方法,所以必须返回一个 Observable实例
其次还有返回值能够传递给下一个 pipe
🚩所以 在 map 中 的 source 是一个老 的 Observable,同时需要返回一个转化过的新的 Observable ,这个新的 Observable 交由 pipe 返回,这样可以不断的链式调用
function map(project) {
//operation 传入老的Observable,返回新的Observable
return source => {
return new Observable(function (subscriber) {
return source.subscribe({
...subscriber,
//最关键的是要重写next方法,此value是老的Observable传过来的老值
next: (value) => {
subscriber.next(project(value));
}
})
});
}
}
同样的 ,filter
function filter(predicate) {
//operation 传入老的 Observable,返回新的Observable
return source => {
return new Observable(function (subscriber) {
return source.subscribe({
...subscriber,
//最关键的是要重写next方法,此value是老的Observable传过来的老值
next: (value) => {
predicate(value) && subscriber.next(value);
}
})
});
}
}
map 和 filter 都是高阶函数
通过不断的返回新的 Observable让链式调用不断的执行,并且source 都是上一次 this
3. 异步处理
function task(state) {
console.log("state: ", state);
if (state < 5) {
this.schedule(state + 1, 2000);
}
}
// 最后一个参数从哪开始
asyncScheduler.schedule(task, 10000, 2);
asyncScheduler.schedule 接收3个参数,一个函数,一个执行间隔,一个初始值
其实很简单,就是一个定时器不断的执行
我们来看看
const asyncScheduler = new Scheduler(AsyncAction)
asyncScheduler 是 Scheduler 的一个实例, 接收 AsyncAction类 作为参数
执行this.schedulerActionCtor = schedulerActionCtor 把 AsyncAction保存在自己身上
class Scheduler {
constructor(schedulerActionCtor) {
this.schedulerActionCtor = schedulerActionCtor;
}
/**
* setTimeout
* @param {*} work 要执行的工作
* @param {*} delay 延迟的时间
* @param {*} state 初始状态
* @returns
*/
schedule(work, delay = 0, state) {
return new this.schedulerActionCtor(work).schedule(state, delay);
}
}
可以看出 schedule 方法也是执行的是 AsyncAction 中的 schedule 方法
来到 AsyncAction类 中
AsyncAction 初始化接受一个存储 传入的函数,然后在 schedule方法 中接收 state 和 delay
class AsyncAction {
pending = false
constructor(work) {
this.work = work;
}
//当调用此方法的时候,我要在delay时间后,以state作为参数调用work方法
schedule(state, delay = 0) {
this.state = state;
this.delay = delay;
// 清除定时器把上次的清除掉,避免多个定时器执行
if (this.timerID !== null) {
this.timerID = this.recycleAsyncId(this.timerID)
}
//表示有任务等待执行
this.pending = true;
this.timerID = this.requestAsyncId(delay);
}
recycleAsyncId(timerID) {
if (timerID !== null) {
clearInterval(timerID)
}
return null;
}
requestAsyncId(delay = 0) {
return setInterval(this.execute, delay)
}
execute = () => {
this.pending = false;
this.work(this.state);
//如果在 work中没有调度新的任务的话,那就把定时器也清掉
if (this.pending === false && this.timerID !== null) {
this.timerID = this.recycleAsyncId(this.timerID);
}
}
}
因为是在 setInterval 中执行的 execute,为了保证 this 指向,所以使用 箭头函数 保证 execute 的指向
4. 定时任务
interval(1000).subscribe(v => console.log(v));
interval(500).pipe(take(3)).subscribe(console.log);
interval 接收一个执行 delayTime,其实内部利用了 setInterval
function interval(
dueTime = 0,
scheduler = asyncScheduler
) {
return new Observable(subscriber => {
let n = 0;
return scheduler.schedule(function () {
subscriber.next(n++);
if (dueTime > 0) {
this.schedule(undefined, dueTime);
} else {
subscriber.complete();
}
}, dueTime);
});
}
没错,使用了 asyncScheduler 那个 异步类 - const asyncScheduler = new Scheduler(AsyncAction);; 此时scheduler.schedule指代的是Scheduler类中的schedule
所以 work 是一个匿名函数,delay 是 dueTime,state 是 undefined
class Scheduler {
constructor(schedulerActionCtor) {
this.schedulerActionCtor = schedulerActionCtor;
}
/**
* setTimeout
* @param {*} work 要执行的工作
* @param {*} delay 延迟的时间
* @param {*} state 初始状态
* @returns
*/
schedule(work, delay = 0, state) {
return new this.schedulerActionCtor(work).schedule(state, delay);
}
}
🚀到了在 this.schedule(undefined, dueTime) 中 this 指向的是 schedulerActionCtor也就是 AsyncAction,因为 scheduler.schedule的调用本质上是 new this.schedulerActionCtor 的调用
5. 发布订阅数据
const subject = new Subject();
subject.subscribe({ next: data => console.log("observerA: ", data) });
subject.subscribe({ next: data => console.log("observerB: ", data) });
subject.next(1);
subject.next(2);
可以简单理解为
class Subject {
observers = [];
subscribe(subscriber) {
this.observers.push(subscriber);
}
next(value) {
for (const subscriber of this.observers) {
subscriber.next(value);
}
}
complete() {
for (const subscriber of this.observers) {
subscriber.complete();
}
}
}
使用内部变量 observers 收集 传入实例 subject 的 subscribe 的函数,当实例 subject 执行 next 时,拿出所有的收集到的函数,依次执行
总结
rxjs 有很多的 Api, 只列取了很少一部分,有兴趣可以去看看rxjs 中文文档
转载自:https://juejin.cn/post/7258555175495958584