likes
comments
collection
share

🐒🐒🐒猴子都能看懂的Rxjs教程(上篇)

作者站长头像
站长
· 阅读数 19

基本概念

Rxjs是一个处理同步或异步的事件请求库,可以配合前端三大框架Angular/React/Vue使用,也可以和Node框架Nest配合使用

观察者模式

以买房为例,小明需要知道楼盘A的信息,小明就是一个观察者,楼盘A是被观察的目标

  1. 实现观察者
const xiaoming = {
  notify:id=>{
   console.log(`我是小明,我收到楼盘${id}的信息了`)
  }
}
const lyllovelemon = {
 notify:id=>{
   console.log(`我是柠檬酱,我收到楼盘${id}的信息了`)
 }
}
  1. 实现被观察目标
const houseSubject = {
 observers:[],//存放所有需要通知买房的人的信息(观察者)
 // 通知所有观察者楼盘开盘信息
 notifyObservers:id=>{
  houseSubject.observers.forEach(people=>{
   people.notify(id)
  })
 },
 // 加入新的观察者,就是有新的人想知道楼盘信息
 addObserver:newPeople=>{
  houseSubject.observers.push(newPeople)
 },
 removeObserver:people=>{
  houseSubject.observers = houseSubject.observers.filter(
      obs => obs !== people
    );
 }
}
  1. 执行逻辑 有了观察者(想买房人)和被观察目标(楼盘信息),就可以试着加入一个新的楼盘A了
houseSubject.notifyObservers('A');//通知楼盘A开盘

// 目前没有任何观察者,所以不会发送通知

加入观察者

houseSubject.addObserver('lyllovelemon')//加入柠檬酱
houseSubject.notifyObservers('B');//通知楼盘B开盘,此时柠檬酱会收到楼盘B开票通知

// 输出结果
我是柠檬酱,我收到楼盘B的信息了

再加入一个观察者

houseSubject.addObserver('xiaoming')//加入小明
houseSubject.notifyObservers('C');//通知楼盘C开盘,此时柠檬酱和小明都会收到楼盘C开盘通知

// 输出结果
我是柠檬酱,我收到楼盘C的信息了
我是小明,我收到楼盘C的信息了

此时移除一个观察者

houseSubject.removeObserver('lyllovelemon')//移除柠檬酱
houseSubject.notifyObservers('D');// 通知楼盘D开盘,此时小明会收到楼盘D开盘通知

// 输出结果
我是小明,我收到楼盘D的信息了

迭代器模式

学过JavaScript的都很好理解,es6的generator函数就是一个简单的迭代器

var g = gen(1);
g.next() // { value: 3, done: false }
g.next() // { value: undefined, done: true }


// for...of 也是一个简单的迭代器
let arr = [1,2,3]
for(const item of arr){
  console.log(item)
}

迭代器模式包含两部分:

  • next() 用于取得当前集合的下一个元素
  • hasNext() 用于判断是否还有下一个元素可以访问,当没有下一个元素可以访问时,代表访问结束

通过Js实现迭代器模式

const createEvenOddIterator = (data) => {
  let nextIndex = 0;

  // 定义迭代规则
  return {
    hasNext: () => {
      return nextIndex < data.length;
    },
    next: () => {
      const currentIndex = nextIndex;
      // 下一个索引值加2
      nextIndex += 2;
      // 如果下一个索引值超过队列长度,且索引值为偶数
      // 代表偶数索引访问完毕,跳到奇数索引的起点
      if(nextIndex >= data.length && nextIndex % 2 == 0) {
        nextIndex = 1;
      }
      // 回传当前索引值内容
      return data[currentIndex];
    }
  };
};
const iterator = createEvenOddIterator(['a', 'b', 'c', 'd']);
console.log('start')
while (iterator.hasNext()) {
  const value = iterator.next();
  console.log(value);
}
console.log('end');

// 输出结果
start
a
c
b
d
end

Observable

Observable是一个用于被观察的对象

Subject

处理一个新的Subject只需要三个步骤

  1. 初始化
import {Subject} from "rxjs"//引入Subject

const subject = new Subject()//通过new实例化
  1. 事件订阅,分别是next/error/complete
  • next 用来触发新的事件,使用next传入数据后,就有新的事件发生
  • error 用于触发错误事件,一个Observable只会触发一次,代表事件已经处理完成并发生了错误
  • complete 用于触发完成事件,一个Observable只会触发一次,代表事件已经结束
subject.subscribe({
//一个subject包含三个事件:next(处理函数),error(错误处理函数),complete(完成函数)
  next:(data)=>{
    console.log('get value'+data)
  },
  error:(err)=>{
    console.error(err)
  },
  complete:()=>{
    console.log('done')
  }
}
)

上面是最完整的写法,也可以分开写

// 写法1:只处理next
subject.subscribe({
  next:(data)=>{
    console.log(data)
  }
}
)

//写法2:直接传入一个方法作为参数
subject.subscribe(data=>console.log(data))
  1. 取消订阅
subject.unsubscribe()

由上述代码对比观察者模式

  • 每个被观察的目标houseSubject就等于subject
  • 观察目标的addObserver方法等价于Rxjs的subscribe方法
  • 观察目标的removeObserver方法等价于Rxjs的unsubscribe方法
  • 观察者实现的notify方法等价于Rxjs中Observer的next方法(接收通知)
  • 观察目标实现的notifyObservers方法等价于Rxjs中Observer的next方法(送出通知)
  • 处理next()方法用于处理通知,Rxjs还额外定义了error()和complete()分别用于处理错误和结束通知
import { Observable } from 'rxjs';
const observable = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});
observable.subscribe({
  next(x) {
    console.log(x);
  },
  error(err) {
    console.error('err: ' + err);
  },
  complete() {
    console.log('done');
  },
});

控制台输入结果如下:

  • 1
  • 2
  • 3
  • 4(1秒后输出)
  • done

Observer

Observer(观察者)是Observable(被观察对象)的回调函数,就是上面提到的next/error/complete,要使用Observer首先要订阅

const observer = {
  next: (x) => console.log('got a next value: ' + x),
  error: (err) => console.error('got an error: ' + err),
  complete: () => console.log('got a complete notification'),
};
observable.subscribe(observer);

Operator

Operator(操作符)记录了事件要被怎样处理

  • 每个Operator的输入包含一个来源Observable
  • 同一个Operator的输出包含另一个Observable
  • 通过组合我们拿到了全新的Observable

Rxjs有100多种操作符,不需要全部都记住,只要在使用的时候查API文档即可

import { of,fromEvent } from "rxjs";

const source = of("lyllovelemon");//of操作符用于将字符串变为Observable

const event = fromEvent(document,"click");

//fromEvent用于事件处理,等价于下面代码
const event = new Subject();
document.addEventListener('click', (e) => {
  event.next(e);
});

Operator正确使用可以帮助我们简化代码

实现一个简易计数器

  • 页面展示三种状态:

    • 目前状态:包含「开始计数」、「完成」和「错误」(包含错误信息)。
    • 目前计数:展示「计数」按钮被点击的次数。
    • 偶数计数:每当「目前计数」数值为偶数时,显示这个偶数值。
  • 页面包含四个按钮,功能如下:

    • 「开始新的计数器」按钮:重新建立一个新的计数器,必须使用 new Subject() 来建立;并在「目前状态」部分显示「开始计数」。
    • 「计数」按钮:当建立新的计数器后,每按下计数按钮,显示的数值就加1。
    • 「错误」按钮:要求展示错误信息在[目前状态]里。
    • 「完成计数」按钮:在「目前状态」中显示「完成」
  • 其他要求:

    • 当按下「开始新的计数器」时,所有计数器重置为0。
    • 当按下「错误」或「完成计数」,除非按下「开始新的计数器」,否则其他按钮不会有任何新的操作。

🐒🐒🐒猴子都能看懂的Rxjs教程(上篇)

实现思路

用rxjs实现需要有事件绑定(fromEvent)、过滤操作符(filter)

  1. 最开始完成html部分
 <body>
    <button id='start'>开始新的计数器</button>
    <button id='count'>计数</button>
    <button id='error'>错误</button>
    <button id='done'>完成计数</button>
   
    <div id='status'>
      目前状态:
    </div>
    <div id='currentNum'>
      目前计数:
    </div>
    <div id='evenNum'>
      偶数计数:
    </div>
  </body>
  1. 初始化变量,count作为当前的计数器,counter$订阅Subject的变化,以$命名变量代表它是一个可被观察的对象(Observable)
let counter = 0;
let counter$: Subject<number>;
  1. 需要依次注册四个按钮和三个文本
// BOM操作不需要多说
const startButton = document.querySelector('#start');
const countButton = document.querySelector('#count');
const errorButton = document.querySelector('#error');
const doneButton = document.querySelector('#done');

const currentTxt = document.querySelector('#currentNum');
const evenTxt = document.querySelector('#evenNum');
const statusTxt = document.querySelector('#status');
  1. 以[开始新的计数器]按钮为例,需要处理:
  • 重置count为0
  • 初始化counter$
  • 更新[目前状态]文本
// 绑定[开始新的计数器]按钮的点击事件
// 通过fromEvent(绑定对象,事件名)注册事件
fromEvent(startButton, 'click').subscribe(() => {
  counter$ = new Subject(); // 初始化counter$
  counter = 0;//重置计数器

  statusTxt.innerHTML = '目前状态:开始计数';//更新目前状态文本
  
  //todo...
});
  1. 我们需要订阅counter$(Observable)的变化来监听数值变化
  counter$.subscribe(data=>{
      currentCount.innerHTML = "当前计数:"+data
  })
  1. 增加一个新的Observable用于监听偶数值的变化

这样做的好处是[关注点分离],也就是说监听数值的变化和监听偶数值的变化是两部分,这样做就可以只关注需要处理变化的部分,有利于代码的可维护性和可读性

 const evenCounter$ = counter$.pipe(
    filter(data => data % 2 === 0)
  )
  evenCounter$.subscribe(data => {
    evenTxt.innerHTML = `偶数计数:${data}`;
  });
  1. 处理[计数按钮],点击该按钮时:
  • 数值加1(count+1)
  • 更新[目前计数]文本值
  • 当数值为偶数时,更新[偶数计数]文本值

后面两步操作可以写在counter$和evenCounter$的订阅函数中

fromEvent(startButton, 'click').subscribe(() => {
  ...
  counter$.subscribe({
    next: data => {
      currentTxt.innerHTML = `目前计数:${data}`
    },
    error: message => {
      statusTxt.innerHTML = `目前状态:错误 -> ${message}`
    },
    complete: () => {
      statusTxt.innerHTML = '目前状态:完成'
    }
  });
  const evenCounter$ = counter$.pipe(
    filter(data => data % 2 === 0)
  )
  evenCounter$.subscribe(data => {
    evenTxt.innerHTML = `偶数计数:${data}`;
  });
  counter$.next(counter);
});
fromEvent(countButton, 'click').subscribe(() => {
  counter$.next(++counter);
});
  1. 处理[错误]按钮,实际上就是处理counter/$的error函数
fromEvent(errorButton, 'click').subscribe(() => {
  const err = prompt('请输入错误信息');
  counter$.error(err || 'error');
});
  1. 处理[完成计数]按钮,实际上就是处理counter/$的complete函数
fromEvent(doneButton, 'click').subscribe(() => {
  counter$.complete();
});

完整代码可参考stackblitz.com/edit/rxjs-c…

常用Operator

不需要所有都掌握,需要用的时候再查即可

of

of操作符就是将传进去的值转为Observable

import { of } from 'rxjs';
of([1,2,3]).subscribe(v=>console.log(v))
of('hello,lyllovelemon').subscribe(v=>console.log(v))
of(1,2,3,4).subscribe(v=>console.log(v))

// 输出结果
[1,2,3]
hello,lyllovelemon
1
2
3
4

range

range就是将一个范围的值转为Observable,接收两个参数:

  • start 从哪个数值范围开始
  • count 范围大小

range(3,4).subscribe(v=>console.log(v))

// 输出结果
3
4
5
6

iif

iif表示根据条件判断会产生怎样的Observable,类似于三元表达式,接收三个参数:

  • condition(Function) 判断条件,接收一个函数
  • trueResult 条件为真时返回的Observable
  • falseResult 条件为假时返回的Observable
import { iif,of,EMPTY } from 'rxjs';

// 只有值为偶数时才输出
const consoleWhenOdd = (data)=>{
  // EMPTY操作符表示不执行任何操作
  return iif(() => data%2 === 0,of('console '+data),EMPTY)
}
consoleWhenOdd(1).subscribe(data=>console.log(data))
consoleWhenOdd(2).subscribe(data=>console.log(data))

// 输出结果
console 2

throwError

throwError只有在Observable发生错误时才能使用,因此要和error函数搭配使用

import { throwError } from 'rxjs';
const source$ = throwError('error')
source$.subscribe({
  next:(data)=>console.log(data),
  error:(err)=>console.log(err),
  complete:()=>console.log('done')
})

// 输出结果
error

interval

等价于setInterval,会按照设定时间(单位毫秒)来执行Observable

import { interval } from 'rxjs';
interval(1000).subscribe(data => console.log(`${data}`));
//输出结果
0
1
2
3
4
5
...(每隔1s执行一次console)

我们可以通过unsubscribe方法取消定时器

import { interval } from 'rxjs';
const subscription = interval(1000)
  .subscribe(data => console.log(data));

setTimeout(() => {
  subscription.unsubscribe();
}, 5500);

// 输出结果
0
1
2
3
4

timer

和setTimeout相似,会按照设定时间(单位毫秒)来执行Observable,但是它有两个参数:

  • afterMillseconds 会在多少毫秒后开始事件
  • millseconds 以什么频率计时(单位毫秒),第2个参数不传时表示在afterMillseconds毫秒后开始事件,只执行1次,接下来事件就不会继续执行了
import { timer } from 'rxjs';
timer(3000, 1000)
  .subscribe(data => console.log(data));
  
// 输出结果
// 等待3s后
0
1
2
3
...
每隔1s输出一次

它的缺点是要等待afterMillseconds毫秒后才会执行,可以将第一个参数设置为0,表示立即开始执行事件

defer

用于延迟执行Observable

import { of,defer } from 'rxjs';
const factory = () => of(1, 2, 3);
const source$ = defer(factory);
source$.subscribe(data => console.log(data));

// 输出结果
1
2
3

from

用于将事件转换为Observable,可接收的参数包括数组、Promise、Observable、iterable等,和of很相似,区别在于from会将数组内容一个个传递给订阅的观察者

传递数组

import { from } from 'rxjs';
from([1,2,3]).subscribe(data => console.log(data));

// 输出结果
1
2
3

传递iterator函数

import { from } from 'rxjs';
function* range(start, end) {
  for(let i = start; i <= end; ++i){
    yield i;
  }
}

from(range(1, 4)).subscribe(data => {
  console.log(`from: ${data}`);
});
// 输出结果
from:1
from:2
from:3
from:4

传递Promise参数

import { from } from 'rxjs';

from(Promise.resolve('lyllovelemon')).subscribe(data => {
  console.log(data);
});

// 输出结果
lyllovelemon

传递Observable

from(of([1,2,3,4])).subscribe(data => {
  console.log(data);
});

// 输出结果
[1,2,3,4]

fromEvent

fromEvent可以将事件包装为Observable,接收两个参数:

  • target 要监听的DOM元素
  • eventName 事件名,比如click、mouseup、mousedown等
import { fromEvent } from 'rxjs';

fromEvent(document,'click').subscribe(e=>{
  console.log('click',e)
})

// 输出结果(点击时触发)
click
click PointerEvent:{isTrusted:true}

fromEventPattern

fromEventPattern可以根据自定义逻辑决定事件发生,接收两个参数:

  • subscribeFn(Function) 订阅事件时触发的事件
  • unSubscribeFn(Function) 取消订阅时触发事件
import { fromEventPattern } from 'rxjs';

const addHandler = (handler) =>{
  console.log('自定义订阅')
  document.addEventListener('click',e => handler(e))
}

const removeHandler = (handler) =>{
  console.log('自定义取消订阅')
  document.removeEventListener('click',e => handler(e))
}
const target$ = fromEventPattern(addHandler,removeHandler)
const subscription = target$.subscribe(e=>{
  console.log('鼠标点击')
})  
setTimeout(()=>{
  subscription.unsubscribe() 
},1000)

// 输出结果
自定义订阅
自定义取消订阅 //1s后输出

concact

concat可以将多个Observable组合成一个Observable

目前定义了三个Observable

import { of } from 'rxjs';

const event$ = of(1,2,3)
const event2$ = of(4,5,6)
const event3$ = of(7,8,9)

目前需求是event$执行完毕再执行event2$,event2$执行完毕才执行event3$,可以这么写

event$.subscribe({
  next:data=>console.log(data),
  complete:()=>event2$.subscribe({
    next:data=>console.log(data),
    complete:()=>event3$.subscribe({
      next:data=>console.log(data)
    })
  })
})

// 输出结果
1
2
3
4
5
6
7
8
9

这种写法会造成深层嵌套,代码逻辑一多可维护性就变差,因此可以用concat写法替换

import { concat,of } from 'rxjs';

const event$ = of(1,2,3)
const event2$ = of(4,5,6)
const event3$ = of(7,8,9)

concat(event$,event2$,event3$).subscribe(e=>{
  console.log(e)
})
// 输出结果和上面一样
1
2
3
4
5
6
7
8
9

merge

merge和concat很相似,也是将多个Observable组合成一个Observable,区别在于merge是并行处理

event$每隔1秒调用一次,event2$每隔3秒调用一次,event3$每隔5秒调用一次

merge将三个Observable合并为一个,并在5秒后取消订阅

import { interval,merge,map } from 'rxjs';

const event$ = interval(1000).pipe(
  map(data=>`event data:${data}`)
)
const event2$ = interval(3000).pipe(
  map(data=>`event2 data:${data}`)
)
const event3$ = interval(5000).pipe(
  map(data=>`event3 data:${data}`)
)

const subscription = merge(event$,event2$,event3$).subscribe(data=>{
  console.log('merge '+data)
})
setTimeout(()=>{
   subscription.unsubscribe()
},5000)

// 输出结果
// event$会被调用5次,event2$和event3$只会被调用1次
merge event data:0
merge event data:1
merge event data:2
merge event2 data:0
merge event data:3
merge event data:4
merge event3 data:0

zip

zip是拉链的意思,使用时会把多个Observable依次组合成一个队列,已经被组合过的就不会再次被组合

import { interval,zip,map } from 'rxjs';

const event$ = interval(1000).pipe(
  map(data=>`event data:${data}`) // event$每隔1s执行一次
)
const event2$ = interval(2000).pipe(
  map(data=>`event2 data:${data}`) // event2$每隔2s执行一次
)
const event3$ = interval(3000).pipe(
  map(data=>`event3 data:${data}`) // event3$每隔3s执行一次
)

const subscription = zip(event$,event2$,event3$).subscribe(data=>{
  console.log('merge '+data)
})
setTimeout(()=>{
   subscription.unsubscribe() // 6s后取消订阅
},6000)

// 输出结果
// 第1个3秒后
merge event data:0,event2 data:0,event3 data:0
// 第2个3秒后(6秒)
merge event data:1,event2 data:1,event3 data:1

从上,zip和interval组合时,返回的是花费最长时间返回结果的组合,event$和event2$都要等event3$执行完成后才一起输出

partition

partition用于将多个Observable按一定的规则拆分为一个Observable,接收两个参数:

  • source 来源Observable
  • condition(Function) 拆分条件,是一个函数,根据拆分条件判断是否拆分为Observable,条件为true时会被归到一条Observable,为false时会被拆分到另一条Observable
import {of,partition } from 'rxjs';
const source$ = of(1,2,3,4)

const [sourceEven$,sourceOdd$] = partition(
  source$,
  (data)=>data%2 === 0
)
sourceEven$.subscribe(data => console.log('偶数',data))
sourceOdd$.subscribe(data => console.log('奇数',data))

// 输出结果
偶数 2
偶数 4
奇数 1
奇数 3

combineLatest

combineLatest接收一个数组,它和zip很相似,但是zip会按顺序组合,combineLatest直接和最后一个事件组合

import {interval,combineLatest,map } from 'rxjs';
const sourceA$ = interval(1000).pipe(
  map(data => `A${data + 1}`)
);
const sourceB$ = interval(2000).pipe(
  map(data => `B${data + 1}`)
);
const sourceC$ = interval(3000).pipe(
  map(data => `C${data + 1}`)
);

const subscription = combineLatest([sourceA$, sourceB$, sourceC$])
	.subscribe(data => console.log(`combineLatest: ${data}`));


setTimeout(()=>{
  subscription.unsubscribe()
},6000)

// 输出结果
combineLatest: A3,B1,C1
combineLatest: A4,B1,C1
combineLatest: A4,B2,C1
combineLatest: A5,B2,C1
combineLatest: A6,B2,C1
combineLatest: A6,B3,C1
combineLatest: A6,B3,C2

forkJoin

forkJoin接收一个数组,它会同时订阅传入的Observable,直到每一个Observable结束后才将每个Observable最后返回的值组合起来

import {interval,forkJoin,map } from 'rxjs';
import { take} from 'rxjs/operators'
const sourceA$ = interval(1000).pipe(
  map(data => `A${data + 1}`),
  take(5)
);
const sourceB$ = interval(2000).pipe(
  map(data => `B${data + 1}`),
  take(4)
);
const sourceC$ = interval(3000).pipe(
  map(data => `C${data + 1}`),
  take(3)
);

forkJoin([sourceA$, sourceB$, sourceC$])
	.subscribe({
    next:data=>console.log('forkJoin '+ data),
    complete:()=>console.log('forkJoin done')
  });
  
// 输出结果
forkJoin A5,B4,C3
forkJoin done

这个方法可用于多次不同的HTTP请求想拿到最终结果(HTTP请求结果没有连续性)

race

race本身就有竞速的意义,因此它接收的参数是多个Observable。当订阅发生时,多个Observable同时进行,获取的结果是最快获得结果的那一个Observable,有点类似于Promise.race方法

import {interval,race,map } from 'rxjs';
import { take} from 'rxjs/operators'
const raceA$ = interval(1000).pipe(
  map(data => `A${data + 1}`),
  take(5)
);
const raceB$ = interval(2000).pipe(
  map(data => `B${data + 1}`),
  take(4)
);
const raceC$ = interval(3000).pipe(
  map(data => `C${data + 1}`),
  take(3)
);

race([raceA$, raceB$, raceC$])
	.subscribe(data=>console.log('race '+ data),
  );
  
// 输出结果
A1
A2
A3
...(每隔1秒输出一次)

上面会返回raceA/$的结果,因为它是最先返回的

map

使用频率很高,类似于数组方法的map,可以把Observable每次事件的值换成另外的值

import {of,map } from 'rxjs';
const source$ = of(1,2,3).pipe(
  map(v=>v*2)
).subscribe(v=>console.log(v))

// 输出结果
2
4
6

map还可以传入第二个参数index,表示当前的Observable的序号(从0开始)


import {of,map } from 'rxjs';
const source$ = of(1,2,3).pipe(
  map((v,i)=>`第${i}次结果为${v*2}`)
).subscribe(v=>console.log(v))

// 输出结果
0次结果为2
1次结果为4
2次结果为6

scan

scan接收两个参数:

  • 累加函数
  1. 【acc】 目前的累加值
  2. 【value】 目前事件值
  3. 【index】目前事件index
  • 初始值

在被Observable订阅时,起始结果为初始值,和Array的reduce函数很像

import {of,scan } from 'rxjs';

const arr = [2,4,6,8]

const sum$ = of(...arr).pipe(
  scan(
    (acc,value)=>acc+value, // 累加函数
    0 // 初始值
  )
).subscribe(v=>console.log(v))

// 输出结果
2
6
12
20

pairwise

pairwise可以将Observable的结果成双成对的输出,它返回的结果就是两个为一组

import {of,pairwise } from 'rxjs';

const arr = [2,4,6,8]

const sum$ = of(...arr).pipe(
  pairwise()
).subscribe(v=>console.log(v))

// 输出结果
[2,4]
[4,6]
[6,8]

使用场景

Rxjs可以极大简化异步场景,当异步处理逻辑足够复杂时,上Rxjs可以方便不少

  1. 应付面试

字节前端原题:

实现一个并发请求函数concurrencyRequest(urls, maxNum),要求如下: 
• 要求最大并发数 maxNum 
• 每当有一个请求返回,就留下一个空位,可以增加新的请求 
• 所有请求完成后,结果按照 urls 里面的顺序依次打出(发送请求的函数可以直接使用fetch即可)
  1. 微前端场景

Rxjs的subject可以实现微前端应用通信(发布订阅模式)

import {Subject} from "rxjs";
const main = new Subject();

main.subscribe(v=>{
    console.log(`主应用监听到子应用消息:${v}`);
});

const msg = {
  data:{},
  main
}

// qiankun注册子应用
export const registerMicroApps = [
  {
    name: 'test',
    entry: qiankunEntryConfig(test),
    container: '#appContainer',
    activeRule: '/micro-test',
    loader (loading) {},
    props: { store, pager: msg, parentRoute: router }
  },
]

  1. Angular

用Angular的不可避免要使用Rxjs,用于状态管理和组件通信

  1. Nest

Nest的interceptor(拦截器)集成了Rxjs,用于处理响应,主要用到了以下方法:

  • tap 用于添加日志、缓存处理逻辑
  • catchError 用于处理抛出异常
  • timeOut 用于处理超时异常,通常和catchError一起使用
import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common';
import { Observable,TimeoutError,catchError,throwError,timeout,tap,of } from 'rxjs';

@Injectable()
export class ErrorTestInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    console.log('errorTest start')
    return next.handle().pipe(
      timeout(3000),// 请求超时3秒后
      catchError((err)=>{
        // 错误类型是超时错误 
        if(err instanceof TimeoutError){
          // 输出timeout
          return of('Timeout')
        }
        return throwError(err)
      }),
      tap((data)=>{
        console.log('data',data)
        }
      )
    );
  }
}

我们通过tap、catchError、timeout组合定义了一个ErrorTestInterceptor,在App.controller.ts中使用

import { Controller, Get, UseInterceptors } from '@nestjs/common';
import { AppService } from './app.service';
import { ErrorTestInterceptor} from './error-test.interceptor';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}
  
  // 定义了/test路径下的get请求接口
  @Get('test')
  // 使用了ErrorTestInterceptor拦截器
  @UseInterceptors(ErrorTestInterceptor)
  async test(){
    // 具体实现
    await new Promise(resolve=>setTimeout(()=>{
      console.log('hahaha')
    },4000))
    return 'test'
  }
}

访问对应接口,控制台输出结果为

errorTest start
data Timeout
hahaha
  1. 异步场景
  • 简化Ajax/XHR/Fetch/Axios
  • Websocket通信
  • Promise/setInterval/SetTimeout

后记

Rxjs估计没啥人看,随缘看点赞,点赞数过百就写下篇