likes
comments
collection
share

到底什么时候用得上rxjs,总结6个高频rxjs应用场景

作者站长头像
站长
· 阅读数 20

前言:

这篇文章讲的是rxjs高频的应用场景,每个场景都会有相应的代码。不熟悉rxjs的读者也可以粗略了解下 rxjs 的应用场景,先收藏一下,说不定哪天就遇到了使用 rxjs 的时候。

本文分几个应用场景来展开说说:

  1. 构建单向数据流
  2. 响应式编程
  3. 组合数据源
  4. 缓存:订阅后取出订阅前发出的数据
  5. 处理异步和并发
  6. 声明式编程

1. 构建单向数据流

到底什么时候用得上rxjs,总结6个高频rxjs应用场景

在单向数据流中,数据只能从一个方向流向另一个方向,不存在逆流的情况。这种模式有助于简化数据的管理和状态的维护,提高代码的可维护性和可预测性。UI 框架有两个例子很好的表达了单向数据流的思想。

  1. 在 vue 中,把可以从 某个state 计算出的数据作为的计算属性,而不是作为新的 state。
  2. 在 react 的父子组件之间,子组件收到来自父组件的prop后,不会去修改prop,而是直接使用。当prop发生变化时,它们会使用接收到的新prop。

而在rxjs中,订阅者通过订阅observable/数据源,确保了数据的来源是唯一的,很容易就构建了单向数据流。如下图

到底什么时候用得上rxjs,总结6个高频rxjs应用场景

数据流动不单向的现象就是状态与状态之间相互修改:用状态A去计算出状态B,又用状态B计算出状态A。这样的代码难以理解,无法维护。

造成数据流不单向的原因往往是多个模块获取到同一来源的数据后,又各自以此数据为基础维护了各自的状态。这些状态都有不一样的更新逻辑。当这些状态有计算关系时,往往无意间就会造成数据流向交叉、不单向。

而这种多状态的情况往往出现在事件驱动的系统中(以事件作为模块之间的协作方式)-- 数据是从事件中传递出来的,导致每个模块拿到数据后还得保存作为自己状态,然后手动的去同步状态。

下节讲到的响应式编程,可以有效避免这种多状态的情况。

2. 响应式编程,直接监听数据变化

响应式编程与事件驱动编程的关系

到底什么时候用得上rxjs,总结6个高频rxjs应用场景

响应式编程 是基于事件驱动编程的方式来处理 数据流

事件驱动编程中,被观察者(observable)发出的是事件(event)。在响应式编程中,被观察者(observable)发出的是数据(data)。

所以二者发送数据的方式有点区别,在事件驱动编程中,我们经常把数据作为事件的 payload,通过事件去传递数据,观察者监听到事件时,再从事件中取出数据。而在响应式编程中,数据是直接“响应”的,观察者直接监听到数据的变化,拿到最新的数据。所以响应式编程的响应数据的特性可以减少手动处理和更新数据的工作量。

对比命令式编程,能实现关注点分离

因为事件驱动编程能将不同的关注点分离开来,而响应式编程是基于事件驱动编程的,所以响应式编程自然也能实现关注点分离。

到底什么时候用得上rxjs,总结6个高频rxjs应用场景

使用rxjs前:代码耦合、状态过多

接下来让我们来设想一个简单的场景:假设我们有一个网页应用,用于输入和显示不同货币的转换结果。

这个应用包含以下几个模块:

1. 输入模块:用户输入一个金额和选择货币类型。

2. 转换模块:对输入的金额进行货币转换。

3. 显示模块:显示转换后的结果。

4. 日志模块:记录每次转换的历史记录。

用户在输入模块中输入金额并选择货币类型,转换模块会监听这个输入事件进行转换,显示模块会监听转化结果的事件来更新显示,日志模块会记录转换记录。然而,这些模块独立维护状态,并手动同步,导致状态管理复杂和数据流动混乱。

// 输入模块
class InputModule {
  constructor() {
    this.amount = 0;
    this.currencyType = 'USD';
    this.listeners = [];
  }

  onInputChange(callback) {
    this.listeners.push(callback);
  }

  updateAmount(newAmount) {
    this.amount = newAmount;
    this.notifyListeners();
  }

  updateCurrencyType(newCurrencyType) {
    this.currencyType = newCurrencyType;
    this.notifyListeners();
  }

  notifyListeners() {
    this.listeners.forEach(callback => callback(this.amount, this.currencyType));
  }
}

// 转换模块
class ConversionModule {
  constructor(displayModule, loggingModule) {
    this.amount = 0;
    this.currencyType = 'USD';
    this.convertedAmount = 0;
    this.displayModule = displayModule;
    this.loggingModule = loggingModule;
  }

  onInputChange(amount, currencyType) {
    this.amount = amount;
    this.currencyType = currencyType;
    this.convert();
  }

  convert() {
    // 模拟转换逻辑
    this.convertedAmount = this.amount * (this.currencyType === 'USD' ? 0.85 : 1.15);
    this.updateDisplay();
       this.logConversion();
  }

  updateDisplay() {
    this.displayModule.update(this.convertedAmount);
  }

  logConversion() {
    this.loggingModule.log(this.amount, this.currencyType, this.convertedAmount);
  }
}

// 显示模块
class DisplayModule {
  constructor() {
    this.displayValue = 0;
  }

  update(value) {
    this.displayValue = value;
    console.log(`显示转换结果: ${this.displayValue}`);
  }
}

// 日志模块
class LoggingModule {
  log(amount, currencyType, convertedAmount) {
    console.log(`记录转换: ${amount} ${currencyType} 转换为 ${convertedAmount}`);
  }
}

// 实例化所有模块
const displayModule = new DisplayModule();
const loggingModule = new LoggingModule();
const conversionModule = new ConversionModule(displayModule, loggingModule);
const inputModule = new InputModule();

// 输入模块监听输入变化
inputModule.onInputChange((amount, currencyType) =>     
    conversionModule.onInputChange(amount, currencyType));

// 用户的交互
inputModule.updateAmount(100);
inputModule.updateCurrencyType('EUR');

尽管输入的数据来源是相同的,但每个模块都需要手动维护状态和同步状态,这么做会造成以下问题:

  1. 复杂的状态管理​:每个模块需要维护和更新自己的状态,增加了代码的复杂性,容易出错。比如代码中 ConversionModule ,维护了5个状态。
  2. 手动同步状态​:模块间需要手动传递和更新数据,容易导致数据不一致和同步错误。
  3. 存在数据流动不单向的风险​:数据在模块之间相互传递,各自保存,难以跟踪数据的变化。
  4. 关注点不分离​:在 ConversionModule 还要加入调用 DisplayModule、loggingModule 方法的代码,属于命令式编程,关注点不够分离。

使用rxjs后:简化状态、单向数据流、关注点分离

改造后的数据流动图:

到底什么时候用得上rxjs,总结6个高频rxjs应用场景

import { BehaviorSubject, combineLatest, withLatestFrom } from 'rxjs';
import { map } from 'rxjs/operators';

// 输入模块
class InputModule {
  constructor() {
    this.amount$ = new BehaviorSubject(0);
    this.currencyType$ = new BehaviorSubject('USD');
  }

  updateAmount(newAmount) {
    this.amount$.next(newAmount);
  }

  updateCurrencyType(newCurrencyType) {
    this.currencyType$.next(newCurrencyType);
  }
}

// 转换模块
class ConversionModule {
  constructor(inputModule) {
    this.convertedAmount$ = combineLatest([inputModule.amount$, inputModule.currencyType$]).pipe(
      map(([amount, currencyType]) => this.convert(amount, currencyType))
    );
  }

  convert(amount, currencyType) {
    // 模拟转换逻辑
    return amount * (currencyType === 'USD' ? 0.85 : 1.15);
  }
}

// 显示模块
class DisplayModule {
  constructor(conversionModule) {
    this.subscription = conversionModule.convertedAmount$.subscribe(value => this.update(value));
  }

  update(value) {
    console.log(`显示转换结果: ${value}`);
  }
}

// 日志模块
class LoggingModule {
  constructor(inputModule, conversionModule) {
    this.subscription = conversionModule.convertedAmount$.pipe(
      withLatestFrom(inputModule.amount$, inputModule.currencyType$)
    ).subscribe(([convertedAmount, amount, currencyType]) => this.log(amount, currencyType, convertedAmount));
  }

  log(amount, currencyType, convertedAmount) {
    console.log(`记录转换: ${amount} ${currencyType} 转换为 ${convertedAmount}`);
  }
}

// 实例化所有模块
const inputModule = new InputModule();
const conversionModule = new ConversionModule(inputModule);
const displayModule = new DisplayModule(conversionModule);
const loggingModule = new LoggingModule(inputModule, conversionModule);

// 用户交互
inputModule.updateAmount(100);
inputModule.updateCurrencyType('EUR');

对比使用rxjs前的优点:

  • 单向数据流:数据从输入模块流向转换模块,再流向显示和日志模块,确保了数据的单向流动。
  • 简化的状态管理:利用 RxJS 的可观察对象和运算符管理状态和数据流,减少了手动同步的复杂性。
  • 实现关注点分离:利用 RxJS 的响应式编程模型,使得各模块可以自动响应数据变化。每个模块都只关注自己的部分,它们不需要关注其他模块如何获取数据、如何处理数据。比如 InputModule 只负责获取用户的输入,当有新的输入时,它只需要将新的数据传递给 BehaviorSubject ,而不需要关注这个数据会被如何使用。ConversionModule 则只关注如何将输入的金额进行转换,它不需要关心这个金额是如何获取的,也不需要关心转换结果会如何显示和记录。这样,每个模块都只需要关注自己的任务,从而实现了关注点的分离。

3. 组合数据源:多个数据/事件来源为一个数据流

在许多情况下,对同一数据的更新调用,可能发生在代码的多个地方,如果采用命令式编程的方式,这使得阅读、修改这个数据更新流程的代码变得困难。因此,问题的排查和新功能的迭代也变得相当麻烦。

举一个例子,在一个实时协作文本编辑器中,造成文本差异(diff)的来源是多样且复杂的,尤其在多人协作的环境中。以下是diff来源:

  • 本地用户按键输入
    • 常规按键输入
    • 删除操作
    • 文本格式修改
    • 剪切、复制和粘贴操作
  • 协作用户推送
    • 解决冲突并且合并
  • 网络断开重连后的同步操作
    • 解决冲突并且合并
  • 其他:版本管理(撤回、重做)、导入文件等等

画一下文本差异(diff)的输入输出的流程图如下:

到底什么时候用得上rxjs,总结6个高频rxjs应用场景

使用rxjs前:以命令式的方式直接调用

以下是伪代码

// 常规按键输入
editor.addEventListener('keypress', () => {
    handleKeyPress()
    processDiff()
);

// 删除操作
editor.addEventListener('delete', () => {
    handleDelete()
    processDiff()
});

// 文本格式修改
editor.addEventListener('formatChange', () => {
    handleFormatChange()
    processDiff()
});

// 剪切、复制和粘贴操作
editor.addEventListener('cutCopyPaste', () => {
    handleCutCopyPaste()
    processDiff()
});

// 协作用户推送
collaborativeService.addEventListener('sync', () => {
    handleSync()
    handleConflict()
});


// 网络断开重连后的同步操作
collaborativeService.addEventListener('reconnectSync', () => {
    handleReconnectSync()
    handleConflict()
});

functionhandleConflict() {
    // ...处理冲突
    processDiff()
}


functionprocessDiff(diff) {
// 处理具体的变更逻辑
console.log('Processing change:', diff);
updateDocument(diff); // 文本更新函数
}

可以看到,如果命令式地在每个数据diff源中去调用,每多一个数据源,就可能要调用一次processDiff。比较难维护,随着数据源地增加、代码量的增加,很容易造成更新逻辑分散、更新时机随意。

使用rxjs后:用响应式的方式写

数据流动图 到底什么时候用得上rxjs,总结6个高频rxjs应用场景

以下是伪代码

// 常规按键输入
const localKeyPress$ = fromEvent(editor, 'keypress').pipe(handleKeyPress);
// 删除操作 
const localDelete$ = fromEvent(editor, 'delete').pipe(handleDelete);
// 文本格式修改
const localFormatChange$ = fromEvent(editor, 'formatChange').pipe(handleFormatChange);
// 剪切、复制和粘贴操作
const localCutCopyPaste$ = fromEvent(editor, 'cutCopyPaste').pipe(handleCutCopyPaste);  

// 本地用户按键输入数据源的合并
const localInput$ = merge(
    localKeyPress$,
    localDelete$,
    localFormatChange$,
    localCutCopyPaste$,
);

// 协作用户推送
const collaborativeSync$ = fromEvent(collaborativeService, 'sync'); 

// 网络断开重连后的同步操作
const networkReconnectSync$ = fromEvent(networkService, 'reconnectSync');  

// 将可能需要解决冲突的数据源合并
const conflictResolution$ = merge(
    collaborativeSync$,
    networkReconnectSync$,
).pipe(hanldeConflict);

// 合并所有事件源
const allChanges$ = merge(
    conflictResolution$,
    localInput$,
);

// 订阅并且处理文本diff
const allChanges$.subscribe(functionprocessDiff(diff) {
    // 处理具体的变更逻辑
    console.log('Processing change:', diff);
    updateDocument(diff); // 文本更新函数
});

而通过 rxjs 的 merge 操作符来汇总不同的数据源,合并成一个数据流,就像所有支流汇进一个江河一样,让数据的输入输出被组织、被监控。 不仅数据的来源非常清晰,也更容易维护和迭代。在merge函数中可以看到所有的数据来源,每次有新的数据源也只需在 merge 中添加。 同时也更加方便日志建设和排除问题。

4. 缓存:订阅后取出订阅前发出的数据

开发经常遇到发出事件发出早于订阅,结果订阅者错过了这条事件,rxjs的 ReplaySubject、BehaviorSubject 都很适合解决这一类问题,一订阅就可以取出数据流中的前几次数据。另外,操作符 shareReplay 也可以将普通的 obserable 流转成可缓存的 obserable 流。

使用rxjs前:手动缓存数据,额外的变量

如果不使用 RxJS,而是手动来实现类似功能,需要额外的一个数据结构(例如数组、队列等)来缓存数据,和额外的逻辑来同步数据,增加了代码复杂度。

这里就不举例子了。

使用rxjs后:直接从流取出缓存

ReplaySubject 是 RxJS 中的一种 Subject,它可以缓存并向新的订阅者回放以前发出的值,每次订阅的时候再拿出来。它特别适用于需要缓存之前的数据,并希望确保新订阅者在订阅时可以接收到它们的场景。

Rxjs 弹珠图如下:subscriber1在ReplaySubject发出值前订阅,subscriber2在ReplaySubject发出值后订阅,二者都能拿到ReplaySubject最近发出的值。

到底什么时候用得上rxjs,总结6个高频rxjs应用场景

以下是一个使用 ReplaySubject 的例子:

模拟股市价格更新的场景,新订阅者需要获取最近三次的价格更新。

import { ReplaySubject } from 'rxjs';

class StockPriceNotifier {
  constructor(bufferSize = 3) {
    this.priceSubject$ = new ReplaySubject(bufferSize);
  }

  // 更新股票价格,存入缓存,并通知所有订阅者
  updatePrice(newPrice) {
    this.priceSubject$.next(newPrice);
  }

  // 添加新的订阅者,并将缓存的数据发送给他们
  subscribe(subscriber) {
    const subscription = this.priceSubject$.subscribe(subscriber);
    return subscription;
  }
}

// 使用示例
const notifier = new StockPriceNotifier();

// 模拟股市价格更新
notifier.updatePrice(100);
notifier.updatePrice(101);
notifier.updatePrice(102);
notifier.updatePrice(103);

// 第一个订阅者,应该会收到最近的3个价格
const subscriber1 = price => console.log(`订阅者1 价格: ${price}`)
notifier.subscribe(subscriber1);

// 新的一次价格更新
notifier.updatePrice(104);

// 第二个订阅者,应该收到最近的3个价格 (102, 103, 104)
const subscriber2 = price => console.log(`订阅者2 价格: ${price}`)
notifier.subscribe(subscriber2);

// 再次更新价格
notifier.updatePrice(105);

// 现在第一个和第二个订阅者都会收到最新的价格更新

5. 处理异步和并发:解决异步事件交叉而引发的隐秘、偶现的时序问题

事件驱动编程不能很好地应对复杂繁多的异步逻辑,容易产生许多在时空上分散的切面。面对这类复杂问题/场景,使用rxjs能很好地去梳理异步代码逻辑。

1. 竞态问题

所谓竞态问题(race condition),就是两个信号试着彼此竞争,互相影响谁先输出,导致无法保证异步操作的完成会按照他们开始时同样的顺序。

到底什么时候用得上rxjs,总结6个高频rxjs应用场景

举一个例子,我们设计一个美金(USD)和欧元(EUR)实时转换的应用。美元与欧元之间,一个值的变化会引起另一个值的变化,变化前需要转换。每次转换时,我们通过网络请求获取最新的汇率,并使用这个汇率进行金额转换。

由于每次转换是有延迟的,就产生了竞态的问题。比如修改美元后马上修改欧元,二者先后发起请求,无法确定请求返回的快慢,所以无法确定最后的修改是来自美元还是欧元。

解决竞态问题最常见的方式是增加一个标识,在发生A事件时,忽略/取消其他有竞争关系的事件。下面我们先尝试使用这个方式来解决。

使用rxjs前:加上loading标识来处理并发

<!DOCTYPE html>
<html lang="en">

<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>实时汇率转换示例</title>
</head>

<body>
  <div>
    <label for="usd">美金 (USD): </label>
    <input type="number" id="usd" placeholder="输入美金">
  </div>
  <div>
    <label for="eur">欧元 (EUR): </label>
    <input type="number" id="eur" placeholder="输入欧元">
  </div>

  <script>
    const usdInput = document.getElementById('usd');
    const eurInput = document.getElementById('eur');

    let isUSDChanging = false;
    let isEURChanging = false;

    const fetchExchangeRate = (fromCurrency) => {
      return new Promise((resolve) => {
        setTimeout(() => {
          // 模拟一个返回汇率数据的API请求
          // 实际情况应该使用fetch或axios进行HTTP请求
          const exchangeRate = Math.random() * (1.2 - 0.8) + 0.8; // 模拟随机汇率
          resolve(exchangeRate);
        }, 1000);
      });
    };

    usdInput.addEventListener('input', async (event) => {
      isUSDChanging = true
      isEURChanging = false;
      const usdAmount = parseFloat(event.target.value);
      if (!isNaN(usdAmount)) {
        const exchangeRate = await fetchExchangeRate('USD');
        if (isUSDChanging) {
          eurInput.value = (usdAmount * exchangeRate).toFixed(2);
        }
      } else {
        eurInput.value = '';
      }
      isUSDChanging = false;
    });

    eurInput.addEventListener('input', async (event) => {
      isEURChanging = true
      isUSDChanging = false;
      const eurAmount = parseFloat(event.target.value);
      if (!isNaN(eurAmount)) {
        const exchangeRate = await fetchExchangeRate('EUR');
        if (isEURChanging) {
          usdInput.value = (eurAmount * exchangeRate).toFixed(2);
        }
      } else {
        usdInput.value = '';
      }
      isEURChanging = false;
    });
  </script>
</body>

</html>

这样的代码拥有 过多状态,导致状态管理复杂 :使用isUSDChangingisEURChanging标识来防止数据的竞态式更新,增加了状态管理的复杂性。如果逻辑处理不严格,容易导致更新循环,造成性能问题或错误。

过多的标识让代码 不具有可扩展性 ,假如后面要新增货币种类,比如日元、人民币,原来的代码还得要加上isJPYChanging、isCNHChanging等标识以及相关的判断。

使用rxjs后:一个操作符即可处理并发

switchMap 只关注最新的数据流,而忽略旧的数据流。这是switchMap的弹珠图,注意下面的30,被 50 给截断了。

到底什么时候用得上rxjs,总结6个高频rxjs应用场景

接着用switchMap来解决这个问题。在使用switchMap之前,我们需要合并一下数据源。在用 switchMap 操作符处理合并后的数据流。

<!DOCTYPE html>
<html lang="en">

<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>实时汇率转换示例</title>
</head>

<body>
  <div>
    <label for="usd">美金 (USD): </label>
    <input type="number" id="usd" placeholder="输入美金">
  </div>
  <div>
    <label for="eur">欧元 (EUR): </label>
    <input type="number" id="eur" placeholder="输入欧元">
  </div>

  <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.2.0/rxjs.umd.min.js"></script>
  <script>
    const { fromEvent, merge } = rxjs;
    const { map, switchMap, startWith } = rxjs.operators;

    const usdInput = document.getElementById('usd');
    const eurInput = document.getElementById('eur');

    // 模拟获取汇率的异步函数
    const fetchExchangeRate = (fromCurrency) => {
      return new Promise((resolve, reject) => {
        setTimeout(() => {
          const exchangeRate = Math.random() * (1.2 - 0.8) + 0.8; // 模拟随机汇率
          resolve(exchangeRate);
        }, 1000);
      });
    };

    // 转换函数(运用了策略模式)
    const convertCurrency = {
      USD: async (amount) => {
        const rate = await fetchExchangeRate('USD');
        return {
          usd: amount,
          eur: (amount * rate).toFixed(2)
        };
      },
      EUR: async (amount) => {
        const rate = await fetchExchangeRate('EUR');
        return {
          eur: amount,
          usd: (amount * rate).toFixed(2)
        };
      }
    };

    // 创建输入事件流并封装公共部分
    const createInputStream = (inputElement, currency) => {
      return fromEvent(inputElement, 'input').pipe(
        map(event => ({
          currency,
          value: parseFloat(event.target.value)
        })),
        startWith({
          currency,
          value: parseFloat(inputElement.value) || 0
        })
      );
    };

    const usdInput$ = createInputStream(usdInput, 'USD');
    const eurInput$ = createInputStream(eurInput, 'EUR');

    // 合并输入事件流
    const merged$ = merge(usdInput$, eurInput$);

    // 处理合并后的事件流
    merged$.pipe(
      switchMap(data => convertCurrency[data.currency](data.value))
    ).subscribe(result => {
      if (result.usd !== undefined) usdInput.value = result.usd;
      if (result.eur !== undefined) eurInput.value = result.eur;
    });
  </script>
</body>

</html>

2. 数据访问的时机不对

隐藏问题:误以为多个事件之间存在时序依赖

想象一个场景,模块C的计算依赖了 A、B 事件带的数据,于是模块C需要等待 A 、 B 事件都触发后时才计算。

而代码的现状这个 A 事件总是在 B 之前被触发,然后消费者也是这么认为的(可能是写代码的时候偷懒--觉得能跑就行,也可能从事件名字面意思上理解错了)

所以代码里就写成了:先从 A 拿到数据保存到一个变量a中,B事件发生后,拿到来自B的数据再与变量a继续计算。但其实 A、B两个模块并没有前后加载顺序的约定。若哪天模块A的加载不先于B,就会发生bug。

到底什么时候用得上rxjs,总结6个高频rxjs应用场景

let dataFromA = null;

// 模块消费事件的计算函数
function onAEvent(data) {
    dataFromA = data;
    console.log("Data from A received:", dataFromA);
}

function onBEvent(data) {
    const result = computeResult(dataFromA, data);
    console.log("Computed result:", result);
}

// 模拟依赖的计算函数
function computeResult(aData, bData) {
    if (aData === null || bData === null) {
        console.error("Error: B event received before A event.");
        return;
    }
    // 假设有一些计算逻辑
    return aData + bData;
}

// 事件触发模拟
function triggerEvents() {
    // 当前假设A事件总是在B事件之前触发
    onAEvent(100); // 正常情况下,A事件先触发
    onBEvent(50);  // 然后B事件触发
}

// 模拟场景
triggerEvents();

// 现在假设事件顺序被颠倒了
dataFromA = null; // 重置
function triggerEventsWithReversedOrder() {
    onBEvent(50);  // B事件先触发
    onAEvent(100); // 然后A事件触发
}

// 触发顺序被颠倒的场景,就会报错
triggerEventsWithReversedOrder();

懒惰的解决方式:直接加setTimeout解决,但这样会让你的代码更加不可预测。

combineLatest: 当 A 和 B 的数据都准备好时再计算

如果把这个事件驱动编程换成响应式编程,直接从A、B两个数据源中直接取出数据,用于计算,那就不存在这个风险了。

用rxjs的combineLatest操作符去解决这个问题,combineLatest可以等待传入的所有数据源都发出过一次数据后,才开始发出数据。

除了combineLatest,还有delayWhen、forkJoin等操作符,可以延迟访问数据的时机

弹珠图如下

到底什么时候用得上rxjs,总结6个高频rxjs应用场景

代码如下:

import { fromEvent, combineLatest  } from 'rxjs';
import { map, filter } from 'rxjs/operators';

// 创建事件源
const aEventSource = new EventTarget();
const bEventSource = new EventTarget();

// 监听 A 事件
const aEvent$ = fromEvent(aEventSource, 'AEvent').pipe(
    map(event => event.detail) // 将事件处理成其数据部分
);

// 监听 B 事件
const bEvent$ = fromEvent(bEventSource, 'BEvent').pipe(
    map(event => event.detail) // 将事件处理成其数据部分
);

// 结合 A 和 B 事件的数据
const combined$ = combineLatest([aEvent$, bEvent$]);

// 进行计算,当 A 和 B 的数据都准备好时触发
combined$.pipe(
    filter(([aData, bData]) => aData !== null && bData !== null),
    map(([aData, bData]) => computeResult(aData, bData))
).subscribe(result => {
    console.log("Computed result:", result);
});

// 模拟计算函数
function computeResult(aData, bData) {
    return aData + bData;
}

// 模拟触发事件函数
function triggerEvents() {
    // 模拟按正确顺序触发事件
    aEventSource.dispatchEvent(new CustomEvent('AEvent', { detail: 100 }));
    bEventSource.dispatchEvent(new CustomEvent('BEvent', { detail: 50 }));
}

function triggerEventsWithReversedOrder() {
    // 模拟按颠倒顺序触发事件
    bEventSource.dispatchEvent(new CustomEvent('BEvent', { detail: 50 }));
    aEventSource.dispatchEvent(new CustomEvent('AEvent', { detail: 100 }));
}

// 测试正常顺序
triggerEvents();

// 测试顺序颠倒
triggerEventsWithReversedOrder();

6. 声明式编程:处理事件的lodash库

采用声明式的方式来描述数据流和操作,使得代码更加清晰、简洁和可读,减少了出错的可能性,最常见的例子就是数组的forEach、map、filter操作符的使用。

而 rxjs 提供了丰富的operator,封装常见的复杂的代码逻辑,让你写出声明式的代码。

比较常用的operator有:

  • 防抖处理 debounce、throttle
  • 并发处理 mergeMap
  • 竞态处理 concatMap、switchMap
  • 失败捕获 catchError
  • 失败重试 retryWhen
  • 去重 distinct、distinctUntilChanged
  • 截停 take、takeUntil
  • 缓冲 buffer、bufferWhen
  • 分支选择 iif
  • 组合多个事件 merge、withLatestFrom、combineLatest

如果想快速了解和学习operator的话,推荐访问这个网站 www.learnrxjs.io/learn-rxjs/… operator 都标记了⭐。

到底什么时候用得上rxjs,总结6个高频rxjs应用场景

另外,如果rxjs提供的operator不满足自己的需求,也可以自己实现。

例子:用rxjs实现搜索功能

要求对搜索框的输入进行失败处理、竞态处理、防抖处理、过滤空值、去重。

<!DOCTYPE html>
<html lang="en">

<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
</head>

<body>
  <div>
    <input id="search" type="text" placeholder="搜索...">
    <span id="searchResult"></span>
  </div>

  <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.2.0/rxjs.umd.min.js"></script>
  <script>
    const {
      fromEvent,
      of ,
      from,
    } = rxjs;
    const {
      debounceTime,
      map,
      switchMap,
      catchError,
      tap,
      distinctUntilChanged,
      filter,
    } = rxjs.operators;

    // 获取输入框元素
    const inputElement = document.getElementById('search');
    // 结果
    const searchResult = document.getElementById('searchResult');

    // 创建一个 Observable 来监听输入事件
    const input$ = fromEvent(inputElement, 'input');

    // 模拟的请求函数
    const mockSearchService = (query) => {
      return new Promise((resolve, reject) => {
        const random = Math.random();
        console.log('request: ', query,  random, random > 0.7)
        setTimeout(() => {
          // 设置一个随机数,如果随机数小于0.2,则请求失败
          if (random > 0.2) {
            resolve(`搜索结果: ${query}`);
          } else {
            reject('搜索出错!');
          }
        }, 500);
      });
    };

    // 使用 pipe 方法来处理输入事件
    const search$ = input$.pipe(
      // 使用 debounceTime 来实现防抖功能,例如这里我们设置为 500 毫秒
      debounceTime(500),
      // 变化才会请求
      distinctUntilChanged(),
      // 使用 map 来获取输入框的值
      map(event => event.target.value),
      // 过滤空值
      filter((val) => val),
      // 使用 switchMap 来发送搜索请求
      switchMap(query => from(mockSearchService(query)).pipe(
        // 对请求进行失败处理
        catchError(error => of('搜索失败')),
      )),
      tap(console.log)
    );

    // 订阅 search$ Observable,然后在控制台打印搜索结果
    search$.subscribe(results => searchResult.textContent = results);
  </script>
</body>

</html>
转载自:https://juejin.cn/post/7379059228105097268
评论
请登录