1 概述
响应式编程,以rxjs为例子
2 背景
2.1 问题
2.1.1 React
A | B | C
---------
Server
远古时期的jquery和近几年的react都是单独的视图框架,集中精力在解决如何用简洁的方法来编写视图。
结构:
- 多个视图之间互不干扰,每个视图自身就是一个自洽的模型
缺点:
- 视图的数据没有缓存起来
- 多视图没有共享数据,导致一个页面修改了这个数据,在另外一个页面仍然是旧数据
- 当子组件需要通知父组件更新视图时就会很蛋疼
2.1.2 Reflux
A | B | C
------------
前端的数据层
------------
Server
在flux与reflux出来后,大家开始考虑数据层的问题。
结构:
- 将数据从视图中单独分割出来,将共有的数据放入到单一的store中,然后修改store来实现更改视图,这样就能实现多个视图共享单个数据,实现了多视图数据的共享和缓存化。
- 为了更好让多个视图复用数据层的数据,数据层的设计最好是范式化的,实现了多视图数据同步更新。
- 每次数据更改后都是将新数据从父组件开始全部渲染一遍,这样就避免了需要子组件需要通知父组件更新视图的问题。
缺点:
- 数据层中既包含了异步的ajax的代码,也包含了同步的修改内存数据代码,单元测试做起来很麻烦。
- 当需要跨多个store修改数据时就会耦合很严重。
2.1.3 Redux
A | B | C
------------
前端的action
------------
前端的reducer
------------
Server
在redux出来后,引入了函数式编程的概念。
结构:
- 仍然采用reflux中的数据层的概念,只是将数据层划分为action与reducer两部分
- reducer只负责数据层的同步更新操作,就是一些仅仅需要修改内存数据的代码。这样的reducer相当容易被单元测试,也更容易被复用起来。
- action负责分发多个reducer,以及处理异步ajax然后分发多个reducer。解决了需要跨多个reducer修改数据的问题。
- 另外,函数式编程也让server-render与middleware变得更加简单直接。
缺点:
- action遇到异步时依然很头疼,redux就有redux-thunk,redux-promise,redux-saga,redux-obserable等多种异步解决方案。
- 范式化后的数据层容易被多个视图复用,但是这些数据粒度太小,视图复用起来时较为麻烦,需要写很多format的代码。
2.1.4 现在
function fetchFriend(id) {
return dispatch => {
dispatch({ type: 'FETCH_FRIEND' });
return fetch(`http://localhost/api/friend/${id}`)
.then(response => response.json())
.then(json => dispatch({ type: 'RECEIVE_FRIENDS', payload: json }));
}
}
但是,对于异步流程的竞态问题,数据层框架依然没有很好地解决,例如在上面的代码中,描述的是获取某个用户的好友信息。在间隔较短的时间内发出以上两次请求,可能会发生第一次的数据覆盖第二次数据的问题,看这里。对于这个问题,有加入额外字段控制,有加入redux-saga,redux-observable等全家桶的方案,总而言之,原来redux无法很好地解决这个问题,需要引入更多的复杂性来解决。
c = a + b // 不管a还是b发生更新,c都不动,等到c被使用的时候,才去重新根据a和b的当前值计算
另外一个问题是计算数据,由于redux中存放的是范式化数据,而我们使用的是范式化组合过后的数据。例如,上面的数据c。这个数据c要么交给view中在render中重复计算,要么由redux的action触发更新来修改数据c。前者对于view来说逻辑太重,后者对于action来说复杂性太高,a或b更新时都需要手动触发更新。
2.2 解决
为了解决以上的这个问题,ms提出了响应式编程的概念,出现了ReactiveX的大家族,它的目标是建立一个库,专门处理异步流程中的流问题,可以被看作为loadash for stream。
fetchFriend = new Subject();
fetchFriend
.switchMap((id)=>
Observerable.from(fetch(`http://localhost/api/friend/${id}`))
.do(()=>dispatch({ type: 'FETCH_FRIEND_SUCCESS' }))
.catch(()=>dispatch({type: 'FETCH_FRIEND_SUCCESS'})))
使用switchMap来解决竞态的问题
dataA = new Subject();
dataB = new Subject();
dataC = Observable.combineLastest(dataA,dataB,function(a,b){
return a+b;
});
使用combineLastest来解决数据视图的问题。
响应式编程的特点为:
- 订阅模式,强调业务解耦,以及流处理模式。
- 函数式编程,强调流程无副作用。
3 观察者
3.1 冷
let stream$ = Rx.Observable.of(1,2,3);
//订阅者 1: 1,2,3
let subscription = stream.subscribe(
data => console.log(data),
err => console.error(err),
() => console.log('completed')
)
setTimeout(() => {
subscription.unsubscribe() // 在这我们调用了清理函数
}, 3000)
冷观察者,subscribe时创建,并且激活,unsubscribe时销毁,不同的订阅者有不同的观察者实例
3.2 热
let publisher$ = Rx.Observable
.interval(1000)
.take(5)
.publish();
publisher$.subscribe(
data => console.log('subscriber from first minute',data),
err => console.log(err),
() => console.log('completed')
)
setTimeout(() => {
publisher$.subscribe(
data => console.log('subscriber from 2nd minute', data),
err => console.log(err),
() => console.log('completed')
)
}, 3000)
let subscription = publisher$.connect();
setTimeout(() => {
subscription.unsubscribe() // 在这我们调用了清理函数
}, 10000)
热观察者,通过冷观察者publish操作转换为热观察者,然后在connect时激活。主要区别为publish时创建,connect时激活,不同的订阅者共用同一个观察者实例。
3.3 暖
let obs = Rx.Observable.interval(1000).take(3).publish().refCount();
let subscription1 = null;
let subscription2 = null;
setTimeout(() => {
subscription1 = obs.subscribe(data => console.log('sub1', data));
},1100)
setTimeout(() => {
subscription2 = obs.subscribe(data => console.log('sub2', data));
},2100)
setTimeout(() => {
subscription1.unsubscribe()
subscription2.unsubscribe()
}, 10000)
暖观察者,通过冷观察者publish操作转换为热观察者,然后再由refCount操作转换为暖观察者。主要区别为,publish时创建,第一个订阅者订阅时激活,所有订阅者消失时销毁,不同的订阅者共用同一个观察者实例。
4 操作符
4.1 转换操作符
操作符 | 意义 |
---|---|
map | 通过函数对每个item转换 |
mapTo | 对每个item执行常量转换 |
4.2 连接操作符
操作符 | 意义 |
---|---|
merge | 并行连接 |
concat | 串行连接 |
mergeMap(flatMap) | 通过函数转换为Observable,然后多个item的Observable并行连接到当前流中 |
concatMap | 通过函数转换为Observable,然后多个item的Observable串行连接到当前流中 |
switchMap | 通过函数转换为Observable,然后多个item的Observable唯一连接到当前流中,也就是会自动cancel上一个item的Observable |
4.2 组合操作符
操作符 | 意义 |
---|---|
zip | 一对一组合多个流的item,多个流都触发时才能触发 |
combineLatest | 一对一组合多个流的item,其中一个流触发时就触发 |
withLatestFrom | 一对一组合多个流的item,第一个流触发时才触发 |
4.3 时间操作符
操作符 | 意义 |
---|---|
delay | 延迟触发 |
sampleTime | 输出只在特定时间触发 |
debounceTime | 输入间隔大于阀定值时才触发 |
throttle | 保证输出间隔大于阀定值 |
4.4 条件操作符
操作符 | 意义 |
---|---|
filter | 过滤特定条件的item |
take | 取前几个的item,条件为数量 |
takeWhile | 取前几个的item,条件为function |
takeUtil | 取前几个的item,条件为Obserable |
skip | 跳过前几个的item,条件为数量 |
skipWhile | 跳过前几个的item,条件为function |
skipUtil | 跳过前几个的item,条件为Obserable |
4.5 聚合操作符
操作符 | 意义 |
---|---|
buffer | 将所有item连接成一个数组,并逐次触发 |
scan | 将所有item组合成一个新的item,并逐次触发 |
distinctUntilChanged | 触发相邻不重复的item,并逐次触发 |
reduce | 将所有item组合成一个新的item,仅最后触发一次 |
max | 将所有item组合成一个最大值,仅最后触发一次 |
min | 将所有item组合成一个最小值,仅最后触发一次 |
distinct | 触发所有不重复的item,仅最后触发一次 |
5 主题
5.1 普通主题
let source$ = Rx.Observable.interval(500).take(3);
const proxySubject = new Rx.Subject();
let subscriber = source$.subscribe( proxySubject );
proxySubject.subscribe( (value) => console.log('proxy subscriber', value ) );
proxySubject.next( 3 );
Subject可以作为一个代理来使用,而且更重要的是,Subject就像是一个热的Obserable,不过它不仅可以subscribe,还可以next,是一个双向通道。
5.2 长记忆主题
let replaySubject = new Rx.ReplaySubject( 2 );
replaySubject.next( 0 );
replaySubject.next( 1 );
replaySubject.next( 2 );
// 1, 2
let replaySubscription = replaySubject.subscribe((value) => {
console.log('replay subscription', value);
});
ReplaySubject的第一个参数控制主题的记忆性,自动缓存最近数量的item。
5.3 带初值短记忆主题
let behaviorSubject = new Rx.BehaviorSubject(42);
behaviorSubject.subscribe((value) => console.log('behaviour subject',value) );
console.log('Behaviour current value',behaviorSubject.getValue());
behaviorSubject.next(1);
console.log('Behaviour current value',behaviorSubject.getValue());
behaviorSubject.next(2);
console.log('Behaviour current value',behaviorSubject.getValue());
behaviorSubject.next(3);
console.log('Behaviour current value',behaviorSubject.getValue());
BehaviorSubject的第一个参数控制主题的初值,自动缓存最近一个数量的item。
6 场景
6.1 点击
var multiClickStream = clickStream
.buffer(function() { return clickStream.throttle(250); })
.map(function(list) { return list.length; })
.filter(function(x) { return x >= 2; });
用rxjs5实现了双击事件流
6.2 实时搜索
注意点:
- 如何用debounce避免连续输入时造成的无效搜索
- 如何用distinctUntilChanged避免相邻的重复搜索
- 如何用switchMap(flatMapLatest)解决多次ajax同时发出后的冲突问题,就是前一个ajax比后一个ajax的返回更后时造成的覆盖问题。
- 如何用subscribe的success与error回调区分处理ajax的失败问题
6.3 数据视图
注意点:
- 如何用combineLatest组合多个视图成为一个新的视图,使得view层成为很薄的一层,不需要再写format的函数
这个使用rxjs的方案解决了view层里面format逻辑太重的问题,而且很容易被view拿来subscribe。
6.4 异步事件处理
注意点:
- 如何用withLatestFrom在action里获取store里面的数据,如何传递给store。
这个使用rxjs实现mvc其实不太漂亮,因为action手动subscribe了store,破坏了解耦的原则
6.5 模拟redux
注意点:
- 使得action与reducer都是纯函数的方式,而且关注点分离,实现了更好的解耦原则和更容易的单元测试
- 多个action之间如何使用被整合到单个event action中,实现了event的集中式处理,很方便就能实现中间件
- reducer是怎么通过event action转换为数据的,注意scan,startWith,publishRelpay,refCount的用法
这个使用rxjs的方案更为靠谱,模拟redux的整个架构,同时避免redux所带来的异步全家桶,数据视图太弱的问题,是一个更加优雅的数据层方案。所以,这也是徐飞所提倡的复杂单页应用的数据层设计方案,只可惜上手的复杂度太高,实在不太容易理解。
6.6 结合redux-observable
注意点:
- epic就是action流到action流的转换
- reducer就是action流到store的转换
- 使得副作用的epic可以和reducer一样,随意combine。
这个使用redux-observable作为redux解决方案相当优雅,只是会稍微失去了rxjs作为数据视图的能力而已。另外,epic由于带上了rxjs,使得整体的复杂度更高了。
7 总结
rxjs相对redux是一个更为彻底的函数式响应框架,在处理复杂的单页面数据层问题上是一把锋利的瑞士军刀,只是上手的复杂度较高(操作符超过100多个,而且需要扭转为一个流的思考),调试困难,代码量多。所以,目前看来rxjs相对于它所带来的好处,成本代价太高,只适用于强实时的重型单页面架构,期待有更加优雅简洁的替代方案出现。
参考资料
- 本文作者: fishedee
- 版权声明: 本博客所有文章均采用 CC BY-NC-SA 3.0 CN 许可协议,转载必须注明出处!