likes
comments
collection
share

RXJs 流式编程思想

作者站长头像
站长
· 阅读数 16

你应该听说过所谓的响应式编程(Rx),尤其是它的一些变体,包括Rx,Bacon.js, RAC,或者其他的一些技术。

"什么是响应式编程?"

响应式编程是使用异步数据流的方式进行程序设计

某种意义上说这不是新发明,Event bus或者传统的点击事情就是一种异步数据流。在这些流上你可以做一些操作。响应式编程就是在这上面产生的一种固化思想。你可以用任何东西创建数据流,而不只单是从点击或鼠标浮动事件中产生。

任何东西都可以成为一个流:变量,用户输入,属性,缓存,数据结构等等。例如,假设你的Twitter推送是一个与点击事件相同的流,你可以监听这个流并作出任何响应。

除此以外,你可以使用一些很棒的工具来组合,创建,过滤这些流 那样”函数魔法“就可以发挥功效了。

一个流可以是另一个流的输入。甚至多个流可以作为另一个流的输入。

你可以合并两个流,过滤一个流并得到另一个只包含你关注的数据的流。你可以将一个流的数据进行一一map从而得到另一个流。

既然流这个概念对响应式编程如果重要,那来仔细看看,从对”点击一个按钮“的流开始熟悉。

RXJs 流式编程思想

一个流是一个进行中的按时间排序的事件序列。

它可以发出三种不同类型的数据:值,错误,完成信号。例如当当前窗口或视图的按钮点击时完成信号就会发出。

我们只能异步的捕捉这些事件。我们会定义一个响应值的方法,一个响应错误的方法,一个响应完成的方法。有时我们会省略后面两个方法来只关注捕捉值。监听流我们叫做订阅”Subscribing“.

我们定义的这些方法是观察者observer。 流(observable)是被观察的。

还可以用图来绘制这个流。

--a---b-c---d---X---|->

a, b, c, d 是流中产生的值
X 是流中的错误
| 代表完成
---> 代表流的时间线

熟悉了这个以后,来做一些新的尝试:让我们从点击事件中来创建一个事件流。

首先,我们创建一个计数流来指示有多少次按钮点击发生了。

在通用Reactive 库里有很多方法,例如map,filter,scan等。当你调用某个方法如clickStream.map(f),它就会返回一个新的基于点击流的流而并未修改原始的点击流。这被叫做不变性(immutability),这样就允许我们进行链式调用,像这样clickStream.map(f).scan(g):

  clickStream: ---c----c--c----c------c-->
               vvvvv map(c becomes 1) vvvv
               ---1----1--1----1------1-->
               vvvvvvvvv scan(+) vvvvvvvvv
counterStream: ---1----2--3----4------5-->

map(f)方法将流发出的每个数据通过你定义的函数f执行转换到新的一个流。

在上面的例子中,我们在每个点击中映射了数字1。sacn(g)函数累加了在流上之前的值,产生值x=g(accumulated, current),在这儿,g只是一个简单的加法。然后,counterStream发出点击总次数。

为了展示响应式编程的正真能力,假设你想要一个双击的事件流。假设我们想要一个流将三击(多击,2或更过)认为是双击 (提取双击以上的点击事件)。想想使用传统方法如何实现。

好,在响应式编程里这相当简单。事实上只要 4行代码 。但是先忽略代码。 不管是初学者还是专家,通过图表来理解和构建流是最好的方式。

RXJs 流式编程思想

灰色框框里的方式是在将一个流转为另一个流。首先累积在250ms的节流区域类的点击事件到一个列表。这里的细节现在不理解没关系,我们现在只是在演示响应式编程。最后,我们忽略长度为1的列表,用filter(x>=2)来进行筛选。 这样的三个操作就产生了我们想要的流了。我们可以订阅这个流然后做我们想要的操作。

希望你能理解这么做是多么美。这个例子只是冰山一角,你可以应用相同的操作到各种流上面,另外,也有很多现成的方法可以来用。

通过例子来进行响应式编程的思维学习

我们来看一个实际的例子(不是人造例子,没有很烂的概念)来一步一步指导我们如何用响应式思维进行思考。 这个例子结束后我们就完成了一个实际的代码工程,在这期间我们做了所有的事情。

我选择了 JavaScriptRxJS 作为开发工具。 因为JavaScript是我现在最熟悉的语言。 但是 Rx* library family 是可以在各种语言和平台上使用的 (.NET, Java, Scala, Clojure, JavaScript, Ruby, Python, C++, Objective-C/Cocoa, Groovy, etc), 所以无论是使用什么语言和平台,你都可以从这篇指导中了解到你想要的东西。

实现一个“Who to follow”的建议框

在Twitter里有一个建议你去粉谁的界面。

RXJs 流式编程思想

我们将主要去模仿它的一些核心的点:

  • 启动后加载账户数据然后显示3个推荐建议
  • 点击“Refesh”后再加载3个其他的账号推荐建议
  • 点击推荐建议后面“X”后,删除该建议,并显示另一个
  • 每行显示账号的头像,并链接到他们的主页

我们可以先不管其他比较小的功能点。 另外Twitter最近关闭了它的API授权,我们就做一个关注github的吧。 这里是github的用户API Github API for getting users. 如果有需要,整个demo完整的代码再这里可以找到。jsfiddle.net/staltz/8jFJ…

请求和响应

你怎么用Rx来处理这个问题? Rx的咒语是“几乎所有的东西都可以是流”。

我们先做最简单功能点 “ 启动后加载账户数据然后显示3个推荐建议“。 这个功能没有什么特别的,

(1)发起请求

(2)获得响应

(3)展示结果。

现在做最基本事情,让我们来将请求表达成一个流。

启动时,我们只需要一个请求,如果我们将它做成一个流,那么这个流只有一个派发值。后面我们会有许多请求,但现在一个就够了。

--a------|->

a是字符串 'https://api.github.com/users'

这是一个包含我们需要请求的URLs的一个流。 任何一个请求事件发生的时候,我们会收到两个东西,”时间“和”内容“。 ”时间“就是这个事件发生的时间,”内容“是这个事件包含的内容。这里”内容“就是一个URL的字符串。

用Rx* 创建这样一个包含单个值的流是非常简单的。Rx* 官方术语把一个流叫做”Observable“, 因为它可以被观察。但我认为这个名字很蠢,所以我还是管它叫流。

var requestStream = Rx.Observable.just('https://api.github.com/users');

现在,这儿有了一个字符串的流,没有任何操作。我们现在需要在收到这个事件的时候做一些事情。只需要订阅这个流就行了。

requestStream.subscribe(function(requestUrl) {
  // execute the request
  jQuery.getJSON(requestUrl, function(responseData) {
    // ...
  });
}

注意到我们使用了 ajax 回调来处理这个异步请求。 Rx是来处理异步数据流的,是不是响应可以在后面的时间点返回后作为一个流。看起来是这样的,所以试试吧。

requestStream.subscribe(function(requestUrl) {
  // execute the request
  var responseStream = Rx.Observable.create(function (observer) {
    jQuery.getJSON(requestUrl)
    .done(function(response) { observer.onNext(response); })
    .fail(function(jqXHR, status, error) { observer.onError(error); })
    .always(function() { observer.onCompleted(); });
  });
  
  responseStream.subscribe(function(response) {
    // do something with the response
  });
}

[Rx.Observable,creat()] 使用来创建一个自定义的流来通知订阅者一些自定义的数据,这个些自定义的数据可以通过 (onNext()) 或 (onError) 来派发。

我们目前做的只是封装了Jquery Ajax的promise对象。 那么,是不是意味着 Promise 是可以作为 Observable的?

Observable 是Promise++. 在Rx 中你可以很容易的将一个Promise转换为一个Observable: var stream = Rx.Observable.fromPromise(promise) 。唯一的区别是Observable不是 Promises/A+ 兼容的,但是概念上没有冲突。

一个Promise 可以理解为一个只有一个派发值的Observable. Rx 流突破了promise只允许返回一个值的限制。

这就很nice了,至少Observable可以和promise一样强大了。假如你是promise的死忠粉,那就再看一眼留意下Observable的能力吧。

回到例子,你很快会注意到我们在一个 subscribe() 里面又嵌套调用了另一个 subscribe(), 这就有些类似与死亡缠绕了。

还有,创建 responseStream 依赖于 requestStream.

你之前可能也听说过,Rx可以从一个流进行转换来得到另一个流,所以我们就这样改进吧。

现在你需要了解的一个简单的函数是 map(f) , 这个方法在流的每一个值来应用 f() 函数, 然后创造出另一个流。 在我们的例子中, 我们可以map每个URLs流中的数据到一个响应流。

var responseMetastream = requestStream
  .map(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

在这里我们得到了一个”metasteam“:一个流后面的子流. 不要担心,一个子流只是主流派发的一个值。你可以将它看成指针,每一个派发的值是一个指向另一个流的指针。 在我们的例子中,每个URL 映射到一个指向对象响应的流的指针。

RXJs 流式编程思想

一个响应的metastream看起来很奇怪,好像没有什么帮助。 我们只是想要一个包含JOSN对象的响应的流,而不是一个包含JSON对象的Promise/Obserserbal,这是另一种 map(), 它将metastream 中的值扁平化了(也就是映射后指向的新的流转换为了数据流)。

它还是在主流上进行所有子流上的数据的派发(map本来只是派发了一个子流)。 Flatmap不是一个对metastream的修复,因为metastream不是bug, 他们只是在Rx中处理异步数据的工具。

var responseStream = requestStream
  .flatMap(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

RXJs 流式编程思想

好,响应流是依赖于请求流被定义出来的,假如未来我们要添加请求,我们就就会在响应流中获取到响应。

requestStream:  --a-----b--c------------|->
responseStream: -----A--------B-----C---|->

(小写是请求,大写是响应)

现在我们终于有了响应流了,我们可以显示收到的响应数据了。

responseStream.subscribe(function(response) {
  // render `response` to the DOM however you wish
});

目前所有的代码如下

var requestStream = Rx.Observable.just('https://api.github.com/users');

var responseStream = requestStream
  .flatMap(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

responseStream.subscribe(function(response) {
  // render `response` to the DOM however you wish
});

刷新按钮

我忘了说这个请求返回的JSON体里面包含了100个用户信息。API只允许我们指定页面位置而没有页面大小,因为我们只用了3个信息,这样我们就浪费了97个用户信息。

我们可以暂时忽略这个问题,后面我们会看看如何缓存整个请求响应。

每次点击刷新按钮,请求流就会发出一个URL,然后我们就获得了一个新的响应。

我们需要做两件事情,

一是创建一个点击事件的流(所有东西都可以是流),

二是我们根据点击事件的流改变请求的流。不错的是RxJS有将事件监听转换为流的工具函数。

var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

因为刷新事件本身没有携带要请求的API URL信息,我们需要映射每个点击事件到实际的URL. 现在我们将请求流变成映射到请求API节点后并附加随机偏移量的点击事件流。

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

因为没有自动化测试,所以我破坏了之前的已经建好的功能: 启动时不再有请求发生,请求只会发生在点击刷新按钮的时候。

我同时需要两个行为:启动时和点击刷新时都要有请求产生。

我们知道如何为每个场景创建不同的流:

var requestOnRefreshStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });
  
var startupRequestStream = Rx.Observable.just('https://api.github.com/users');

但是我们如何将这两个合并成一个呢? 那就是 merge() 要做的。 下图是一个解释图示:

stream A: ---a--------e-----o----->
stream B: -----B---C-----D-------->
          vvvvvvvvv merge vvvvvvvvv
          ---a-B---C--e--D--o----->

现在看起来就很简单了:

var requestOnRefreshStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });
  
var startupRequestStream = Rx.Observable.just('https://api.github.com/users');

var requestStream = Rx.Observable.merge(
  requestOnRefreshStream, startupRequestStream
);

有一种比较简单和干净的实现方式,不需要创建中间流:

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  })
  .merge(Rx.Observable.just('https://api.github.com/users'));

startWith() 做的事情就是字面理解的那样。 不管你的输入流是怎么样的,输出流带有 startWith(x) 总是会在开始的时候有 x 输出。 但是这还是不够整洁,我们依然在重复写请求URL. 一种方法是将 startWith() 放到离 refreshClickStream 更近的地方,简单说就是在启动的时候模拟一个刷新点击。

var requestStream = refreshClickStream.startWith('startup click')
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

嗯,不错。 如果你回到我说“破坏了之前的已经建好的功能”那里,我们现在修复这个问题只需要加一个startWith() 就可以了。

3个用户推荐如何使用流来建模

到目前为止,我们接触了通过subscribe()响应流responseStreams实现了显示一个用户推荐的UI. 现在有了刷新按钮,我们有了一个问题:

当点击刷新的时候,现有的3个推荐并没有清除。新的推荐只会在响应到达后显示,所以为了UI好看一些,我们需要在点击刷新时清除当前的推荐。

refreshClickStream.subscribe(function() {
  // clear the 3 suggestion DOM elements 
});

因为我们这样就有了两个订阅者来影响同一个UI的显示(另一个是responseStream.subscribe()),

每个流产生的值都是一个包含推荐信息的JSON对象。 我们分别为3个推荐创建流,第一个推荐流看起来是这样的:

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // get one random user from the list
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  });

剩下的第二,第三个只要复制粘贴第一个就行了。 是的,这不够简洁,我们这么做只是为了保持对于新手教程足够简单。 另外,我觉读者自己想想如何避免这样的重复代码也是一个很好的思维锻炼。

我们换种方式来实现显示推荐,不再直接使用 reponseStream.subscribe() 来显示。

suggestion1Stream.subscribe(function(suggestion) {
  // render the 1st suggestion to the DOM
});

回到刷新按钮,如何清除已显示的推荐,我们可以简单将一个返回null的刷新点击流合并到推荐流上。

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // get one random user from the list
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  })
  .merge(
    refreshClickStream.map(function(){ return null; })
  );

在进行显示的时候吗,我们将null解释为没有数据并隐藏UI.

suggestion1Stream.subscribe(function(suggestion) {
  if (suggestion === null) {
    // hide the first suggestion DOM element
  }
  else {
    // show the first suggestion DOM element
    // and render the data
  }
});

完整的流示意图:

refreshClickStream: ----------o--------o---->
     requestStream: -r--------r--------r---->
    responseStream: ----R---------R------R-->   
 suggestion1Stream: ----s-----N---s----N-s-->
 suggestion2Stream: ----q-----N---q----N-q-->
 suggestion3Stream: ----t-----N---t----N-t-->

N代表空, s,q,t就是推荐数据

启动时,我们可以显示空推荐。 只需要添加一个 startWith(null)到推荐流上就可以了。

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // get one random user from the list
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  })
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

结果流图示:

refreshClickStream: ----------o---------o---->
     requestStream: -r--------r---------r---->
    responseStream: ----R----------R------R-->   
 suggestion1Stream: -N--s-----N----s----N-s-->
 suggestion2Stream: -N--q-----N----q----N-q-->
 suggestion3Stream: -N--t-----N----t----N-t-->

删除一个推荐,使用缓存

还有一个功能需要实现。每个推荐都需要一个关闭(‘x’)按钮,点击就可以删除这个推荐并且加载另一个。 第一反应,你应该想说在点击x按钮的时候在产生一个请求就可以了:

var close1Button = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click');
// and the same for close2Button and close3Button

var requestStream = refreshClickStream.startWith('startup click')
  .merge(close1ClickStream) // we added this
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

这样做确实可以。它可以关闭并加载新的推荐。这儿有几种不同的实现方式,为了带来乐趣我们将复用之前的请求。 我们目前只用了返回值中100条里面的3条,所以还有大量有用的数据我们是没有使用的。 所以不用再发出新的请求了。

我们再次用流的思想来思考问题,当点击x按钮后,我们想要使用responseStream最后发出的的响应数据来随机获取一个值。像这样:

    requestStream: --r--------------->
   responseStream: ------R----------->
close1ClickStream: ------------c----->
suggestion1Stream: ------s-----s----->

在 Rx* 中有个叫 combineLatest 可以帮助达成我们的意图。 它可以将两个流最近发出的值a,b结合成一个输出流并输出 c=f(x,y) , f是你定义的方法。 用图示解释更好理解:

stream A: --a-----------e--------i-------->
stream B: -----b----c--------d-------q---->
          vvvvvvvv combineLatest(f) vvvvvvv
          ----AB---AC--EC---ED--ID--IQ---->

我们可以使用 combineLastest() 来将 closeClickStreamresponseStream 结合, 然后无论何时x按钮点击了我们都能得到responseStream最后发出的值,从而产生新的suggestion1Stream。 另一方面, combineLastest()是对称的,无论何时responseStream产生了一个新的值,它都会包含最后一个x按钮点击的值来产生新的推荐流。这很有趣,因为它让我们可以简化之前的推荐流的代码,像这样:

var suggestion1Stream = close1ClickStream
  .combineLatest(responseStream,             
    function(click, listUsers) {
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

我们还差最后一点。 combineLatest()使用了两个流的最后的数据,但是如果其中一个从来没有发出过任何数据, combineLastest就不能产生输出流。 如果你仔细看上面的图示,你会注意到当第一个流发出a的时候,没有数据输出。 只有当第二流发出数据b了才产生了输出。

有许多不同的方式来解决这个问题,我们还是使用最简单的方式,就是在启动的时候模拟一个x点击。

var suggestion1Stream = close1ClickStream.startWith('startup click') // we added this
  .combineLatest(responseStream,             
    function(click, listUsers) {l
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

串起来

现在我们已经完成了,所有的代码就是这样:

var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

var closeButton1 = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(closeButton1, 'click');
// and the same logic for close2 and close3

var requestStream = refreshClickStream.startWith('startup click')
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

var responseStream = requestStream
  .flatMap(function (requestUrl) {
    return Rx.Observable.fromPromise($.ajax({url: requestUrl}));
  });

var suggestion1Stream = close1ClickStream.startWith('startup click')
  .combineLatest(responseStream,             
    function(click, listUsers) {
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);
// and the same logic for suggestion2Stream and suggestion3Stream

suggestion1Stream.subscribe(function(suggestion) {
  if (suggestion === null) {
    // hide the first suggestion DOM element
  }
  else {
    // show the first suggestion DOM element
    // and render the data
  }
});

这个代码虽然少,但是包含了许多点。 它的特点是分离关注点来管理事件,甚至缓存数据。函数式的风格让代码看起来更像式声明式的而不是指令式的:我们不是在切除一个指令序列来执行,我们只是通过定义流之间的关系来说明什么是什么。例如,通过Rx 我们告诉计算机suggestion1Streamclose1 流 和 最后使用的 用户列表响应流的一个组合,并且定义流在启动和点击刷新时产生null。

我们应该还注意到一个印象深刻的地方,例子中没有用到if,for,while,和javascript典型的回调控制。 如果你愿意,在subscribe中使用filter后你甚至可以不再使用if else。在rx中,我们有流方法诸如map,filter,scan,merge,combineLatest,startWith,等等更多的流控制的事件驱动的方法。 这些方法库可以帮助你用更少的代码实现强大的功能。