求游助唱的【sunshine】当我唱起这首歌吉他谱的MP3下载,最好是百度云

框架源码分析(2)
首先看一下RxJava简单使用
//被观察者,它决定何时触发事件及触发事件order
Observable observable = Observable.create(new Observable.OnSubscribe&String&() {
public void call(Subscriber&? super String& subscriber) {
subscriber.onNext(&Hello&);
subscriber.onNext(&world&);
subscriber.onCompleted();
}).map(new Func1&String, String&() {//onNext调用前做的中间处理(即数据过滤转换)
public String call(String t) {
return t+&mapAction&;
observable.subscribe(subscriber);//消息订阅,观察者
Subscriber&String& subscriber = new Subscriber&String&() {
public void onNext(String s) {
println(&nextCallback--&&+s);
public void onCompleted() {
println(&onCompleted&);
public void onError(Throwable e) {
println(&onError&);
};输出结果:
nextCallback--&HellomapAction
nextCallback--&worldmapAction
onCompleted
从例子中看出RxJava主要组成:
Observable:被观察者,被观察者本身
OnSubscribe:通知观察者执行哪些行为
Subscriber:观察者,通过实现对应方法做具体处理
一:订阅过程
public final Subscription subscribe(Subscriber&? super T& subscriber) {
return Observable.subscribe(subscriber, this);
static &T& Subscription subscribe(Subscriber&? super T& subscriber, Observable&T& observable) {
// validate and proceed
// new Subscriber so onStart it
subscriber.onStart();
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber&T&(subscriber);
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
// allow the hook to intercept and/or decorate
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// in case the subscriber can't listen to exceptions anymore
if (subscriber.isUnsubscribed()) {
RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
// if an unhandled error occurs executing the onSubscribe we will propagate it
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new OnErrorFailedException(&Error occurred attempting to subscribe [& + e.getMessage() + &] and then again while trying to pass to onError.&, e2);
// TODO could the hook be the cause of the error in the on error handling.
RxJavaHooks.onObservableError(r);
// TODO why aren't we throwing the hook's return value.
return Subscriptions.unsubscribed();
subscribe.onStart()默认空(重写做一些初始化操作),对subscriber包装成SafeSubscriber,这个SafeSubscriber对原subscriber的一系列方法做了更完善的处理,包括:onError()与onCompleted()只有一个被执行,一旦onError()或者onCompleted()被调用,将从订阅列表SubscriptionList中移除该subscriber将不再调用onNext()等。这里封装为SafeSubscriber后,调onSubscribe.call(),并将subscriber传入,我们看到observerable,onSubscriber,subcreber,被传给RxJavaHooks中,把三者装在里面
* Hook to call before the child subscriber is subscribed to the OnSubscribe action.
@SuppressWarnings({ &rawtypes&, &unchecked& })
public static &T& Observable.OnSubscribe&T& onObservableStart(Observable&T& instance, Observable.OnSubscribe&T& onSubscribe) {
Func2&Observable, Observable.OnSubscribe, Observable.OnSubscribe& f = onObservableS
if (f != null) {
return f.call(instance, onSubscribe);
return onS
* Hook to call before the Observable.subscribe() method is about to return a Subscription.
* @param subscription the original subscription
* @return the original or alternative subscription that will be returned
public static Subscription onObservableReturn(Subscription subscription) {
Func1&Subscription, Subscription& f = onObservableR
if (f != null) {
return f.call(subscription);
这里存储subscriber,obseverable,管理回调。这样就完成了一次订阅。
二:数据转换
收到Observable的消息之前我们有可能会对数据流进行处理,例如map()、flatMap()、deBounce()、buffer()等方法,
这里使用了map()方法,它接收了observeable的数据并将通过该方法将数据进行转换后的新数据发出去,即做了中间转化
public final &R& Observable&R& map(Func1&? super T, ? extends R& func) {
return create(new OnSubscribeMap&T, R&(this, func));
}具体操作交给内部类MapSubscriber
static final class MapSubscriber&T, R& extends Subscriber&T& {
final Subscriber&? super R&
final Func1&? super T, ? extends R&
public void onNext(T t) {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
actual.onNext(result);
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
actual.onError(e);
public void onCompleted() {
if (done) {
actual.onCompleted();
public void setProducer(Producer p) {
actual.setProducer(p);
MapSubscriber实现Subscriber,持有传入的Subscriber,通过Func1的mapper.call(t)进行转换后再传递给subscriber onNext(),
public interface Func1&T, R& extends Function {
R call(T t);//通过参数t转换为类型T
三:任务调度(scheduler)
通过使用subscribeOn()、observeOn()方法传入对应的Scheduler去指定每个操作应该运行在何种线程之中
Observable.create(...)
&& &&& &...
&& &&&& .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
&& &&& &...
&& &&&& .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
&& &&&& .subscribe(...)
public final Observable&T& subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable&T&)this).scalarScheduleOn(scheduler);
return create(new OperatorSubscribeOn&T&(this, scheduler));
创建了一个新的Observable,并为新的Observable创建了新的计划表OperatorSubscribeOn对象,新的计划表保存了原始Observable对象和调度器scheduler
public final class OperatorSubscribeOn&T& implements OnSubscribe&T& {
final Observable&T&//原observable
public void call(final Subscriber&? super T& subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
//使用传入的scheduler创建Worker对象,调用schedule方法切换线程
inner.schedule(new Action0() {
public void call() {
final Thread t = Thread.currentThread();
//创建了新的观察者
Subscriber&T& s = new Subscriber&T&(subscriber) {
public void onNext(T t) {
subscriber.onNext(t);
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
inner.schedule(new Action0() {
public void call() {
p.request(n);
source.unsafeSubscribe(s);
可以看出调度又scheduler Worker执行
Scheduler io = hook.getIOScheduler();
if (io != null) {
ioScheduler =
ioScheduler = RxJavaSchedulersHook.createIoScheduler();
public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException(&threadFactory == null&);
return new CachedThreadScheduler(threadFactory);
public Worker createWorker() {
return new EventLoopWorker(pool.get());
final AtomicReference&CachedWorkerPool&
这里的pool是一个原子变量引用AtomicReference,所持有的则是CachedWorkerPool,因而这个pool是用来保存worker的缓存池,
我们从缓存池里拿到需要的worker并作了一层封装成为EventLoopWorker:最后调用NewThreadWorker 的scheduleActual
public NewThreadWorker(ThreadFactory threadFactory) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
boolean cancelSupported = tryEnableCancelPolicy(exec);
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
executor =
public Subscription schedule(final Action0 action) {
return schedule(action, 0, null);
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (isUnsubscribed) {
return Subscriptions.unsubscribed();
return scheduleActual(action, delayTime, unit);
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);//
ScheduledAction run = new ScheduledAction(decoratedAction);//将传进来的Action分给ScheduledAction
if (delayTime &= 0) {
f = executor.submit(run);//提交到线程池执行
f = executor.schedule(run, delayTime, unit);
run.add(f);
}Scheduled实现Runable接口
public void run() {
lazySet(Thread.currentThread());
action.call();//调用action call方法,实现线程切换
} catch (OnErrorNotImplementedException e) {
signalError(new IllegalStateException(&Exception thrown on Scheduler.Worker thread. Add `onError` handling.&, e));
} catch (Throwable e) {
signalError(new IllegalStateException(&Fatal Exception thrown on Scheduler.Worker thread.&, e));
} finally {
unsubscribe();
再看看observeOn
public final Observable&T& observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable&T&)this).scalarScheduleOn(scheduler);
return lift(new OperatorObserveOn&T&(scheduler, delayError, bufferSize));
public final class OperatorObserveOn&T& implements Operator&T, T& {
private final S
 public Subscriber&? super T& call(Subscriber&? super T& child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
} else {//创建代理观察者,接收上一级Obsevable传递的数据
ObserveOnSubscriber&T& parent = new ObserveOnSubscriber&T&(scheduler, child, delayError, bufferSize);
parent.init();
//代理观察者,实现Action,调用Scheduler schedule方法实现线程调度
static final class ObserveOnSubscriber&T& extends Subscriber&T& implements Action0 {
final Subscriber&? super T&
final Scheduler.Worker recursiveS
public ObserveOnSubscriber(Scheduler scheduler, Subscriber&? super T& child, boolean delayError, int bufferSize) {
this.child =
this.recursiveScheduler = scheduler.createWorker();
// signal that this is an async operator capable of receiving this many
request(calculatedSize);
public void onNext(final T t) {
schedule();
public void onCompleted() {
schedule();
public void onError(final Throwable e) {
schedule();
//任务调度,交给线程池回调Action
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
// only execute this from schedule()
public void call() {
未完待续。。。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:541856次
积分:5404
积分:5404
排名:第3835名
原创:73篇
译文:10篇
评论:207条
阅读:4450
阅读:313083443人阅读
读书笔记(49)
本文翻译来自–&
感觉RxJava最近风生水起,不学习一下都不好意思了,洒家也是初学RxJava,也是感觉代码好像更复杂更难懂了,看了一篇外文感同身受,简单翻译一下。本文简单介绍使用RxJava优势所在。但可能需要有一点RxJava基础,推荐先看一下抛物线的那篇经典的匠心写作。
—–华丽分割线,译文开始———
Reactive Extensions (Rx) 是一系列接口和方法,为开发者提供了一种易懂且迅速简单易维护的方法。RxJava就是干这事儿的,提供一系列tools来帮你写出简洁的代码。
老实说,一开始我认为RxJava 写的代码理解起来很困难,并且引入一个库,单单就是为了用用这种新式的api,这困扰到了我。后来,我懂了。以传统的编码方式,随着app的发展,我需要重构代码、一遍一遍的重复样板代码,以满足用户不断变更的新需求,这让我苦不堪言。
我做的大量工作,其实是改写相关方法和接口,就是因为需求的变更(这是开发与产品间那些血案的原罪)或者需要改变展示的信息亦或是需要改变处理信息数据..这很抓狂。另外,这种代码让其他来维护的人来理解,通常是很耗时的。
举个栗子:我们需要从数据库获取一组用户的链表数据,并展示出来。我们可以用AsyncTask后台查询数据库,获得的结果给Ui的适配器展示出来。简单示例代码:
public class SampleTask extends AsyncTask&Void,Void,List&Users&& {
private final SampleAdapter mA
public SampleTask(SampleAdapter sampleAdapter) {
mAdapter = sampleA
protected List&Users& doInBackground(Void... voids) {
List&Users& users = getUsersFromDatabase();
protected void onPostExecute(List&Users& users) {
super.onPostExecute(products);
if(users == null) {
showEmptyUsersMessageView();
for(User user : users){
mAdapter.add(user);
mAdapter.notifyDataSetChanged();
现在有个新需求,要求只显示非guest的user,我们处理的方法是,在添加到adapter前加个条件判断是不是guset,或者改变数据库查询的条件。更有甚者,你又被要求从数据库中获取另外的其他信息,跟user一并在这个adapter中显示出来呢?
这就是我们为什么要用RxJava了,把我们从这个泥潭中拉出来。换个姿势,我们Rx代码是这样子(假设您已学习过Rx基础用法):
public Observable&List&User&& fetchUsersFromDatabase() {
return Observable.create(new Observable.OnSubscribe&List&User&(){
public void call(Subscriber&? super List&User&& subscriber){
subscriber.onNext(getUserList());
subscriber.onCompleted();
像这样被调用:
fetchUsersFromDatabase()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber&List&User&&() {
public void onCompleted() {
public void onError(Throwable e) {
public void onNext(List&User& users) {
开始改需求了哈
怎么不显示guests呢,RxJava分分钟过滤掉这种不速之客:
fetchUsersFromDatabase()
.filter(new Func1&User, Boolean&() {
public Boolean call(User user) {
return !user.isGuest();
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber&User&() {
public void onCompleted() {
public void onError(Throwable e) {
public void onNext(User user) {
传统的方式,即便是个简单的变更,为了保持优雅的接口化编程,我们也得创建新接口,重构代码来实现过滤。但是使用RxJava让这一切变得优雅了,我们只需要一个被观察者用来获取所有的信息,让后你就可以尽情的用这些来过滤获取你想要的数据。
可能你又会说了,ok,这是很好很易读的结构,但是这似乎使代码量变多了呢。well you are right,但是这就是闪耀的时候了,这个库为我们兼容了以使用java8 lambda表达式,方法引用等等。
帮我们简化代码如下:
fetchUsersFromDatabase()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(value -& {
},error -& {
这个问题完美搞定,然后你又开始问了,我需要增加另外的查询结果和user一同显示在这个adapter中怎么破。这真不是事儿:
fetchUsersFromDatabase()
.zipWith(fetchSomethingElseFromDatabase(), (users, somethingElse) -& {
.subscribe( o -& {
如上,我们可以轻松组合数据库查出来的其他数据和users给一个adapter一同显示。是不是更易维护,代码少,易读,清晰?
如果要更深入的学习RXJava可以看下面这篇文章,我看后受益匪浅。
另外,这篇教程 也帮我在RxJava路上进阶了很多。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:231775次
积分:3242
积分:3242
排名:第8201名
原创:75篇
评论:33条
为了让这个id看起来像是个做技术的,我把这个模块之前情怀的流露忍痛去掉了。要做个正经的人!
(1)(1)(9)(3)(6)(4)(6)(8)(21)(18)我们为什么要在Android中使用RxJava
博客专家
我们为什么要在Android中使用RxJava
本文翻译来自–&
感觉RxJava最近风生水起,不学习一下都不好意思了,洒家也是初学RxJava,也是感觉代码好像更复杂更难懂了,看了一篇外文感同身受,简单翻译一下。本文简单介绍使用RxJava优势所在。但可能需要有一点RxJava基础,推荐先看一下抛物线的那篇经典的匠心写作。
—–华丽分割线,译文开始———
Reactive Extensions (Rx) 是一系列接口和方法,为开发者提供了一种易懂且迅速简单易维护的方法。RxJava就是干这事儿的,提供一系列tools来帮你写出简洁的代码。
老实说,一开始我认为RxJava 写的代码理解起来很困难,并且引入一个库,单单就是为了用用这种新式的api,这困扰到了我。后来,我懂了。以传统的编码方式,随着app的发展,我需要重构代码、一遍一遍的重复样板代码,以满足用户不断变更的新需求,这让我苦不堪言。
我做的大量工作,其实是改写相关方法和接口,就是因为需求的变更(这是开发与产品间那些血案的原罪)或者需要改变展示的信息亦或是需要改变处理信息数据..这很抓狂。另外,这种代码让其他来维护的人来理解,通常是很耗时的。
举个栗子:我们需要从数据库获取一组用户的链表数据,并展示出来。我们可以用AsyncTask后台查询数据库,获得的结果给Ui的适配器展示出来。简单示例代码:
public class SampleTask extends AsyncTask&Void,Void,List&Users&& {
private final SampleAdapter mA
public SampleTask(SampleAdapter sampleAdapter) {
mAdapter = sampleA
protected List&Users& doInBackground(Void... voids) {
List&Users& users = getUsersFromDatabase();
protected void onPostExecute(List&Users& users) {
super.onPostExecute(products);
if(users == null) {
showEmptyUsersMessageView();
for(User user : users){
mAdapter.add(user);
mAdapter.notifyDataSetChanged();
现在有个新需求,要求只显示非guest的user,我们处理的方法是,在添加到adapter前加个条件判断是不是guset,或者改变数据库查询的条件。更有甚者,你又被要求从数据库中获取另外的其他信息,跟user一并在这个adapter中显示出来呢?
这就是我们为什么要用RxJava了,把我们从这个泥潭中拉出来。换个姿势,我们Rx代码是这样子(假设您已学习过Rx基础用法):
public Observable&List&User&& fetchUsersFromDatabase() {
return Observable.create(new Observable.OnSubscribe&List&User&(){
public void call(Subscriber&? super List&User&& subscriber){
subscriber.onNext(getUserList());
subscriber.onCompleted();
像这样被调用:
fetchUsersFromDatabase()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber&List&User&&() {
public void onCompleted() {
public void onError(Throwable e) {
public void onNext(List&User& users) {
开始改需求了哈
怎么不显示guests呢,RxJava分分钟过滤掉这种不速之客:
fetchUsersFromDatabase()
.filter(new Func1&User, Boolean&() {
public Boolean call(User user) {
return !user.isGuest();
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber&User&() {
public void onCompleted() {
public void onError(Throwable e) {
public void onNext(User user) {
传统的方式,即便是个简单的变更,为了保持优雅的接口化编程,我们也得创建新接口,重构代码来实现过滤。但是使用RxJava让这一切变得优雅了,我们只需要一个被观察者用来获取所有的信息,让后你就可以尽情的用这些来过滤获取你想要的数据。
可能你又会说了,ok,这是很好很易读的结构,但是这似乎使代码量变多了呢。well you are right,但是这就是闪耀的时候了,这个库为我们兼容了以使用java8 lambda表达式,方法引用等等。
帮我们简化代码如下:
fetchUsersFromDatabase()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(value -& {
},error -& {
这个问题完美搞定,然后你又开始问了,我需要增加另外的查询结果和user一同显示在这个adapter中怎么破。这真不是事儿:
fetchUsersFromDatabase()
.zipWith(fetchSomethingElseFromDatabase(), (users, somethingElse) -& {
.subscribe( o -& {
如上,我们可以轻松组合数据库查出来的其他数据和users给一个adapter一同显示。是不是更易维护,代码少,易读,清晰?
如果要更深入的学习RXJava可以看下面这篇文章,我看后受益匪浅。
另外,这篇教程 也帮我在RxJava路上进阶了很多。
我的热门文章
10
即使是一小步也想与你分享下次自动登录
关注移动互联网和移动APP开发工具、开发框架、测试工具、微信开发、Android源码、Android开源类库以及各种开源组件的IT科技网站
现在的位置:
RxJava 常见误区(一):过度使用 Subject
本文首发:
转载请注明出处
准备写这篇文章的时候看了下 RxJava 在 Github 上已经 12000+ 个 star 了,可见火爆程度,自己使用 RxJava 也已经有一小段时间。最初是在社区对 RxJava 一片赞扬之声下,开始使用 RxJava 来代替项目中一些简单异步请求,到后来才开始接触一些高级玩法,这中间阅读别人的代码加上自己踩的坑,慢慢积累了一些经验,很多都是新手容易犯的错误和 RxJava 容易被误解的地方。这些内容一篇文章写不完,所以我打算写成一个系列,这篇文章是这个系列的第一篇。
谨慎使用Subject
Subject既是Observable也是Observer,由于它自己本身是Observer,所以项目中任何地方都可以调用它的onNext方法(只要能获得该 Subject 的引用)。看起来很好对不对?比起Observable.create, Observable.from, Observable.just方便多了,这三个工厂方法都有一个特点,那就是所构建出来的 Observable 发射的元素是确定的,甚至在很多例子中,待发射的元素就像常量一样在编译期就已经可以确定。我在一开始学习这些入门的小例子的时候心里也在想,实际情况哪有这样简单:用户与 UI 交互的事件,移动设备网络类型的改变( WIFI 与蜂窝网络的切换),服务器推送消息的到达,这些事件何时发生和产生的数量都是在运行时才能得知,怎么可能用这些工厂方法简单地就发射几个固定的值。
直到我遇见了Subject。我可以先创建一个一开始什么元素都不发射的Observable(Subject是Observable的子类),并且同时创建对应的Subscriber订阅这个Observable,然后在我觉得某个 Ready 的时机,调用这个Subject对象的onNext方法,向它的Subscriber发射元素。逻辑简洁,并且足够灵活,代码如下所示:
PublishSubject&String& subject = PublishSubject.create();
subject.map(String::length)
.subscribe(System.out::println);
subject.onNext("Puppy");
subject.onCompleted();
使用 Subject 可能导致错过真正关心的事件
到目前看来,一切都顺理成章,对比Observable,Subject优势明显,可以按需在合适的时机发射元素,似乎是Subject更能满足日常任务需求,更激进一点,干脆就用Subject来代替所有的Observable吧。实际上,我也这么做过,但是很快就遇到了问题。举个例子,代码如下:
PublishSubject&String& operation = PublishSubject.create();
.subscribe(new Subscriber&String&() {
public void onCompleted() {
System.out.println("completed");
public void onError(Throwable e) {
public void onNext(String s) {
System.out.println(s);
operation.onNext("Foo");
operation.onNext("Bar");
operation.onCompleted();
这段代码很简单,按照预期,它的输出为:
稍微改一下,使用 RxJava 的调度器Scheduler指定operation对象从 IO 线程发射元素,代码如下(本文中的代码都是从main函数启动运行的):
PublishSubject&String& operation = PublishSubject.create();
.subscribeOn(Schedulers.io())
.subscribe(new Subscriber&String&() {
public void onCompleted() {
System.out.println("completed");
public void onError(Throwable e) {
public void onNext(String s) {
System.out.println(s);
operation.onNext("Foo");
operation.onNext("Bar");
operation.onCompleted();
sleep(2000);
以上代码实际输出的结果为:
上面这段代码中,除了加上调度器以外,最后还增加了一行代码使当前线程休眠 2 秒,原因是operation对象改从 IO 线程发射元素以后,main 线程由于运行到最后一行直接退出了,导致整个进程结束,此时 IO 线程还没有开始发射元素,所以这 2 秒是用来等待 IO 线程启动起来并把该做的事情做完。
经过改动后的代码并没有接收到Foo和Bar,如果把最后一行sleep(2000)去掉,那么 Console 将不会输出任何内容。这便是我们需要谨慎使用Subject的第一个理由: 使用 Subject 可能导致错过真正关心的事件。
在RxJava中,Observable可以被分为Hot Observable与Cold Observable,引用《Learning Reactive Programming with Java 8》中一个形象的比喻(翻译后的意思):
我们可以这样认为,Cold Observable在每次被订阅的时候为每一个Subscriber单独发送可供使用的所有元素,而Hot Observable始终处于运行状态当中,在它运行的过程中,向它的订阅者发射元素(发送广播、事件),我们可以把Hot Observable比喻成一个电台,听众从某个时刻收听这个电台开始就可以听到此时播放的节目以及之后的节目,但是无法听到电台此前播放的节目,而Cold Observable就像音乐 CD ,人们购买 CD 的时间可能前后有差距,但是收听 CD 时都是从第一个曲目开始播放的。也就是说同一张 CD ,每个人收听到的内容都是一样的, 无论收听时间早或晚。
Subjcet是属于Hot Observable的。Cold Observable可以转化为Hot Observable, 但是反过来却不行。回过头来解释上面的例子为什么最后只输出了completed: 因为operation对象发射元素的线程被指派到了 IO 线程,相应的Subscriber也工作在 IO 线程,而 IO 线程第一次被Scheduler调用,还没起来(正在初始化),发射前两个元素Foo,Bar是在主线程,主线程的这两个元素往 IO 线程转发的过程中由于 IO 线程还没有起来,就被丢弃了(电台即使没有一个听众,照样可以播音)。complete信号比较特殊,在Reactive X的设计中,该信号优先级非常高,所以总是可以被优先捕获,不过这是另外一个话题。
所以使用Subject的时候,我们必须小心翼翼地设计程序,确保消息发送的时机是在Subscriber已经Ready的时候,否则我们就很容易错过我们关心的事件,当代码今后面临重构的时候,其他的程序员也必须知道这个逻辑,否则就很容易引入 Bug 。如果我们不希望错过任何事件,那么我们应该尽可能使用Cold Observable,上面的例子如果operation对象使用Observable.just, Observable.from来构造,就不会有这种问题了。
其实,错过事件这种情况一般发生在临界条件下,比如我刚刚声明一个Subscriber并且希望立即发送一个事件给它。这时候最好不要使用Subject而是使用Observable.create(OnSubscribe)。上面有问题的代码改成下面这样, 就可以正常工作了:
Observable&String& operation = Observable.create(subscriber -& {
subscriber.onNext("Foo");
subscriber.onNext("Bar");
subscriber.onCompleted();
.subscribeOn(Schedulers.io())
.subscribe(new Subscriber&String&() {
public void onCompleted() {
System.out.println("completed");
public void onError(Throwable e) {
public void onNext(String s) {
System.out.println(s);
sleep(2000);
Subjcet 不是线程安全的
使用Subject的第二个问题便是它 不是线程安全的 ,如果在不同的线程调用它的onNext方法,很有可能造成竞态条件(race conditions),我们应该尽可能避免这种情况的出现,因为除非在代码中写足够详细的注释,否则日后维护这段代码的程序员很可能莫名其妙地踩了坑。如果你认为你确实有必要使用Subject, 那么请把它转化为SerializedSubject,它可以保证如果多个线程同时调用onNext方法,依然是线程安全的。
SerializedSubject&Integer,Integer& subject =
PublishSubject.&Integer&create().toSerialized();
Subject 使事件的发送变得不可预知
最后一个我们应该谨慎对待Subject的原因就是它 让事件的发送变得不可预知。由于Observable.create使用的例子上面已经给出,再看另外两个工厂方法Observable.just和Observable.from的例子:
Observable&String& values = Observable.just("Foo", "Bar");
Observable myObservable = Observable.from(new String[]{"Foo","Bar"});
无论是Observable.create, Observable.from 还是 Observable.just , 这些 Cold Observable 都有一个显著的优点就是数据的来源可预知,我知道将会发送哪些数据,这些数据是什么类型。但是Subject就不一样,我如果创建一个Subject,那么代码任何地方只要能 Get 到这个引用,就可以随意使用它发射元素,滥用的后果导致代码越来越难以维护,我不知道其他人是否在某个我不知道的地方发射了我不知道的元素,我相信谁都不愿意维护这样的代码。这是一种反模式,就和 C 语言当初模块化的理念尚未深入人心的时候全局变量带来的灾难一样。
也许看到这里你会想,说了半天好像又回到起点了,Subject带给编程的灵活性不推荐用,为了这些理由又要重新用那三个不灵活的工厂方法,确实不能满足需求啊。我们回顾一下之前提到过的编程中经常遇到的实际情况:
用户与 UI 交互的事件
移动设备网络类型的改变( WIFI 与蜂窝网络的切换)
服务器推送消息的到达
其实这些事件往往都是以注册监听器的接口提供给程序员的,我们完全可以使用Observable.create这个工厂方法来创建Observable:
final class ViewClickOnSubscribe implements Observable.OnSubscribe&Void& {
ViewClickOnSubscribe(View view) {
this.view =
@Override public void call(final Subscriber&? super Void& subscriber) {
verifyMainThread();
View.OnClickListener listener = new View.OnClickListener() {
@Override public void onClick(View v) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(null);
view.setOnClickListener(listener);
subscriber.add(new MainThreadSubscription() {
@Override protected void onUnsubscribe() {
view.setOnClickListener(null);
以上代码来自的 Android 项目
,目的是将 Android UI 上的用户与控件交互产生的事件转化为Observable提供给程序员。上面的代码思路很简单,就是当有一个Subscriber想要订阅View的点击事件的时候,就为这个View在 Android Framework 里注册一个点击的回调(view.setOnClickListener(listener)), 每当点击事件来临的时候就去调用Subscriber的onNext方法。
我们再对比一下另一种不那么好的写法:
PublishSubject&String& subject = PublishSubject.create();
View.OnClickListener listener = new View.OnClickListener() {
@Override public void onClick(View v) {
subject.onNext(null);
view.setOnClickListener(listener);
这里的subject还只是整个项目局部的代码,我们并不知道其他地方有没有把subject对象给怎么样,潜在的风险就是我们刚刚讨论的 可能会错过临界情况下的事件、 线程不安全、 事件来源不可预知。
我们已经了解到了Subject给我们带来的灵活性以及风险,所以在实际项目中使用的时候我推荐更多地使用Observable提供的3个工厂方法,而慎重使用Subject,其实90%的情况都可以使用那3个工厂方法解决,如果你确定要使用Subject,那么确保:1. 这是一个 Hot Observable 且你有对应措施保证不会错过临界的事件;2. 有对应的线程安全措施;3. 模块化代码,确保事件的发送源在掌控中,事件的发送完全可预期。对了,另外加上必要的注释:)
【上篇】【下篇】

我要回帖

更多关于 sunshine girl原唱 的文章

 

随机推荐