在最近的社交媒体、活动中,我常被问及关于“hot” vs “cold” observables
的问题,或者observable
到底是“multicast”还是“unicast”。人们似乎为Rx.Observable
内部的黑魔法所困。当被要求介绍一下observable
时,常听到“那就是些Stream
”,或“就像是Promise
”。而且,我本人也在各种场合的公开演讲中使用过这类说辞。
和Promise
的比较是必然、但不准确的。假设Promise
和Observable
都是异步的,而且Promises
已经被广泛使用、并且为JavaScript社区所熟知,这的确是个不错的切入点。拿Promise
的then
和Observable
的subscribe
做对比,看看立即执行
和延迟执行
的差异;看看撤销机制和复用机制。。。通过这些比较向初学者介绍Observable
。
但有个问题就是:Observables
和Promises
相比,差异大于相似。Promise
永远是multicast。Promise
的resolution
和rejection
永远是异步的。所以当人们开始以使用Promise
的思维来处理Observable
的代码时,所期待的结果并不都是成立的。Observable
可能是multicast,可能是异步,但不代表全部情况。我开始有些自责关于之前人们对Observable
认识不准确这件事上的推波助澜了。
# Observable
就是一个函数,她接受一个Observer
作为参数然后返回另一个函数
如果你真想了解Observable
,最佳手段是自己写一个。说实话,也没有想象中那么难。一个Observable
,说白了,就是一个有着特殊用途的函数。
我们来看下她基本特征:
- 是个函数
- 接受一个
Observer
(一个包含next
、error
和complete
方法的对象)作为参数 - 返回一个
unsubscribe
函数
再来看下她的目的:
使Observer
联通一个Producer
,然后返回一个可以挂断与Producer
联通关系的函数。Observer
可被当作一个注册机处理程序。
基本实现如下:
function myObservable(observer) {
const datasource = new DataSource();
datasource.ondata = (e) => observer.next(e);
datasource.onerror = (err) => observer.error(err);
datasource.oncomplete = () => observer.complete();
return () => {
datasource.destroy();
};
}
(当然你也可以自己试试:JSBin)
如你所见,也没多少内容,so easy!
# SafeObserver
:写更好的Observer
当我们讨论RxJS
或者Reactive Programming
时,Observable
常常首先映入眼帘。但实际上,Observer
才是那个干重活儿的人。Observable
是懒汉,她就是一个函数,杵在那里等人subscribe
,她加载好Observer
,就结束任务,又百无聊赖等待被临幸了。另一方面,Observer
一只保持活跃,不断从Producer
那里接收消息。
从现在起,你可以使用包含了next
、error
和complete
方法的POJO(Plain-Old JavaScript Object)来subscribe Observable
,但写区区一个POJO不过是开始。在RxJS
5里,我们会提供一些保障机制给开发者,下面就是一些比较重要的保障原则:
- 传入的
Observer
对象可以不实现所有规定的方法 - 在
complete
或者error
触发之后再调用next
方法是没用的 unsubscribe
后,任何方法都不能再被调用了complete
和error
触发后,unsubscribe
也会自动调用- 当
next
、complete
和error
出现异常时,unsubscribe
也会自动调用以保证资源不会溢出/浪费 next
、complete
和error
是可选的。按需处理即可,不必全部处理。
为了完成上述目标,我们得把传入的匿名Observer
对象封装在一个SafeObserver
里以提供上述保障。例如第2项,我们要跟踪complete
或error
是否被调用。再比如第3项,我们得知道consumer什么时候要unsubscribe
。最后,再看第4项,我们还得知道unsubscribe
的逻辑,以便在complete
或error
之后帮用户完成unsubscribe
操作。
如果我们想对上面那个简陋的Observable
进行安全性改造,代码一定变得和屎一样。不信可以自己到JSBin感受一下。我就不把那个鄙陋的实现放在这里了,文章显得太长没法读了。但我们还是可以看看,用了SafeObserver
后,我们的Observable
变成什么样儿了:
function myObservable(observer) {
const safeObserver = new SafeObserver(observer);
const datasource = new DataSource();
datasource.ondata = (e) => safeObserver.next(e);
datasource.onerror = (err) => safeObserver.error(err);
datasource.oncomplete = () => safeObserver.complete();
safeObserver.unsub = () => {
datasource.destroy();
};
return safeObserver.unsubscribe.bind(safeObserver);
}
# 改进Observable
:更好的SafeObserver
将Observable
声明为class/object,让其接受一个函数作为构造器参数,这个函数以SafeObserver
作为参数,以此向开发人员提供更友好的用法。因为在Observable
的subscribe
实现中控制了SafeObserver
的创建过程,Observable
可以书写成如下简单格式:
const myObservable = new Observable((observer) => {
const datasource = new DataSource();
datasource.ondata = (e) => observer.next(e);
datasource.onerror = (err) => observer.error(err);
datasource.oncomplete = () => observer.complete();
return () => {
datasource.destroy();
};
});
你会发现,上面的代码和我们第一次写的非常相似。但她更易于阅读和理解。我在之前的例子(JSBin)里增加了最精简版本的Observable
实现
# Operators
:也是函数
Operator
在RxJS
像是这样一种函数,她接收一个Observable
,然后返回一个新的Observable
,当subscribe
返回的那个新的Observable
时,她内部会自动subscribe
前一个Observable
。我们来简单实现一个独立的Operator
,看这儿JSBin:
function map(source, project) {
return new Observable((observer) => {
const mapObserver = {
next: (x) => observer.next(project(x)),
error: (err) => observer.error(err),
complete: () => observer.complete()
};
return source.subscribe(mapObserver);
});
}
关于Operator
最重要也最值得注意的是:当你subscribe
她返回的的那个新的Observable
时,她创建了一个mapObserver
来做最终工作,并将其与前一个Observer
链了起来。构建Operator
链式结构,简单点说就是创建了一个模板在Subscription
时把Observers
链在一起。
# 改进Observable
:让Operator
的链式帅起来
如果把Operator
都写成如上那种独立的函数,我们链式代码会逐渐变丑:
map(map(myObservable, (x) => x + 1), (x) => x + 2);
现在对着上面的代码,想象一下有5、6个嵌套着的Operator
,再加上更多、更复杂的参数。基本上就没法儿看了
你也可以试下Texas Toland提议的简单版管道实现,合并压缩一个数组的Operator
并生成一个最终的Observable
,不过这意味着要写更复杂的Operator
,上代码:JSBin。其实写完后你会发现,代码也不怎么漂亮:
pipe(myObservable, map(x => x + 1), map(x => x + 2));
理论上,我们想将代码用更自然的方式链起来:
myObservable.map(x => x + 1).map(x => x + 2);
所幸,我们已经有了这样一个Observable
类,我们可以利用她在不增加复杂度的情况下完成多Operators
的链式结构,不过之前的例子没有是用到牛逼的prototype,下面我们采用prototype方式再次实现一下Observable
,(JSBin):
Observable.prototype.map = function (project) {
return new Observable((observer) => {
const mapObserver = {
next: (x) => observer.next(project(x)),
error: (err) => observer.error(err),
complete: () => observer.complete()
};
return this.subscribe(mapObserver);
});
};
现在我们终于有了一个还不错的实现。这样实现还有其他好处,例如:可以写子类继承Observable
(比方说:封装Promise
或静态值),然后在子类中重写某些内容以优化程序。
# 嫌长别看:Observable
就是函数,她接受Observer
作为参数,又返回一个函数
牢记,读完了上面所有内容,所有的设计都是基于一个简单函数。Observable
就是函数,她接受Observer
作为参数,又返回一个函数。再没别的了。如果你写了一个函数,接受一个Observer
作为参数,又返回一个函数,那么,她是异步的、还是同步的?都不是。她就是个函数。任何函数的行为都依赖于她具体的实现内容。所以当你处理一个Observable
时,就把她当成一个普通函数,里面没有什么黑魔法。当你要构建Operator
链时,你做的其实就是生成一个函数将一堆Observers
链在一起,然后让真正的数据依次穿过他们。
注意:上面我们给出的
Observable
实例实现依旧返回一个函数,但RxJS
和es-observable
返回的是Subscription
对象。Subscription
对象是更好的设计,但我恐怕得专门写篇关于她的文章。我在这里返回一个撤销订阅unsubscribe
的函数仅仅是为了保持例子在一个简单易懂的范围。