rxjava merge concatzip和merge的区别

RxJava简单使用 - 简书
RxJava简单使用
关于Rxjava的文章国内已经不少了 比如扔物线的
还有一波操作符详解
下午的时候捣腾了一下 这里写下自己的见解,rxjava 官网的介绍 自己简单的总结一下,就是处理异步的,比线程 handler asynctask 使用简单 使代码逻辑变得简单,重要的是随着业务越来越复杂的时候他还是能保持简洁。Observable:被观察者
Observer:
一般使用Subscriber这个抽象实现类Call :事件上例子:
//创建 被观察者
Observable.create(newObservable.OnSubscribe() {
public voidcall(Subscriber subscriber) {
subscriber.onNext("hello word!"); // 发送事件
subscriber.onCompleted();
}).subscribe(new Subscriber() {
//订阅观察者
public void
onCompleted() {
Log.e(TAG,"onCompleted");
public void
onError(Throwable e) {
Log.e(TAG,e.getMessage());
public void
onNext(String s) {
Log.e(TAG,s);
这是一个基本的例子,可以看到在rxjava中是被观察者订阅观察者
而不是观察者订阅被观察者 从而实现链式调用。 是的 这看起来并没有什么卵用...说说在项目实际的使用吧,上家公司有个这样的需求,一个listview 的数据 来自于2个接口,按照套路来讲的话 写2个请求 第一个接口请求成功的回调方法 再去请求第二个接口,嗯
大概就是这样
代码可想而知如何写到 各种回调嵌套,下面用rxjava 来 演示下这样的需求
final List& listList =newArrayList&&();
Observable& observable1 = Observable.create(newObservable.OnSubscribe&() {
public void
call(Subscriber& subscriber) {
List list =newArrayList&&();
for(inti =0;i &5;i++) {
list.add("this is observable1 merge :"+ i);
subscriber.onNext(list);
subscriber.onNext(list);
subscriber.onCompleted();
Observable& observable2 = Observable.create(new
Observable.OnSubscribe&() {
public void
call(Subscriber& subscriber) {
List list =new
ArrayList&&();
for(inti =0;i &5;i++) {
list.add("this is observable2 merge :"+ i);
subscriber.onNext(list);
subscriber.onNext(list);
subscriber.onCompleted();
Observable.merge(observable1,observable2).subscribe(newSubscriber&() {
public void
onCompleted() {
onCompleted"+listList.toString());
public void
onError(Throwable e) {
l(e.getMessage());
public void
onNext(List strings) {
listList.add(strings);
好了 代码就差不多是这样,可以看到 我写了2个被观察者
call代码在项目中 换成请求接口代码
建议使用retrofit,retrofit已支持rxjava ,在这里我使用merge这个方法 将2个 请求的数据 合并成一个 或者zip 是的
用rxjava 就是这么 简单。 好了 再来看个 我想大家都有这样的业务逻辑 请求接口需要依赖另外一个接口的数据
在rxjava中是这么写的
Observable.create(new
Observable.OnSubscribe() {
public void
call(Subscriber subscriber) {
subscriber.onNext("load data");
}).flatMap(newFunc1&() {
Observablecall(String s) {
Observable.create(newObservable.OnSubscribe() {
public voidcall(Subscriber subscriber) {
subscriber.onNext("load
}).subscribe(new
Subscriber() {
public void
onCompleted() {
public void
onError(Throwable e) {
public void
onNext(String s) {
首先创建一个被观察者
在call方法里请求网络
数据回来之后
发送事件onNext() 在这里我使用了一个flatMap 的东西
这个东西是什么呢?字面意思是平铺是吧? 是吧 他就是做数据处理的
返回一个 Observable 继续做下一步操作
在事件触发之前 我把数据 换成了字符串类型,所以在观察者收到 的数据是字符串,rxjava中 可以随意的转换数据,更多操作符文头有链接 这里不多提,还有一个重要的概念就是 线程切换subscribeOn(Schedulers.io())
//被观察者处于什么线程observeOn(AndroidSchedulers.mainThread()) // 观察者收到事件处于什么的线程嗯先写到这... 未完。
一枚长沙的Android开发小屌丝Rxjava(11)
一、CombineLatest
CombineLatest操作符可以将2~9个Observable发射的数据组装起来然后再发射出来。不过还有两个前提:
所有的Observable都发射过数据。满足条件1的时候任何一个Observable发射一个数据,就将所有Observable最新发射的数据按照提供的函数组装起来发射出去。
Rxjava实现CombineLast操作符可以让我们直接将组装的Observable作为参数传值,也可以将所有的Observable装在一个List里面穿进去。
下面我们创建几个Observable对象,分别直接传值和使用List传值将其组装起来
private Observable&Integer& createObserver(int index) {
return Observable.create(new Observable.OnSubscribe&Integer&() {
public void call(Subscriber&? super Integer& subscriber) {
for (int i = 1; i & 6; i++) {
subscriber.onNext(i * index);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}).subscribeOn(Schedulers.newThread());
private Observable&Integer& combineLatestObserver() {
return bineLatest(createObserver(1), createObserver(2), (num1, num2) -& {
log(&left:& + num1 + & right:& + num2);
return num1 + num2;
List&Observable&Integer&& list = new ArrayList&&();
private Observable&Integer& combineListObserver() {
for (int i = 1; i & 5; i++) {
list.add(createObserver(i));
return bineLatest(list, args -& {
int temp = 0;
for (Object i : args) {
temp += (Integer)
对其进行订阅
mLButton.setText(&combineList&);
mLButton.setOnClickListener(e -& combineListObserver().subscribe(i -& log(&combineList:& + i)));
mRButton.setText(&CombineLatest&);
mRButton.setOnClickListener(e -& combineLatestObserver().subscribe(i -& log(&CombineLatest:& + i)));
运行结果如下
Join操作符根据时间窗口来组合两个Observable发射的数据,每个Observable都有一个自己的时间窗口,要组合的时候,在这个时间窗口内的数据都有有效的,可以拿来组合。
Rxjava还实现了groupJoin,基本和join相同,只是最后组合函数的参数不同。
使用join操作符需要4个参数,分别是:
源Observable所要组合的目标Observable一个函数,就收从源Observable发射来的数据,并返回一个Observable,这个Observable的生命周期决定了源Observable发射出来数据的有效期一个函数,就收从目标Observable发射来的数据,并返回一个Observable,这个Observable的生命周期决定了目标Observable发射出来数据的有效期一个函数,接收从源Observable和目标Observable发射来的数据,并返回最终组合完的数据。
下面我们使用join和groupJoin操作符分别来组合两个Observable对象
private Observable&String& createObserver() {
return Observable.create(new Observable.OnSubscribe&String&() {
public void call(Subscriber&? super String& subscriber) {
for (int i = 1; i & 5; i++) {
subscriber.onNext(&Right-& + i);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}).subscribeOn(Schedulers.newThread());
private Observable&String& joinObserver() {
return Observable.just(&Left-&).join(createObserver(),
integer -& Observable.timer(3000, TimeUnit.MILLISECONDS),
integer -& Observable.timer(2000, TimeUnit.MILLISECONDS),
(i, j) -& i + j
private Observable&Observable&String&& groupJoinObserver() {
return Observable.just(&Left-&)
.groupJoin(createObserver(),
s -& Observable.timer(3000, TimeUnit.MILLISECONDS),
s -& Observable.timer(2000, TimeUnit.MILLISECONDS),
(s, stringObservable) -& stringObservable.map(str -& s + str));
分别进行订阅
mLButton.setText(&join&);
mLButton.setOnClickListener(e -& joinObserver().subscribe(i -& log(&join:& + i + &\n&)));
mRButton.setText(&groupJoin&);
mRButton.setOnClickListener(e -& groupJoinObserver().subscribe(i -& i.subscribe(j -& log(&groupJoin:& + j + &\n&))));
运行结果如下,可以看到虽然目标Observable发射了4个数据,但是源Observable只发射了一个有效期为3秒的数据,所以最终的组合结果也只有3个数据。
三、Merege
Merge操作符将多个Observable发射的数据整合起来发射,就如同是一个Observable发射的数据一样。但是其发射的数据有可能是交错的,如果想要没有交错,可以使用concat操作符。
当某一个Observable发出onError的时候,merge的过程会被停止并将错误分发给Subscriber,如果不想让错误终止merge的过程,可以使用MeregeDelayError操作符,会将错误在merge结束后再分发。
下面我们分别使用merge和mergeDelayError操作符来进行merge操作。
private Observable&Integer& mergeObserver() {
return Observable.merge(Observable.just(1, 2, 3), Observable.just(4, 5, 6));
private Observable&Integer& mergeDelayErrorObserver() {
return Observable.mergeDelayError(Observable.create(new Observable.OnSubscribe&Integer&() {
public void call(Subscriber&? super Integer& subscriber) {
for (int i = 0; i & 5; i++) {
if (i == 3) {
subscriber.onError(new Throwable(&error&));
subscriber.onNext(i);
}), Observable.create(new Observable.OnSubscribe&Integer&() {
public void call(Subscriber&? super Integer& subscriber) {
for (int i = 0; i & 5; i++) {
subscriber.onNext(5 + i);
subscriber.onCompleted();
分别对其订阅
mLButton.setText(&Merge&);
mLButton.setOnClickListener(e -& mergeObserver().subscribe(i -& log(&Merge:& + i)));
mRButton.setText(&mergeDelayError&);
mRButton.setOnClickListener(e -& mergeDelayErrorObserver().subscribe(new Subscriber&Integer&() {
public void onCompleted() {
log(&onCompleted&);
public void onError(Throwable e) {
log(&mergeDelayError:& + e);
public void onNext(Integer integer) {
log(&mergeDelayError:& + integer);
运行结果如下:
四、StartWith、Switch
StartWith操作符会在源Observable发射的数据前面插上一些数据。不仅仅只可以插入一些数据,还可以将Iterable和Observable插入进入。如果插入的是Observable,则这个Observable发射的数据会插入到
源Observable发射数据的前面。
switch操作符在Rxjava上的实现为switchOnNext,用来将一个发射多个小Observable的源Observable转化为一个Observable,然后发射这多个小Observable所发射的数据。
需要注意的就是,如果一个小的Observable正在发射数据的时候,源Observable又发射出一个新的小Observable,则前一个Observable发射的数据会被抛弃,直接发射新
的小Observable所发射的数据。可以看示意图中的黄色圆圈就被丢弃了。
下面使用startWith和switchOnNext操作符来组合两个Observable
private Observable&Integer& startWithObserver() {
return Observable.just(1, 2, 3).startWith(-1, 0);
private Observable&String& switchObserver() {
return Observable.switchOnNext(Observable.create(
new Observable.OnSubscribe&Observable&String&&() {
public void call(Subscriber&? super Observable&String&& subscriber) {
for (int i = 1; i & 3; i++) {
subscriber.onNext(createObserver(i));
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
private Observable&String& createObserver(int index) {
return Observable.create(new Observable.OnSubscribe&String&() {
public void call(Subscriber&? super String& subscriber) {
for (int i = 1; i & 5; i++) {
subscriber.onNext(index + &-& + i);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}).subscribeOn(Schedulers.newThread());
分别进行订阅
mLButton.setText(&StartWith&);
mLButton.setOnClickListener(e -& startWithObserver().subscribe(i -& log(&StartWith:& + i)));
mRButton.setText(&switch&);
mRButton.setOnClickListener(e -& switchObserver().subscribe(i -& log(&switch:& + i)));
运行结果如下,可以看到startwith将-1和0插入到前面。使用siwtch的时候第一个小Observable只发射出了两个数据,第二个小Observable就被源Observable发射出来了,所以其接下来的两个数据被丢弃。
Zip操作符将多个Observable发射的数据按顺序组合起来,每个数据只能组合一次,而且都是有序的。最终组合的数据的数量由发射数据最少的Observable来决定。
Rxjava实现了zip和zipWith两个操作符。
下面我们使用zip和zipWith操作符来组合数据
private Observable&String& zipWithObserver() {
return createObserver(2).zipWith(createObserver(3), (s, s2) -& s + &-& + s2);
private Observable&String& zipWithIterableObserver() {
return Observable
.zip(createObserver(2), createObserver(3), createObserver(4), (s, s2, s3) -& s + &-& + s2 + &-& + s3);
private Observable&String& createObserver(int index) {
return Observable.create(new Observable.OnSubscribe&String&() {
public void call(Subscriber&? super String& subscriber) {
for (int i = 1; i &= i++) {
log(&emitted:& + index + &-& + i);
subscriber.onNext(index + &-& + i);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}).subscribeOn(Schedulers.newThread());
分别进行订阅
mLButton.setText(&zipWith&);
mLButton.setOnClickListener(e -& zipWithObserver().subscribe(i -& log(&zipWith:& + i + &\n&)));
mRButton.setText(&zip&);
mRButton.setOnClickListener(e -& zipWithIterableObserver().subscribe(i -& log(&zip:& + i + &\n&)));
运行结果如下,可以看到,最终都发射出了两个数据,因为createObserver(2)所创建的Observable只会发射两个数据,所以其他Observable多余发射的数据都被丢弃了。
Combning的操作符就到这了,本文的demo程序见
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:20764次
排名:千里之外
原创:12篇
转载:17篇
(1)(1)(1)(1)(1)(2)(9)(2)(4)(3)(4)2080人阅读
rxjava(10)
& & & & 当某界面内容来源不同,但需同时显示出来时
& & & & eg1: & &一部分数据来自本地,一部分来自网络
Observable.zip(
queryContactsFromLocation(),
queryContactsForNet(),
new Func2&List&Contacter&, List&Contacter&, List&Contacter&&() {
public List&Contacter& call(List&Contacter& contacters, List&Contacter& contacters2) {
contacters.addAll(contacters2);
return contacters;}
}).compose(this.&List&Contacter&&bindToLifecycle())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1&List&Contacter&&() {
public void call(List&Contacter& contacters) {
initPage(contacters);
/** * 模拟手机本地联系人查询 */private Observable&List&Contacter&& queryContactsFromLocation() {
return Observable.create(new Observable.OnSubscribe&List&Contacter&&() {
public void call(Subscriber&? super List&Contacter&& subscriber) {
ArrayList&Contacter& contacters = new ArrayList&&();
contacters.add(new Contacter(&location:张三&));
contacters.add(new Contacter(&location:李四&));
contacters.add(new Contacter(&location:王五&));
subscriber.onNext(contacters);
subscriber.onCompleted();
/** * 模拟网络联系人列表 */private Observable&List&Contacter&& queryContactsForNet() {
return Observable.create(new Observable.OnSubscribe&List&Contacter&&() {
public void call(Subscriber&? super List&Contacter&& subscriber) {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
ArrayList&Contacter& contacters = new ArrayList&&();
contacters.add(new Contacter(&net:Zeus&));
contacters.add(new Contacter(&net:Athena&));
contacters.add(new Contacter(&net:Prometheus&));
subscriber.onNext(contacters);
subscriber.onCompleted();
效果如下:
& & & & eg2: & &当然对于来源不同、类型不同的数据我们也可以使用Zip的,只需将两种Fun2的call()方法中先暂存起来,最后在订阅回调中分离使用即可,常见的应用场所比如下面这个界面。
& & 顶部是Html组成的Web页面,底部是原生的评论页面,数据分别来自两个不同的接口,为了保证界面的完整显示,在这里我们可以使用Zip。
Observable.zip(
mArticleProtocol.getNewsDetail(params_detail),
mCommentProtocol.getCommentList(params_comm),
new Func2&ArticleDetailBean, CommentListBean, List&() {
public List call(ArticleDetailBean articleDetailBean, CommentListBean commentBean) {
List list = new ArrayList();
list.add(articleDetailBean);
list.add(commentBean);
return list;
.observeOn(AndroidSchedulers.mainThread())
.compose(this.&List&bindUntilEvent(ActivityEvent.DESTROY))
.subscribe(
new Action1&List&() {
public void call(List list) {
initPage(list);
new Action1&Throwable&() {
public void call(Throwable throwable) {
mPageLayout.setPageState(PageStateLayout.STATE_ERROR);
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:43810次
排名:千里之外
原创:20篇
评论:19条
(9)(1)(10)RxJava处理嵌套请求 - 简书
RxJava处理嵌套请求
互联网应用开发中由于请求网络数据频繁,往往后面一个请求的参数是前面一个请求的结果,于是经常需要在前面一个请求的响应中去发送第二个请求,从而造成“请求嵌套”的问题。如果层次比较多,代码可读性和效率都是问题。本文首先从感性上介绍下RxJava,然后讲解如何通过RxJava中的flatMap操作符来处理“嵌套请求”的问题
RxJava简单介绍
嵌套请求举例
运用flatMap
map和flatMap
RxJava与Retrofit配合解决嵌套请求
博客园对应
RxJava简单介绍
这里并不打算详细介绍RxJava的用法和原理,这方面的文章已经很多了。这里仅仅阐述本人对于RxJava的感性上的理解。先上一个图:
Rxjava overview
我们都知道RxJava是基于观察者模式的,但是和传统的观察者模式又有很大的区别。传统的观察者模式更注重订阅和发布这个动作,而RxJava的重点在于数据的“流动”。如果我们把RxJava中的Observable看做一个盒子,那么Observable就是把数据或者事件给装进了这个易于拿取的盒子里面,让订阅者(或者下一级别的盒子)可以拿到而处理。这样一来,原来静态的数据/事件就被流动起来了。
我们知道人类会在河流中建设大坝,其实我们可以把RxJava中的filter/map/merge等Oberservable操作符看做成数据流中的大坝,经过这个操作符的操作后,大坝数据流被过滤被合并被处理,从而灵活的对数据的流动进行管制,让最终的使用者灵活的拿到。
以上就是我对RxJava的理解,深入的用法和原理大家请自行看网上的文章。
嵌套请求举例
这里开始进入正题,开始举一个嵌套请求的例子。比如我们下列接口:
api/students/getAll (传入班级的id获得班级的学生数组,返回值是list&Student&)
api/courses/getAll (传入Student的id获得这个学生所上的课程,返回值是List&Course&)
我们最终的目的是要打印班上所有同学分别所上的课程(大学,同班级每个学生选上的课不一样),按照传统Volley的做法,代码大概是这样子(Volley已经被封装过)
private void getAllStudents(String id) {
BaseRequest baseRequest = new BaseRequest();
baseRequest.setClassId(id);
String url = AppConfig.SERVER_URL + "api/students/getAll";
final GsonRequest request = new GsonRequest&&(url, baseRequest, Response.class, new Response.Listener&Response&() {
public void onResponse(Response response) {
if (response.getStatus() & 0) {
List&Student& studentList = response.getData();
for (Student student : studentList) {
}, new Response.ErrorListener() {
public void onErrorResponse(VolleyError error) {
MyVolley.startRequest(request);
private void getAllCourses(String id) {
BaseRequest baseRequest = new BaseRequest();
baseRequest.setStudentId(id);
String url = AppConfig.SERVER_URL + "api/courses/getAll";
final GsonRequest request = new GsonRequest&&(url, baseRequest, Response.class, new Response.Listener&Response&() {
public void onResponse(Response response) {
if (response.getStatus() & 0) {
List&Course& courseList = response.getData();
for (Course course : courseList) {
}, new Response.ErrorListener() {
public void onErrorResponse(VolleyError error) {
MyVolley.startRequest(request);
显然第一个请求的响应中获得的数据是一个List,正对每一个List中的item要再次发送第二个请求,在第二个请求中获得最终的结果。这就是一个嵌套请求。这会有两个问题:
目前来看并不复杂,如果嵌套层次多了,会造成代码越来越混乱
写出来的重复代码太多
运用flatMap
现在我们可以放出RxJava大法了,flatMap是一个Observable的操作符,接受一个Func1闭包,这个闭包的第一个函数是待操作的上一个数据流中的数据类型,第二个是这个flatMap操作完成后返回的数据类型的被封装的Observable。说白了就是讲一个多级数列“拍扁”成了一个一级数列。按照上面的列子,flatMap将接受student后然后获取course的这个二维过程给线性化了,变成了一个可观测的连续的数据流。于是代码是:
ConnectionBase.getApiService2()
.getStudents(101)
.flatMap(new Func1&Student, Observable&Course&&() {
public Observable&Course& call(Student student) {
return ConnectionBase.getApiService2().getAllCourse(student.getId());
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1&Course&() {
public void call(Course course) {
//use the Course
是不是代码简洁的让你看不懂了?别急,这里面的getStutent和getAllCourse是ConnectionBase.getApiService2()的两个方法,他集成了Retrofit2用来将请求的网络数据转化成Observable,最后一节将介绍,这里先不关注。我们所要关注的是以上代码的流程。首先getStudent传入了班级id(101)返回了Observable&Student&,然后链式调用flatMap操作符对这个Observable&Student&进行变换处理,针对每一个发射出来的Student进行再次请求 ConnectionBase.getApiService2().getAllCourse从而返回Observable&Course&,最后对这个 ConnectionBase.getApiService2().getAllCourse进行订阅,即subscribe方法,再Action1这个闭包的回调中使用course。
flatMap的作用就是对传入的对象进行处理,返回下一级所要的对象的Observable包装。
FuncX和ActionX的区别。FuncX包装的是有返回值的方法,用于Observable的变换、组合等等;ActionX用于包装无返回值的方法,用于subscribe方法的闭包参数。Func1有两个入参,前者是原始的参数类型,后者是返回值类型;而Action1只有一个入参,就是传入的被消费的数据类型。
subscribeOn(Schedulers.io()).observeOn(AndroidScheduler.mainThread())是最常用的方式,后台读取数据,主线程更新界面。subScribeOn指在哪个线程发射数据,observeOn是指在哪里消费数据。由于最终的Course要刷新界面,必须要在主线程上调用更新view的方法,所以observeOn(AndroidScheduler.mainThread())是至关重要的。
map和flatMap
运用flatMap的地方也是可以用map的,但是是有区别的。先看下map操作符的用法:
ConnectionBase.getApiService2()
.getStudents(101)
.map(new Func1&Student&, Course&() {
public Course call(Student student) {
return conventStudentToCourse();// has problem
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1&Course&() {
public void call(Course course) {
//use the Course
可以看到map也是接受一个Func1闭包,但是这个闭包的第二个参数即返回值参数类型并不是一个被包装的Observable,而是实际的原始类型,由于call的返回值是Course,所以conventStudentToCourse这里就不能用Retrofit2的方式返回一个Observable了。
所以这里是有一个问题的,对于这种嵌套的网络请求,由于接到上端数据流到处理后将结果数据放入下端数据流是一个异步的过程,而conventStudentToCourse这种直接将Student转化为Course是没法做到异步的,因为没有回调方法。那么这种情况,最好还是用flatMap并通过retrofit的方式来获取Observable。要知道,Rxjava的一个精髓就是“异步”。
那么到底map和flatMap有什么区别,或者说什么时候使用map什么时候使用flatMap呢?flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。
首先,如果你需要将一个类型的对象经过处理(非异步)直接转化成下一个类型,推荐用map,否则的话就用flatMap。其次,如果你需要在处理中加入容错的机制(特别是你自己封装基于RxJava的网络请求框架),推荐用flatMap。比如将一个File[] jsonFile中每个File转换成String,用map的话代码为:
Observable.from(jsonFile).map(new Func1&File, String&() {
@Override public String call(File file) {
return new Gson().toJson(new FileReader(file), Object.class);
} catch (FileNotFoundException e) {
// So Exception. What to do ?
// Not good :(
可以看到这里在出现错误的时候直接抛出异常,这样的处理其实并不好,特别如果你自己封装框架,这个异常不大好去抓取。
如果用flatMap,由于flatMap的闭包返回值是一个Observable,所以我们可以在这个闭包的call中通过Observable.create的方式来创建Observable,而要知道create方法是可以控制数据流下端的Subscriber的,即可以调用onNext/onCompete/onError方法。如果出现异常,我们直接调用subscribe.onError即可,封装框架也很好感知。代码大致如下:
Observable.from(jsonFile).flatMap(new Func1&File, Observable&String&&() {
@Override public Observable&String& call(final File file) {
return Observable.create(new Observable.OnSubscribe&String&() {
@Override public void call(Subscriber&? super String& subscriber) {
String json = new Gson().toJson(new FileReader(file), Object.class);
subscriber.onNext(json);
subscriber.onCompleted();
} catch (FileNotFoundException e) {
subscriber.onError(e);
map操作符通常也用于处理结构化的服务端响应数据,比如下列返回的JSON数据就是一段典型的响应数据
"message":"操作成功",
"status":1,
"noVisitCount":0,
"planCount":0,
"visitedCount":0
在map的闭包中,我们可以先判断status进行统一的出错或者正确(返回data的内容)处理,一般来说,data的内容都是处理成一个泛型
RxJava与Retrofit配合解决嵌套请求
这里该讨论Retrofit了。可以说Retrofit就是为了RxJava而生的。如果你的项目之前在网络请求框架用的是Volley或者自己封装Http请求和TCP/IP,而现在你看到了Retrofit这个框架后想使用起来,我可以负责任的跟你说,如果你的项目中没有使用RxJava的话,使用Retrofit和Volley是没有区别的!要用Retrofit的话,就最好或者说强烈建议也使用RxJava进行编程。
Retrofit有callback和Observable两种模式,前者就像传统的Volley一样,有successs和fail的回调方法,我们在success回调方法中处理结果;而Observable模式是将请求回来的数据由Retrofit框架自动的帮你加了一个盒子,即自动帮你装配成了含有这个数据的Observable,供你使用RxJava的操作符随意灵活的进行变换。
callback模式的Retrofit是这样建立的:
retrofit = new Retrofit.Builder()
.baseUrl(SERVER_URL)
.addConverterFactory(GsonConverterFactory.create(gson))
Observable模式是这样子建立的:
retrofit2 = new Retrofit.Builder()
.baseUrl(SERVER_URL)
.addConverterFactory(GsonConverterFactory.create(gson))
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
即addCallAdapterFactory这个方法在起作用,在RxJavaCallAdapterFactory的源码注释中可以看到这么一句话:
Response wrapped body (e.g., {@code Observable&Response&User&&}) calls {@code onNext} with a {@link Response} object for all HTTP responses and calls {@code onError} with {@link IOException} for network errors
即它将返回值body为包裹上了一层“Observable”
千米网移动开发工程师
/soaringEveryday/

我要回帖

更多关于 rxjava merge concat 的文章

 

随机推荐