rxjava是什么 为什么说 是响应式的

RxJava在安卓上应用
RxJava在安卓上应用
编辑日期: 字体:
如果你做过Android(和Java)的开发,很有可能已经听说过RxJava了。RxJava是由Netflix开发的响应式扩展(Reactive Extensions)的Java实现。引用,Reactive Extensions是这样一个第三方库:它结合了可观察集合和LINQ式查询以达到异步和基于事件的编程效果。Netflix将这个库托管到了Github上,支持Java6以上的版本并且使它可以用于Android App开发。
本篇是介绍RxJava和Android的系列文章的第一篇,将会介绍如何在Android中使用RxJava observables(基于Square的Retrofit组件)创建REST API客户端。
我们从添加所需的库文件开始。如果你用Maven的话,只需将下面的dependencies(依赖库)加到pom.xml中即可:
&dependency&
&groupId&com.squareup.retrofit&/groupId&
&artifactId&retrofit&/artifactId&
&version&1.2.2&/version&
&/dependency&
&dependency&
&groupId&com.netflix.rxjava&/groupId&
&artifactId&rxjava-android&/artifactId&
&version&0.14.6&/version&
&/dependency&
在本文中,我们将用气象地图开放平台(OpenWeatherMap) API作为演示示例。 是一个免费的天气数据API,非常易于配置和使用,调用时只需传入位置信息(城市名或者是地理坐标)作为参数即可,具体效果请参见这个。它默认传输的是JSON格式的数据(但也可以配置为XML或HTML格式)。精度和温度单位也是可以配置的,更多详情请看。
通常要实现调用一个API需要如下这几个步骤(每个步骤都有一堆公式化代码):
创建所需的模型类(必要时,添加上注解)。
实现请求—回应管理的网络层代码,并带错误处理。
用后台线程实现请求调用(一般是用异步任务的形式实现),用一个回调函数(Callback Function)来实现在UI线程上呈现回应信息。
创建模型类
第一步我们可以依靠一些类似的JSON-POJO生成工具(半)自动化完成。OpenWeather API的模型类如下:
public class WeatherData {
public List&Weather&
public static class Coordinates {
public static class Local {
public static class Weather {
public static class Main {
public double temp_
public double temp_
public double sea_
public double grnd_
public static class Wind {
public static class Rain {
public static class Cloud {
用Retrofit实现网络调用
第二步中网络调用的实现通常我们需要写一大堆公式化的代码,但如果用Square公司的来实现的话将大大减少代码量。只需要创建一个接口类(用注释来描述整个请求),然后用RestAdapter.Builder来创建客户端就行了。Retrofit也可以用来完成JSON的序列化与反序列化。
private interface ApiManagerService {
@GET("/weather")
WeatherData getWeather(@Query("q") String place, @Query("units") String units);
上面的示例中我们可以看到,方法前的注释是由一个HTTP方法(我们这里用的是GET,当然你也可以按需要用Retrofit实现POST、 PUT、DELETE和HEAD方法)和一个相对路径(基本路径是由RestAdapter.Builder提供的)。@Query注释用于组装请求参 数,我们这有两个参数,一个是place(代表位置),另一个是units计量单位。
我们来看一个具体的调用示例(实际代码中应该把这个调用放到一个非UI线程里)。这段代码还是比较容易理解的:
final RestAdapter restAdapter = new RestAdapter.Builder()
.setServer("http://api.openweathermap.org/data/2.5")
final ApiManagerService apiManager = restAdapter.create(ApiManagerService.class);
final WeatherData weatherData = apiManager.getWeather("Budapest,hu", "metric");
怎么样,很简单吧,你只需要很少的代码就实现了整个调用过程,这就是Retrofit的威力,要了解更多,请点击。
用RxJava实现响应式编程
现在我们就进入第三步了:RxJava部分!我们这里示例将用它来实现异步的请求调用。但这并不是RxJava所有的功能,以下对RxJava的介绍引用自Netflix的Github 知识库:
RxJava 是一个在Java虚拟机上实现的响应式扩展库:提供了基于observable序列实现的异步调用及基于事件编程。
它扩展了观察者模式,支持数据、事件序列并允许你合并序列,无需关心底层的线程处理、同步、线程安全、并发数据结构和非阻塞I/O处理。
它支持Java5及更高版本,并支持其他一些基于JVM的语言,如Groovy、Clojure和Scala。
我们假设你已经对RxJava有一些了解。如果没有的话,强烈建议先看看和Netflix在的前几页。
在最后的这个示例中,我们将实现一个API 管理器负责生成observable对象,并完成多并发调用(每个调用都请求同一个地址,但参数不同)。
首先我们需要将前面创建的接口类,换为这个类:
public class ApiManager {
private interface ApiManagerService {
@GET(&/weather&)
WeatherData getWeather(@Query(&q&) String place, @Query(&units&) String units);
private static final RestAdapter restAdapter = new RestAdapter.Builder()
.setServer(&http://api.openweathermap.org/data/2.5&)
private static final ApiManagerService apiManager = restAdapter.create(ApiManagerService.class);
public static Observable&WeatherData& getWeatherData(final String city) {
return Observable.create(new Observable.OnSubscribeFunc&WeatherData&() {
public Subscription onSubscribe(Observer&? super WeatherData& observer) {
observer.onNext(apiManager.getWeather(city, &metric&));
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
return Subscriptions.empty();
}).subscribeOn(Schedulers.threadPoolForIO());
我们先来看下getWeatherData()这个方法,它调用了Observable.create()方法并向方法传入一个 Observable.OnSubscribeFunc的实现,以此得到一个Observable对象并返回。并且一旦Observable对象被订阅 (subscribed)后就会开始工作。Observable每次处理的结果都会当作参数传给onNext()方法。因为我们这里只是想实现网络请求的 并发调用,所以只需要让每个Observable对象中调用一次请求即可。代码最后调用onComplete()方法。这里的subscribeOn() 方法很重要,它决定了程序将选用哪种线程。这里调用的是Schedulers.threadPoolForIO(),此线程用于优化IO和网络性能相关的 工作。
最后一步是要实现这个API调用。下面的代码实现了并发网络请求,每个请求都使用不同的调用参数异步调用同一个url:
Observable.from(cities)
.mapMany(new Func1&String, Observable&WeatherData&&() {
public Observable&WeatherData& call(String s) {
return ApiManager.getWeatherData(s);
.subscribeOn(Schedulers.threadPoolForIO())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1&WeatherData&() {
public void call(WeatherData weatherData) {
// do your work
Observable.from()方法将城市名称数组转化为一个observable对象,将数组里的字符串提供给不同的线程。然后mapMany()方法将会把前者提供的每一个字符串都转化为observable对象(译注:新对象包含的是weatherData对象数据)。这里的转化通过调用ApiManager.getWeatherData()完成。
这里还是注册在I/O线程池上。在Android系统上,如果需要把结果展示在UI上,就必须把数据发布给UI线程处理。因为我们知道,在 Android上只有最原始的那个创建界面的线程才可以操作界面。这里只需要用observeOn()方法调用 AndroidSchedulers.mainThread()即可。subscribe()方法的调用将触发observable对象,我们可以在这里 处理observable对象发出的结果。
这个示例展示了RxJava强大的功能。如果没有Rx,我们需要创建N个线程去调用请求,然后通过异步方式把处理结果交给UI线程。使用Rx只需编写很少的代码就完成工作,使用它强大的功能创建、合并、过滤和转化observable对象。
RxJava可以在开发安卓App时,作为一个强大的处理并发的工具使用。虽然要熟悉它还是需要一些时间,但是磨刀不误砍柴工,一旦掌握了它,将给 你带来很大帮助。响应式扩展库是个很好的想法,我们把它用于安卓程序的开发,已经用了好几个礼拜了(在不久的将来,我们产品的异步任务处理将完全基于它完 成)。越是了解它,你就越会爱上它。
还想看点其他资料不?看看吧,它讲的是RxJava如何进行错误处理。
本文固定链接:
转载请注明:
作者:leehom
本博客主要是把自己的经验记录于此,方便自己以后查阅及其他遇到类似问题的朋友参考。如果你有觉得不错的文章,可以注册会员发布文章或者邮箱发给我文章地址,谢谢!
如果觉得文章还不错,请麻烦点下广告,算是赞助下本站服务器费用,谢谢!
您可能还会对这些文章感兴趣!了解RxJava之响应式好处(三)
了解RxJava之响应式好处(三)
原文链接:
在中我浏览了RxJava的基本结构,并且介绍了map操作符。在中,我介绍了RxJava操作符的强大之处。不过你仍旧很固执,因为这不足以打动你。接下来介绍RxJava其他的优势。
0x00 错误处理
直到此时,我们还没有介绍onComplete()和onError()。他们标志着Observable停止发射事件的时机和原因(成功完成或者未知错误)。
我们的初始Subscriber能够监听onComplete() 和 onError()回调。让我们用它做点什么:
Observable.just("Hello, world!")
.map(s -& potentialException(s))
.map(s -& anotherPotentialException(s))
.subscribe(new Subscriber&String&() {
public void onNext(String s) { System.out.println(s); }
public void onCompleted() { System.out.println("Completed!"); }
public void onError(Throwable e) { System.out.println("Ouch!"); }
potentialException() 和 anotherPotentialException()两个方法都能够抛出异常,每个Observable遇到onCompleted() 或 onError()回调时都会终止。因此,程序的可能输出”Completed!”或者”Ouch!”(如果抛出异常)。
下面有几点小提示:
只要有异常抛出onError()就会调用
这简化了错误处理。我可以在单一的地方处理错误。
操作符不需要处理异常
你可以把它交给Subscriber来决定如何处理异常,因为异常的onError()。
你知道Subscriber 何时停止接收事件
了解任务何时完成,有助于的代码流程。(虽然一个Observable 可能永远不会完成。)。
我觉得这种模式比传统的错误处理更简单。传统方式是通过这个回调,你必须在每个回调中进行错误处理,这不但导致了重复的代码,而且每个回调必须知道如何处理错误,意味着你回调与调用者紧耦合。
采用响应式模式,Observable甚至不知道如何处理错误。操作符不需要处理错误。把错误处理交给Subscriber吧。
0x02 调度器(Schedulers)
你有一个进行长时间网络请求的App,所有你必须工作在其他线程中。不过,可以遇到点小麻烦!
多线程Android应用程序是困难的,因为你必须确保代码运行正确的线程;搞砸了,你的应用程序可能会崩溃。这里典型的异常就是在主线程外修改视图。
可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制,subscribeOn允许我们在指定的线程创建数据,observeOn允许我们在指定的线程发射的数据。
myObservableServices.retrieveImage(url)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(bitmap -& myImageView.setImageBitmap(bitmap))
这一切多么简单?一切都在发射到Subscriber之前运行在I/O线程。最后,在我的主线程操作视图。
强大的是subscribeOn() 和 observeOn() 可以添加到任何Observable。我们不必担心Observable或者前面的操作符做过什么,我只需把他放在每个线程之后。
有点类似AsyncTask ,我必须仔细设计我的代码,当它需要并发的时候。RxJava,保留相同的代码,只是在并发的时候添加这些操作符。
0x03 订阅(Subscriptions)
有件事情,我一直隐瞒着你。当我调用Observable.subscribe()函数返回Subscription,这代表着Observable 和你的 Subscriber建立的连接:
Subscription subscription = Observable.just("Hello, World!")
.subscribe(s -& System.out.println(s))
你可以在之后通过Subscription分离建立的连接:
subscription.unsubscribe()
System.out.println("Unsubscribed=" + subscription.isUnsubscribed())
// Outputs "Unsubscribed=true"
RxJava处理终止操作看起来多么棒,如果你使用的复杂的链式操作,使用unsubscribe 终端执行,无需考虑其他工作。
请记住,这些文章是RxJava导论。相比我提到的还有更多东西等着你(例如backpressure)。我也不是所有代码都使用响应式,只要足够复杂的代码,我想分解为简单的逻辑。
最初,我计划以这篇文章作为这个系列的总结,但是有些要求在Android中使用RxJava的实例,所以就有了第四部分。我希望这个介绍能让你开始使用RxJava。如果你想了解更多,我建议阅读RxJava官方维基。记住:无限可能。
我的热门文章
即使是一小步也想与你分享RxJava 的使用与理解(一) - 推酷
RxJava 的使用与理解(一)
ReactiveX编程简称Rx编程,又叫响应式编程、响应式扩展,英文为Reactive Extensions。可以查看官方网站
,就像其网站说的”Expertise makes better software.”,响应式编程的目标是提供一致的编程接口, 帮助开发者更方便的处理异步数据流,使软件开发更高效、更简洁。Rx是一个多语言的实现,已经支持多种语言包括Java、Swift、C++、.NET、JavaScript、Ruby、Groovy、Scala等等,支持的库包括:
、Rx.NET、RxJS、RXRuby等等,真是屌炸天。在Android上我们添加
库就可以,
一种更接地气的扩展。下面让我们通过
去使用、理解
Rx使用观察者模式
创建:Rx可以方便的创建事件流和数据流
组合:Rx使用查询式的操作符组合和变换数据流
监听:Rx可以订阅任何可观察的数据流并执行操作
Rx使代码简化
函数式风格:对可观察数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态
简化代码:Rx的操作符通通常可以将复杂的难题简化为很少的几行代码
异步错误处理:传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制
轻松使用并发:Rx的Observables和Schedulers让开发者可以摆脱底层的线程同步和各种并发问题
先看简单的例子,通过RxJava将Integer类型转成String类型
private void funcDemo() {
Observable.OnSubscribe&Integer& onSubscribe1 = new Observable.OnSubscribe&Integer&() {
public void call(Subscriber&? super Integer& subscriber) {
subscriber.onNext(100);
Func1&Integer, String& func1 = new Func1&Integer, String&() {
public String call(Integer integer) {
return String.valueOf(integer);
Subscriber&String& subscriber1 = new Subscriber&String&() {
public void onCompleted() {
public void onError(Throwable e) {
public void onNext(String s) {
Log.d(&onNext(): &, s);
Observable.create(onSubscribe1)
.map(func1)
.subscribe(subscriber1);
// 将上面分解成三步执行
// Observable&Integer& observable1 = Observable.create(onSubscribe1);
// Observable&String& observable2 = observable1.map(func1);
// observable2.subscribe(subscriber1);
主要关注这里面生成的四个对象:observable1、onSubscribe1、func1、subscriber1,如果对相应的单词不清晰,移步下面的备注。这是典型的的被观察者与观察者的关系,或者叫被订阅者与订阅者的关系;下面理一下他们的角色:
observable1:被观察者,是subscriber1要订阅的对象。
onSubscribe1:被观察者的行为,是subscriber1要订阅的行为,subscribe()时被执行。
subscriber1:订阅者,是抽象类实现了Observer接口,可以叫观察者,其实就是观察者的角色。
// 分解成三步执行的代码
Observable&Integer& observable1 = Observable.create(onSubscribe1);
Observable&String& observable2 = observable1.map(func1);
observable2.subscribe(subscriber1);
这段代码的意思就是:创建一个被观察者observable1,给被观察者指定其所发布的行为(onSubscribe1来实现); 指定观察者时,只需指定相应的观察回调即可;在完成订阅的操作时,是先调用subscriber1的onStart方法, 之后通过订阅行为onSubscribe1来调用subscriber1完成相应的订阅操作;最后若出现异常则会回调subscriber1的onError方法。
换句话说就是:被观察者发布的行为是传递Integer类型的数值100,map()变换后,观察者收到了String类型的字符串”100”。
简单点说就是:观察者订阅了被观察者要发布的行为。
现在结合源码,一步步看它到底是怎么执行的!
第一步 Observable.create(onSubscribe1)
public static &T& Observable&T& create(OnSubscribe&T& f) {
return new Observable&T&(hook.onCreate(f));
protected Observable(OnSubscribe&T& f) {
this.onSubscribe =
这里面泛型比较多,先忽略
泛型符号;hook又是什么东东?翻译过来就是“钩子”,到底是什么钩子呢,查看源码后知道, `hook` 是一个 proxy 对象, 仅仅用作调试的时候可以插入一些测试代码的,那也先忽略。
所以Observable.create(onSubscribe1)干了两件事,第一 new 一个 observable1 对象,第二将 new 出的 onSubscribe1 对象通过 Observable 的构造函数赋值给它的成员变量 onSubscribe。
第二步 observable1.map(func1)
先看func1对象,RxJava有一系列的(Func+数字)的接口和一系列(Action+数字)接口,这些接口中都只有一个call方法,其中(Func+数字)接口的call方法都有返回值,而(Action+数字)接口的call方法都没有返回值,后面的那个数字表示call方法接受几个泛型类型的参数。看一下Func1和Action1的源码:
* Represents a function with one argument.
public interface Func1&T, R& extends Function {
R call(T t);
* A one-argument action.
public interface Action1&T& extends Action {
void call(T t);
Func1 和 Action(Action1继承Action) 都继承 Function 接口,Func1 接收一个 T 泛型类型的参数,call 回调后,返回一个 R 泛型类型的值,是一种“变换”函数接口,我们可以在 call 回调中处理这种“变换”的需求。接下来看看 map(func1) 干了神马,上源码。。。
public final &R& Observable&R& map(Func1&? super T, ? extends R& func) {
return lift(new OperatorMap&T, R&(func));
public final &R& Observable&R& lift(final Operator&? extends R, ? super T& operator) {
return new Observable&R&(new OnSubscribe&R&() {
public void call(Subscriber&? super R& o) {
// 核心代码
Subscriber&? super T& subscriber2 = hook.onLift(operator).call(o);
subscriber2.onStart();
onSubscribe1.call(subscriber2);
map()函数里直接调用的lift()函数,先看看OperatorMap和lift()函数的参数Operator是什么玩意,再上源码。。。
* Operator function for lifting into an Observable.
public interface Operator&R, T& extends Func1&Subscriber&? super R&, Subscriber&? super T&& {
// cover for generics insanity
public final class OperatorMap&T, R& implements Operator&R, T& {
final Func1&? super T, ? extends R&
public OperatorMap(Func1&? super T, ? extends R& transformer) {
this.transformer =
public Subscriber&? super T& call(final Subscriber&? super R& o) {
return new Subscriber&T&(o) {
public void onCompleted() {
o.onCompleted();
public void onError(Throwable e) {
o.onError(e);
public void onNext(T t) {
o.onNext(transformer.call(t));
Operator 继承 Func1,Operator 和 Func1 都是一种“变换”接口,比如输入Integer类型参数经过处理返回String类型值, OperatorMap 继承 Operator,但它的参数又和Operator相反,难道经过OperatorMap又把String类型变换成Integer类型值啦, 其实 OperatorMap 类有个Func1属性transformer(transformer就是func1),执行o.onNext(transformer.call(t))就将func1变换的结果传递下去了。
lift()返回的是个新的被观察者对象observable2,同时创建一个新的OnSubscribe对象,暂时标记为onSubscribe2; 在onSubscribe2回调中调用hook.onLift(operator).call(o),变换后并生成新的观察者subscriber2对象, 新的观察者subscriber2对象被绑定到原来创建onSubscribe1对象上,可以理解为subscriber2已经订阅到经过变换后的 observable1要发布的行为,最后这种变化后的行为继续被发送到subscriber1观察者那里。
最后一步 subscribe(subscriber1)
subscriber1开始订阅被观察者的行为,也可以说被观察者的行为subscribe()时被执行;我们可以在观察者的回调中处理我们最终得到的结果; 在执行订阅后返回了Subscription对象,里面包含两个方法:
public interface Subscription {
// 取消订阅
void unsubscribe();
// 订阅是否被取消
boolean isUnsubscribed();
最后总结下吧,为了了解RxJava的运行机制;我使用一个简单的函数,实现的功能就是通过RxJava将Integer类型转成String类型并打印出来,并结合源码理解其执行过程;这里面泛型、接口的回调和各种角色的变换确实不好理解;接下来我会将RxJava和Retrofit2结合起来使用, 所以
就这样产生啦。
参考文章:
observable:
adj. 显著的;觉察得到的;看得见的
n. [物] 可观察量;感觉到的事物
observer:n. 观察者;[天] 观测者;遵守者
subscribe:
vi. 订阅;捐款;认购;赞成;签署
vt. 签署;赞成;捐助
subscriber:n. 订户;签署者;捐献者
已发表评论数()
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
标题不准确
排版有问题
主题不准确
没有分页内容
图片无法显示
视频无法显示
与原文不一致35168人阅读
RxJava(6)
在第一篇中,我介绍了RxJava的基础知识。第二篇中,我向你展示了操作符的强大。但是你可能仍然没被说服。这篇里面,我讲向你展示RxJava的其他的一些好处,相信这篇足够让你去使用Rxjava.
到目前为止,我们都没怎么介绍onComplete()和onError()函数。这两个函数用来通知订阅者,被观察的对象将停止发送数据以及为什么停止(成功的完成或者出错了)。
下面的代码展示了怎么使用这两个函数:
potentialException(s))
.map(s -> anotherPotentialException(s))
.subscribe(new Subscriber() {
public void onNext(String s) { System.out.println(s); }
public void onCompleted() { System.out.println(&Completed!&); }
public void onError(Throwable e) { System.out.println(&Ouch!&); }
});" data-snippet-id="ext.f374b507efce50bd0eabfa" data-snippet-saved="false" data-csrftoken="IcHbADBS-1FEWP1RD93JUWrRFagOr-qvNVSc" data-codota-status="done">Observable.just("Hello, world!")
.map(s -& potentialException(s))
.map(s -& anotherPotentialException(s))
.subscribe(new Subscriber&String&() {
public void onNext(String s) { System.out.println(s); }
public void onCompleted() { System.out.println("Completed!"); }
public void onError(Throwable e) { System.out.println("Ouch!"); }
代码中的potentialException() 和 anotherPotentialException()有可能会抛出异常。每一个Observerable对象在终结的时候都会调用onCompleted()或者onError()方法,所以Demo中会打印”Completed!”或者”Ouch!”。
这种模式有以下几个优点:
1.只要有异常发生onError()一定会被调用
这极大的简化了错误处理。只需要在一个地方处理错误即可以。
2.操作符不需要处理异常
将异常处理交给订阅者来做,Observerable的操作符调用链中一旦有一个抛出了异常,就会直接执行onError()方法。
3.你能够知道什么时候订阅者已经接收了全部的数据。
知道什么时候任务结束能够帮助简化代码的流程。(虽然有可能Observable对象永远不会结束)
我觉得这种错误处理方式比传统的错误处理更简单。传统的错误处理中,通常是在每个回调中处理错误。这不仅导致了重复的代码,并且意味着每个回调都必须知道如何处理错误,你的回调代码将和调用者紧耦合在一起。
使用RxJava,Observable对象根本不需要知道如何处理错误!操作符也不需要处理错误状态-一旦发生错误,就会跳过当前和后续的操作符。所有的错误处理都交给订阅者来做。
假设你编写的Android app需要从网络请求数据(感觉这是必备的了,还有单机么?)。网络请求需要花费较长的时间,因此你打算在另外一个线程中加载数据。那么问题来了!
编写多线程的Android应用程序是很难的,因为你必须确保代码在正确的线程中运行,否则的话可能会导致app崩溃。最常见的就是在非主线程更新UI。
使用RxJava,你可以使用subscribeOn()指定观察者代码运行的线程,使用observerOn()指定订阅者运行的线程:
myImageView.setImageBitmap(bitmap));" data-snippet-id="ext.1fda0bba9bf" data-snippet-saved="false" data-csrftoken="NpcEFc9m-K19xPir04VAl2okXGHJ6CKxgEcY" data-codota-status="done">myObservableServices.retrieveImage(url)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(bitmap -& myImageView.setImageBitmap(bitmap));
是不是很简单?任何在我的Subscriber前面执行的代码都是在I/O线程中运行。最后,操作view的代码在主线程中运行.
最棒的是我可以把subscribeOn()和observerOn()添加到任何Observable对象上。这两个也是操作符!。我不需要关心Observable对象以及它上面有哪些操作符。仅仅运用这两个操作符就可以实现在不同的线程中调度。
如果使用AsyncTask或者其他类似的,我将不得不仔细设计我的代码,找出需要并发执行的部分。使用RxJava,我可以保持代码不变,仅仅在需要并发的时候调用这两个操作符就可以。
订阅(Subscriptions)
当调用Observable.subscribe(),会返回一个Subscription对象。这个对象代表了被观察者和订阅者之间的联系。
System.out.println(s));" data-snippet-id="ext.69a22e2d6aa4f17ec3f44e9a4b90adae" data-snippet-saved="false" data-csrftoken="csuDz2NT-Yw8U12GgBNMbx5RbKVCmpsA6fHY" data-codota-status="done">ubscription subscription = Observable.just("Hello, World!")
.subscribe(s -& System.out.println(s));
你可以在后面使用这个Subscription对象来操作被观察者和订阅者之间的联系.
subscription.unsubscribe();
System.out.println("Unsubscribed=" + subscription.isUnsubscribed());
RxJava的另外一个好处就是它处理unsubscribing的时候,会停止整个调用链。如果你使用了一串很复杂的操作符,调用unsubscribe将会在他当前执行的地方终止。不需要做任何额外的工作!
记住这个系列仅仅是对RxJava的一个入门介绍。RxJava中有更多的我没介绍的功能等你探索(比如backpressure)。当然我也不是所有的代码都使用响应式的方式–仅仅当代码复杂到我想将它分解成简单的逻辑的时候,我才使用响应式代码。
最初,我的计划是这篇文章作为这个系列的总结,但是我收到许多请求我介绍在Android中使用RxJava,所以你可以继续阅读第四篇了。我希望这个介绍能让你开始使用RxJava。如果你想学到更多,我建议你阅读RxJava的官方wiki。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:522893次
积分:2227
积分:2227
排名:第12066名
原创:15篇
译文:12篇
评论:270条
(1)(1)(4)(1)(1)(3)(2)(1)(3)(5)(2)(3)

我要回帖

更多关于 rxjava github 的文章

 

随机推荐