前端竞态条件与RxJS
竞态条件
竞争冒险(race hazard)又名竞态条件、竞争条件(race condition),它旨在描述一个系统或者进程的输出依赖于不受控制的事件出现顺序或者出现时机。此词源自于两个信号试着彼此竞争,来影响谁先输出。
对于前端来说,就是异步操作完成的顺序不一定和他们开始的顺序相匹配,举个🌰
- 有一个搜索框,只要输入文字就会触发搜索,先后输入两个关键字,触发两次请求
- 由于网络问题或分词搜索效率问题,可能先触发的请求后返回
- 导致页面渲染的是第一次输入的关键词的返回值
这种问题常见于前端单页应用的分页,搜索,选项卡切换等场景中那么怎么解决竞态问题?
避开问题
通过节流或防抖保证一段时间内只会出现一次请求从而避免竞态条件的触发。然而这种方案在大多数情况下体验不太好,并且如果第一次请求的时间大于防抖/节流时长 + 第二次请求的时间,依然会触发竞态条件。
当新请求发出时,取消掉上次的请求
XMLHttpRequest
XHR 对象提供了 abort 方法来取消请求
const xhr = new XMLHttpRequest();
const method = "GET";
const url = "https://developer.mozilla.org/";
xhr.open(method, url, true);
xhr.send();
if (OH_NOES_WE_NEED_TO_CANCEL_RIGHT_NOW_OR_ELSE) {
xhr.abort();
}
Fetch API
需要使用AbortController来取消fetch请求
let controller;
const url = "video.mp4";
const downloadBtn = document.querySelector(".download");
const abortBtn = document.querySelector(".abort");
downloadBtn.addEventListener("click", fetchVideo);
abortBtn.addEventListener("click", () => {
if (controller) {
controller.abort(); // 根据标识来取消请求
console.log("Download aborted");
}
});
function fetchVideo() {
controller = new AbortController();
const signal = controller.signal; // 发起新的请求时新建一个标识
fetch(url, { signal })
.then((response) => {
console.log("Download complete", response);
})
.catch((err) => {
console.error(`Download error: ${err.message}`);
});
}
常用的请求库如 axios 也是基于XHR或fetch进行cancel方法的封装,详情可查阅API文档
在某种程度上能减轻服务端压力
给每次请求打标,只处理最新的请求
import React, { useRef } from 'react';
export default function useResolveLastReq(fn) {
const id = useRef(0); // 标识每次请求
const wrappedFn = async (...args) => {
const curId = id.current + 1; // 每次请求的ID
id.current = curId; // 最新的请求ID
// 执行请求
const data = await fn.apply(this, args);
try {
if (curId === id.current) {
return data;
}
} catch (e) {
if (curId === id.current) {
throw e;
}
}
};
return wrappedFn;
}
利用了闭包,上下文中 curId 和 id.current 可能不同。这种全局变量和局部变量的混用 + 异步,对于不熟悉前端异步机制的人可能很头疼
使用RxJS
const target = useRef(null); // 指向搜索按钮
useEffect(() => {
if (target) {
const subscription = fromEvent(target.current, 'click') // 可观察的点击事件流
.pipe(
debounceTime(300), // 对事件流防抖
switchMap(cur => // 将事件流转换成请求流,当有新的请求开始产生数据,停止观察老的请求
from( // promise --> Observable
postData(
'http://localhost:3333/data/api.json?action=DescribeInstances&product=wind-demo',
{
name: keyword,
},
),
),
),
map(cur => cur?.data?.name),
tap(result => { // 处理副作用
setData(result);
}),
)
.subscribe(); // 触发
return () => subscription.unsubscribe();
}
}, [target, keyword]);
模式统一,可读性高
由实例代码我们可以看到引入了RxJS后,代码中有几个要素。
- 通过fromEvent创建的事件流
- debounceTime,switchMap, map, tap 等纯函数对原始的事件流进行处理和变换
- subscription 代表一次订阅,可以调用unsubscribe() 方法结束订阅
RxJS
Functional Programing
核心思想是做运算,并用function来思考问题,举个简单🌰
(5-1)+3*6; // statement 表现某个行为
const add = (a, b) => a + b;
const mul = (a, b) => a * b;
const sub = (a, b) => a - b;
add(sub(5, 1), mul(3, 6)); // expression 表达式是一个运算过程,一定有返回值
纯函数
调用一个function, 入参相同,出参一定相同,没有副作用;Array.prototype.slice() 每次调用不改变原数组Array.prototype.splice() 每次调用改变原数组,不为纯函数
可读性高、可维护性高、易于并发处理
Reactive Programing
响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。
当数据发生变化时,自动通知订阅方
核心思想的转变:数据调用方拉取数据 --> 数据生产者推送数据
举个🌰:
// hooks
useEffect(() => {
// 代码
}, [state])
// class
componentDidUpdate(prevProps, prevState) {
if(this.state.val !== prevState.val) {
// 代码
}
}
Observable --- 核心概念
import { Observable } from 'rxjs';
const observable = new Observable((subscriber) => { // 创建一个惰性推送的集合
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
console.log('just before subscribe');
observable.subscribe({ // 订阅数据流
next(x) {
console.log('got value ' + x);
},
error(err) {
console.error('something wrong occurred: ' + err);
},
complete() {
console.log('done');
},
});
console.log('just after subscribe');
ReactiveX 还结合了观察者模式、迭代器模式
拉取 (Pull) vs. 推送 (Push)
拉取和推送是两种不同的协议,用来描述数据生产者 (Producer)如何与数据消费者 (Consumer)进行通信的。
拉取
由消费者来决定何时从生产者那里接收数据。生产者本身不知道数据是何时交付到消费者手中的。
每个 JavaScript 函数都是拉取体系。函数是数据的生产者,调用该函数的代码通过从函数调用中“取出”一个单个返回值来对该函数进行消费。ES2015 引入了 generator 函数和 iterators (function*),这是另外一种类型的拉取体系。调用 iterator.next() 的代码是消费者,它会从 iterator(生产者) 那“取出”多个值。
推送
在推送体系中,由生产者来决定何时把数据发送给消费者。消费者本身不知道何时会接收到数据。
Promise 是最常见的推送体系类型。Promise(生产者) 将一个解析过的值传递给已注册的回调函数(消费者),但不同于函数的是,由 Promise 来决定何时把值“推送”给回调函数。
RxJS 引入了 Observables,一个新的 JavaScript 推送体系。Observable 是多个值的生产者,并将值“推送”给观察者(消费者)。
Function --- 惰性运算,调用时会同步地返回单一值。Generator --- 惰性运算,调用时会同步地返回零到无限多个值。Promise --- 最终可能(或可能不)返回单个值的运算。Observable --- 惰性运算,被调用的时刻起同步或异步地返回零到(有可能的)无限多个值。
Observable vs. EventEmitter
import { Observable } from 'rxjs';
const foo = new Observable((subscriber) => {
console.log('Hello');
subscriber.next(42);
});
foo.subscribe((x) => {
console.log(x);
});
foo.subscribe((y) => {
console.log(y);
});
const events = require('events');
const emitter = new events.EventEmitter();
emitter.on('someEvent', function(arg1, arg2) {
console.log('listener1', arg1, arg2);
});
emitter.on('someEvent', function(arg1, arg2) {
console.log('listener2', arg1, arg2);
});
emitter.emit('someEvent', 'arg1 参数', 'arg2 参数');
在EventEmitter中,无论是否存在订阅者,事件都会进行广播,且所有订阅者都共享事件的广播而在Observable中,只有当订阅者存在(即调用subscribe方法),数据流才会触发。并且两个不同的订阅并不共享同一个数据流
创建Observable
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
const id = setInterval(() => {
subscriber.next('hi');
}, 1000);
});
import { fromEvent } from 'rxjs';
const clicks = fromEvent(document, 'click');
clicks.subscribe(x => console.log(x));
除了通过new Observable 方式,一般更为常见的方式是使用operator (of, fromEvent, interval)创建数据流
订阅Observable
observable.subscribe((x) => console.log(x));
当调用 observable.subscribe 时,生产数据流的代码才会被触发,且不同的订阅不共享数据流
执行Observable
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
try {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
} catch (err) {
subscriber.error(err); // delivers an error if it caught one
}
});
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
subscriber.next(4); // Is not delivered because it would violate the contract
});
const observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err), // 处理数据流中的错误
complete: () => console.log('Observer got a complete notification'), // 数据流完成
};
observable.subscribe(observer);
终止Observable
import { from } from 'rxjs';
const observable = from([10, 20, 30]);
const subscription = observable.subscribe((x) => console.log(x));
// Later:
subscription.unsubscribe();
Subject
是一种特殊类型的 Observable,它允许将值多播给多个观察者,所以 Subject 是多播的,而普通的 Observables 是单播的(每个已订阅的观察者都拥有 Observable 的独立执行)
import { Subject } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`),
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`),
});
subject.next(1);
subject.next(2);
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
Scheduler
可以改变数据流推送的顺序
import { Observable, observeOn, asyncScheduler } from 'rxjs';
const observable = new Observable((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
observeOn(asyncScheduler)
);
console.log('just before subscribe');
observable.subscribe({
next(x) {
console.log('got value ' + x);
},
error(err) {
console.error('something wrong occurred: ' + err);
},
complete() {
console.log('done');
},
});
console.log('just after subscribe');
参考资料
转载自:https://juejin.cn/post/7203294201313034301