《 从没用过的有趣 rxjs》
前言
一直听过 rxjs
的大名, 但是真没用过,今天看了「珠峰架构」 的关于 rxjs 的公开课,发现 rxjs
还挺有趣的😁
使用
1. 观测普通数据
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
// 接收一个对象
observable.subscribe({
next: value => console.log('next value:', value),
complete: () => {
console.log('complete')
}
})
// 接受一个函数
observable.subscribe(value => console.log('next value:', value))
2. 数组管道操作
let r = [1, 2, 3];
const subscriber = from(r)
.pipe(map(val => val * 2)) // [2,4,6]
.pipe(filter(val => val > 3)) //[4,6]
.pipe(map(data => data + 1)); //[5,7]
// subscriber.subscribe(val => console.log(val, "abcd"));
subscriber.subscribe({
next: val => console.log(val),
});
3. 异步调用
function task(state) {
console.log("state: ", state);
if (state < 5) {
this.schedule(state + 1, 2000);
}
}
// 最后一个参数从哪开始
asyncScheduler.schedule(task, 10000, 2);
4. 定时任务
interval(500).pipe(take(3)).subscribe(console.log);
5. 发布订阅数据
const subject = new Subject();
subject.subscribe({ next: data => console.log("observerA: ", data) });
subject.subscribe({ next: data => console.log("observerB: ", data) });
subject.next(1); // observerA:1 observerA: 2
subject.next(2); // observerB:1 observerA: 2
感觉还是方便的,如果想要了解更多 Api 可以点击这里 👉 rxjs.dev/guide/overv…
Api
简单的源码实现
1. Observable
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
observable.subscribe(value => console.log("next value:", value));
observable.subscribe({
next: value => console.log("next value:", value),
complete: () => {
console.log("complete");
},
});
可以看出 Observable
是一个 类,接收一个 函数,同时它的实例上有一个叫做 subscribe
的方法,subscribe
可以接收 一个 函数 / 一个对象
所以我们可以写出这样的代码
class Observable {
constructor(subscribe) {
if (subscribe) {
this._subscribe = subscribe;
}
}
subscribe(observerOrNext){
let subscriber = null;
// 构造出一个对象出来
if (isFunction(observerOrNext)) {
subscriber = {
next: observerOrNext,
};
} else {
subscriber = observerOrNext;
}
// 执行 complate
subscriber.complete ??= () => {};
this._subscribe(subscriber);
return subscriber;
}
}
虽然可以完成基本需要,但是有些丑陋,而且功能杂糅到一起,让我们来进行一些抽离
通过 Subscriber类 对传入的数据进行包装,增加对应的功能
subscribe(observerOrNext) {
const subscriber = new Subscriber(observerOrNext);
this._subscribe(subscriber);
return subscriber;
}
在 Subscriber类 中增加 next / complate 方法,通过一个标志位 isStopped
来控制函数的执行
class Subscriber {
isStopped = false;
constructor(observerOrNext) {
let observer = null;
if (isFunction(observerOrNext)) {
observer = {
next: observerOrNext
}
} else {
observer = observerOrNext
}
//把观察者对象存到了订阅者对象的destination属性上
this.destination = observer;
}
next(value) {
if (!this.isStopped) {
this.destination.next(value);
}
}
//如果调用了complete方法,就表示生产完毕了
complete() {
if (!this.isStopped) {
this.isStopped = true;
this.destination.complete?.();
}
}
}
在 Subscriber类 的初始化中,我们格式化用户传递的数据,无论传入的是对象还是函数,都使其变成一个对象形式;用户在Observable
传入的回调函数中传递的 next
中传入的值,都被 Subscriber
的 next
收到
那么在 new Observable
接受的函数 就是 一个 Subscriber
实例
2. 数组调用
使用 from
接收一个数组,返回值可以使用 pipe
管道函数接收 map
或者 filter
对数组进行处理,并且可以监听转化过程
let r = [1, 2, 3];
const subscriber = from(r)
.pipe(map(val => val * 2)) // [2,4,6]
.pipe(filter(val => val > 3)) //[4,6]
.pipe(map(data => data + 1)); //[5,7]
// subscriber.subscribe(val => console.log(val, "abcd"));
subscriber.subscribe({
next: val => console.log(val),
});
from
的这个函数接收一个数组
function from(arrayLike) {
return new Observable(subscriber => {
for (let i = 0; i < arrayLike.length; i++) {
subscriber.next(arrayLike[i]);
}
subscriber.complete();
});
}
哦,那么它又用到了 Observable
,返回一个 Observable
实例,遍历执行 next
方法;
Observable
需要加上方法 pipe
由于 pipe
可以进行链式调用,并且可以进行值传递,所以简单来说,可以写成下面这种,把当前实例当做参数传递
class Observable{
// ...
pipe(operations){
return operations(this)
}
}
那么 map
和 filter
应该满足什么条件呢?
首先能够返回值能够调用 Observable类
中的 pipe
方法,所以必须返回一个 Observable
实例
其次还有返回值能够传递给下一个 pipe
🚩所以 在 map
中 的 source
是一个老 的 Observable
,同时需要返回一个转化过的新的 Observable
,这个新的 Observable
交由 pipe 返回,这样可以不断的链式调用
function map(project) {
//operation 传入老的Observable,返回新的Observable
return source => {
return new Observable(function (subscriber) {
return source.subscribe({
...subscriber,
//最关键的是要重写next方法,此value是老的Observable传过来的老值
next: (value) => {
subscriber.next(project(value));
}
})
});
}
}
同样的 ,filter
function filter(predicate) {
//operation 传入老的 Observable,返回新的Observable
return source => {
return new Observable(function (subscriber) {
return source.subscribe({
...subscriber,
//最关键的是要重写next方法,此value是老的Observable传过来的老值
next: (value) => {
predicate(value) && subscriber.next(value);
}
})
});
}
}
map
和 filter
都是高阶函数
通过不断的返回新的 Observable
让链式调用不断的执行,并且source 都是上一次 this
3. 异步处理
function task(state) {
console.log("state: ", state);
if (state < 5) {
this.schedule(state + 1, 2000);
}
}
// 最后一个参数从哪开始
asyncScheduler.schedule(task, 10000, 2);
asyncScheduler.schedule
接收3个参数,一个函数,一个执行间隔,一个初始值
其实很简单,就是一个定时器不断的执行
我们来看看
const asyncScheduler = new Scheduler(AsyncAction)
asyncScheduler
是 Scheduler 的一个实例, 接收 AsyncAction类
作为参数
执行this.schedulerActionCtor = schedulerActionCtor
把 AsyncAction
保存在自己身上
class Scheduler {
constructor(schedulerActionCtor) {
this.schedulerActionCtor = schedulerActionCtor;
}
/**
* setTimeout
* @param {*} work 要执行的工作
* @param {*} delay 延迟的时间
* @param {*} state 初始状态
* @returns
*/
schedule(work, delay = 0, state) {
return new this.schedulerActionCtor(work).schedule(state, delay);
}
}
可以看出 schedule
方法也是执行的是 AsyncAction
中的 schedule
方法
来到 AsyncAction类
中
AsyncAction
初始化接受一个存储 传入的函数,然后在 schedule
方法 中接收 state 和 delay
class AsyncAction {
pending = false
constructor(work) {
this.work = work;
}
//当调用此方法的时候,我要在delay时间后,以state作为参数调用work方法
schedule(state, delay = 0) {
this.state = state;
this.delay = delay;
// 清除定时器把上次的清除掉,避免多个定时器执行
if (this.timerID !== null) {
this.timerID = this.recycleAsyncId(this.timerID)
}
//表示有任务等待执行
this.pending = true;
this.timerID = this.requestAsyncId(delay);
}
recycleAsyncId(timerID) {
if (timerID !== null) {
clearInterval(timerID)
}
return null;
}
requestAsyncId(delay = 0) {
return setInterval(this.execute, delay)
}
execute = () => {
this.pending = false;
this.work(this.state);
//如果在 work中没有调度新的任务的话,那就把定时器也清掉
if (this.pending === false && this.timerID !== null) {
this.timerID = this.recycleAsyncId(this.timerID);
}
}
}
因为是在 setInterval
中执行的 execute
,为了保证 this 指向,所以使用 箭头函数
保证 execute
的指向
4. 定时任务
interval(1000).subscribe(v => console.log(v));
interval(500).pipe(take(3)).subscribe(console.log);
interval
接收一个执行 delayTime
,其实内部利用了 setInterval
function interval(
dueTime = 0,
scheduler = asyncScheduler
) {
return new Observable(subscriber => {
let n = 0;
return scheduler.schedule(function () {
subscriber.next(n++);
if (dueTime > 0) {
this.schedule(undefined, dueTime);
} else {
subscriber.complete();
}
}, dueTime);
});
}
没错,使用了 asyncScheduler
那个 异步类 - const asyncScheduler = new Scheduler(AsyncAction);
; 此时scheduler.schedule
指代的是Scheduler
类中的schedule
所以 work
是一个匿名函数,delay 是 dueTime
,state 是 undefined
class Scheduler {
constructor(schedulerActionCtor) {
this.schedulerActionCtor = schedulerActionCtor;
}
/**
* setTimeout
* @param {*} work 要执行的工作
* @param {*} delay 延迟的时间
* @param {*} state 初始状态
* @returns
*/
schedule(work, delay = 0, state) {
return new this.schedulerActionCtor(work).schedule(state, delay);
}
}
🚀到了在 this.schedule(undefined, dueTime)
中 this 指向的是 schedulerActionCtor
也就是 AsyncAction
,因为 scheduler.schedule
的调用本质上是 new this.schedulerActionCtor
的调用
5. 发布订阅数据
const subject = new Subject();
subject.subscribe({ next: data => console.log("observerA: ", data) });
subject.subscribe({ next: data => console.log("observerB: ", data) });
subject.next(1);
subject.next(2);
可以简单理解为
class Subject {
observers = [];
subscribe(subscriber) {
this.observers.push(subscriber);
}
next(value) {
for (const subscriber of this.observers) {
subscriber.next(value);
}
}
complete() {
for (const subscriber of this.observers) {
subscriber.complete();
}
}
}
使用内部变量 observers
收集 传入实例 subject
的 subscribe
的函数,当实例 subject
执行 next
时,拿出所有的收集到的函数,依次执行
总结
rxjs
有很多的 Api
, 只列取了很少一部分,有兴趣可以去看看rxjs 中文文档
转载自:https://juejin.cn/post/7258555175495958584