rxjs

2017-11-19 fishedee 前端

1 概述

响应式编程,以rxjs为例子

2 背景

2.1 问题

2.1.1 React

A | B | C
---------
 Server

远古时期的jquery和近几年的react都是单独的视图框架,集中精力在解决如何用简洁的方法来编写视图。

结构:

  • 多个视图之间互不干扰,每个视图自身就是一个自洽的模型

缺点:

  • 视图的数据没有缓存起来
  • 多视图没有共享数据,导致一个页面修改了这个数据,在另外一个页面仍然是旧数据
  • 当子组件需要通知父组件更新视图时就会很蛋疼

2.1.2 Reflux

 A | B | C
------------
前端的数据层
------------
  Server

在flux与reflux出来后,大家开始考虑数据层的问题。

结构:

  • 将数据从视图中单独分割出来,将共有的数据放入到单一的store中,然后修改store来实现更改视图,这样就能实现多个视图共享单个数据,实现了多视图数据的共享和缓存化。
  • 为了更好让多个视图复用数据层的数据,数据层的设计最好是范式化的,实现了多视图数据同步更新。
  • 每次数据更改后都是将新数据从父组件开始全部渲染一遍,这样就避免了需要子组件需要通知父组件更新视图的问题。

缺点:

  • 数据层中既包含了异步的ajax的代码,也包含了同步的修改内存数据代码,单元测试做起来很麻烦。
  • 当需要跨多个store修改数据时就会耦合很严重。

2.1.3 Redux

 A | B | C
------------
前端的action
------------
前端的reducer
------------
  Server

在redux出来后,引入了函数式编程的概念。

结构:

  • 仍然采用reflux中的数据层的概念,只是将数据层划分为action与reducer两部分
  • reducer只负责数据层的同步更新操作,就是一些仅仅需要修改内存数据的代码。这样的reducer相当容易被单元测试,也更容易被复用起来。
  • action负责分发多个reducer,以及处理异步ajax然后分发多个reducer。解决了需要跨多个reducer修改数据的问题。
  • 另外,函数式编程也让server-render与middleware变得更加简单直接。

缺点:

  • action遇到异步时依然很头疼,redux就有redux-thunk,redux-promise,redux-saga,redux-obserable等多种异步解决方案。
  • 范式化后的数据层容易被多个视图复用,但是这些数据粒度太小,视图复用起来时较为麻烦,需要写很多format的代码。

2.1.4 现在

function fetchFriend(id) {
    return dispatch => {
        dispatch({ type: 'FETCH_FRIEND' });
        return fetch(`http://localhost/api/friend/${id}`)
            .then(response => response.json())
            .then(json => dispatch({ type: 'RECEIVE_FRIENDS', payload: json }));
    }
}

但是,对于异步流程的竞态问题,数据层框架依然没有很好地解决,例如在上面的代码中,描述的是获取某个用户的好友信息。在间隔较短的时间内发出以上两次请求,可能会发生第一次的数据覆盖第二次数据的问题,看这里。对于这个问题,有加入额外字段控制,有加入redux-saga,redux-observable等全家桶的方案,总而言之,原来redux无法很好地解决这个问题,需要引入更多的复杂性来解决。

c = a + b // 不管a还是b发生更新,c都不动,等到c被使用的时候,才去重新根据a和b的当前值计算

另外一个问题是计算数据,由于redux中存放的是范式化数据,而我们使用的是范式化组合过后的数据。例如,上面的数据c。这个数据c要么交给view中在render中重复计算,要么由redux的action触发更新来修改数据c。前者对于view来说逻辑太重,后者对于action来说复杂性太高,a或b更新时都需要手动触发更新。

2.2 解决

为了解决以上的这个问题,ms提出了响应式编程的概念,出现了ReactiveX的大家族,它的目标是建立一个库,专门处理异步流程中的流问题,可以被看作为loadash for stream。

fetchFriend = new Subject();
fetchFriend
    .switchMap((id)=>
        Observerable.from(fetch(`http://localhost/api/friend/${id}`))
        .do(()=>dispatch({ type: 'FETCH_FRIEND_SUCCESS' }))
        .catch(()=>dispatch({type: 'FETCH_FRIEND_SUCCESS'})))

使用switchMap来解决竞态的问题

dataA = new Subject();
dataB = new Subject();
dataC = Observable.combineLastest(dataA,dataB,function(a,b){
    return a+b;
});

使用combineLastest来解决数据视图的问题。

响应式编程的特点为:

  • 订阅模式,强调业务解耦,以及流处理模式。
  • 函数式编程,强调流程无副作用。

3 观察者

3.1 冷

let stream$ = Rx.Observable.of(1,2,3);
//订阅者 1: 1,2,3
let subscription = stream.subscribe(
   data => console.log(data),
   err => console.error(err),
   () => console.log('completed')
)
setTimeout(() => {
  subscription.unsubscribe() // 在这我们调用了清理函数
}, 3000)

冷观察者,subscribe时创建,并且激活,unsubscribe时销毁,不同的订阅者有不同的观察者实例

3.2 热

let publisher$ = Rx.Observable
.interval(1000)
.take(5)
.publish();


publisher$.subscribe(
  data => console.log('subscriber from first minute',data),
  err => console.log(err),
  () => console.log('completed')
)

setTimeout(() => {
    publisher$.subscribe(
        data => console.log('subscriber from 2nd minute', data),
        err => console.log(err),
        () => console.log('completed')
    )
}, 3000)


let subscription = publisher$.connect();
setTimeout(() => {
  subscription.unsubscribe() // 在这我们调用了清理函数
}, 10000)

热观察者,通过冷观察者publish操作转换为热观察者,然后在connect时激活。主要区别为publish时创建,connect时激活,不同的订阅者共用同一个观察者实例。

3.3 暖

let obs = Rx.Observable.interval(1000).take(3).publish().refCount();

let subscription1 = null;
let subscription2 = null;

setTimeout(() => {
    subscription1 = obs.subscribe(data => console.log('sub1', data));
},1100)

setTimeout(() => {
    subscription2 = obs.subscribe(data => console.log('sub2', data));
},2100)

setTimeout(() => {
  subscription1.unsubscribe()
  subscription2.unsubscribe()
}, 10000)

暖观察者,通过冷观察者publish操作转换为热观察者,然后再由refCount操作转换为暖观察者。主要区别为,publish时创建,第一个订阅者订阅时激活,所有订阅者消失时销毁,不同的订阅者共用同一个观察者实例。

4 操作符

4.1 转换操作符

操作符 意义
map 通过函数对每个item转换
mapTo 对每个item执行常量转换

4.2 连接操作符

操作符 意义
merge 并行连接
concat 串行连接
mergeMap(flatMap) 通过函数转换为Observable,然后多个item的Observable并行连接到当前流中
concatMap 通过函数转换为Observable,然后多个item的Observable串行连接到当前流中
switchMap 通过函数转换为Observable,然后多个item的Observable唯一连接到当前流中,也就是会自动cancel上一个item的Observable

4.2 组合操作符

操作符 意义
zip 一对一组合多个流的item,多个流都触发时才能触发
combineLatest 一对一组合多个流的item,其中一个流触发时就触发
withLatestFrom 一对一组合多个流的item,第一个流触发时才触发

4.3 时间操作符

操作符 意义
delay 延迟触发
sampleTime 输出只在特定时间触发
debounceTime 输入间隔大于阀定值时才触发
throttle 保证输出间隔大于阀定值

4.4 条件操作符

操作符 意义
filter 过滤特定条件的item
take 取前几个的item,条件为数量
takeWhile 取前几个的item,条件为function
takeUtil 取前几个的item,条件为Obserable
skip 跳过前几个的item,条件为数量
skipWhile 跳过前几个的item,条件为function
skipUtil 跳过前几个的item,条件为Obserable

4.5 聚合操作符

操作符 意义
buffer 将所有item连接成一个数组,并逐次触发
scan 将所有item组合成一个新的item,并逐次触发
distinctUntilChanged 触发相邻不重复的item,并逐次触发
reduce 将所有item组合成一个新的item,仅最后触发一次
max 将所有item组合成一个最大值,仅最后触发一次
min 将所有item组合成一个最小值,仅最后触发一次
distinct 触发所有不重复的item,仅最后触发一次

5 主题

5.1 普通主题

let source$ = Rx.Observable.interval(500).take(3);
const proxySubject = new Rx.Subject();
let subscriber = source$.subscribe( proxySubject );

proxySubject.subscribe( (value) => console.log('proxy subscriber', value ) );

proxySubject.next( 3 );

Subject可以作为一个代理来使用,而且更重要的是,Subject就像是一个热的Obserable,不过它不仅可以subscribe,还可以next,是一个双向通道。

5.2 长记忆主题

let replaySubject = new Rx.ReplaySubject( 2 );

replaySubject.next( 0 );
replaySubject.next( 1 );
replaySubject.next( 2 );

//  1, 2
let replaySubscription = replaySubject.subscribe((value) => {
    console.log('replay subscription', value);
});

ReplaySubject的第一个参数控制主题的记忆性,自动缓存最近数量的item。

5.3 带初值短记忆主题

let behaviorSubject = new Rx.BehaviorSubject(42);

behaviorSubject.subscribe((value) => console.log('behaviour subject',value) );
console.log('Behaviour current value',behaviorSubject.getValue());
behaviorSubject.next(1);
console.log('Behaviour current value',behaviorSubject.getValue());
behaviorSubject.next(2);
console.log('Behaviour current value',behaviorSubject.getValue());
behaviorSubject.next(3);
console.log('Behaviour current value',behaviorSubject.getValue());

BehaviorSubject的第一个参数控制主题的初值,自动缓存最近一个数量的item。

6 场景

6.1 点击

var multiClickStream = clickStream
    .buffer(function() { return clickStream.throttle(250); })
    .map(function(list) { return list.length; })
    .filter(function(x) { return x >= 2; });

用rxjs5实现了双击事件流

6.2 实时搜索

代码在这里

注意点:

  • 如何用debounce避免连续输入时造成的无效搜索
  • 如何用distinctUntilChanged避免相邻的重复搜索
  • 如何用switchMap(flatMapLatest)解决多次ajax同时发出后的冲突问题,就是前一个ajax比后一个ajax的返回更后时造成的覆盖问题
  • 如何用subscribe的success与error回调区分处理ajax的失败问题

6.3 数据视图

代码在这里

注意点:

  • 如何用combineLatest组合多个视图成为一个新的视图,使得view层成为很薄的一层,不需要再写format的函数

这个使用rxjs的方案解决了view层里面format逻辑太重的问题,而且很容易被view拿来subscribe。

6.4 异步事件处理

代码在这里

注意点:

  • 如何用withLatestFrom在action里获取store里面的数据,如何传递给store。

这个使用rxjs实现mvc其实不太漂亮,因为action手动subscribe了store,破坏了解耦的原则

6.5 模拟redux

代码在这里

注意点:

  • 使得action与reducer都是纯函数的方式,而且关注点分离,实现了更好的解耦原则和更容易的单元测试
  • 多个action之间如何使用被整合到单个event action中,实现了event的集中式处理,很方便就能实现中间件
  • reducer是怎么通过event action转换为数据的,注意scan,startWith,publishRelpay,refCount的用法

这个使用rxjs的方案更为靠谱,模拟redux的整个架构,同时避免redux所带来的异步全家桶,数据视图太弱的问题,是一个更加优雅的数据层方案。所以,这也是徐飞所提倡的复杂单页应用的数据层设计方案,只可惜上手的复杂度太高,实在不太容易理解。

6.6 结合redux-observable

代码在这里

注意点:

  • epic就是action流到action流的转换
  • reducer就是action流到store的转换
  • 使得副作用的epic可以和reducer一样,随意combine。

这个使用redux-observable作为redux解决方案相当优雅,只是会稍微失去了rxjs作为数据视图的能力而已。另外,epic由于带上了rxjs,使得整体的复杂度更高了。

7 总结

rxjs相对redux是一个更为彻底的函数式响应框架,在处理复杂的单页面数据层问题上是一把锋利的瑞士军刀,只是上手的复杂度较高(操作符超过100多个,而且需要扭转为一个流的思考),调试困难,代码量多。所以,目前看来rxjs相对于它所带来的好处,成本代价太高,只适用于强实时的重型单页面架构,期待有更加优雅简洁的替代方案出现。

参考资料

相关文章