Jianshu link: A First Look at RxJava on Android, with Commonly Used Practical Code

The code below is based on RxJava 1.0.

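// Imports needed at the top of the file (RxJava 1.x, RxAndroid 1.x):
import android.util.Log;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;

// Create the Observable, i.e. the event publisher. subscribeOn controls
// which thread the call() callback below runs on.
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> observer) {
        // The thread this callback runs on is set by subscribeOn(...),
        // which is AndroidSchedulers.mainThread() in this example.
        Log.w(TAG, "onCall" + Thread.currentThread().getName());
        for (int i = 0; i < 5; i++) {
            observer.onNext(i);
        }
        observer.onCompleted();
    }
}); // call() is not triggered until something subscribes

// Observer callbacks run on an io thread; call() runs on the main thread.
observable
        .observeOn(Schedulers.io())
        .subscribeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.w(TAG, "Observable completed");
            }

            @Override
            public void onError(Throwable e) {
                Log.w(TAG, "Oh,no! Something wrong happened!");
            }

            @Override
            public void onNext(Integer item) {
                // The thread this callback runs on is set by observeOn.
                // Without a subscription, no callback is ever delivered.
                Log.w(TAG, "Item is " + item + ",Thread:" + Thread.currentThread().getName());
            }
        });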

If neither observeOn nor subscribeOn is specified, both the emission and the observer callbacks run on the calling thread by default.

s W/Example1Activity: onCallRxCachedThreadScheduler-1
10-24 16:42:04.960 8526-8613/kurtis.rx.androidexamples W/Example1Activity: Item is 0,Thread:RxCachedThreadScheduler-2
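
To make the split between subscribeOn and observeOn concrete, here is a minimal plain-Java sketch. It is not RxJava code and not how RxJava is implemented: the class ThreadSwitchSketch and the two single-thread executors named "producer" and "consumer" are assumptions made for illustration, standing in for the schedulers passed to subscribeOn and observeOn.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java model of the two thread switches; this is NOT RxJava code.
class ThreadSwitchSketch {

    // Returns a log of which thread ran the producer body and each consumer callback.
    static List<String> run() {
        // Hypothetical stand-ins for the schedulers given to subscribeOn / observeOn.
        ExecutorService producerThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "producer"));
        ExecutorService consumerThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "consumer"));
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // Role of subscribeOn: the body that emits items runs on the producer thread.
        producerThread.execute(() -> {
            log.add("call on " + Thread.currentThread().getName());
            for (int i = 0; i < 3; i++) {
                final int item = i;
                // Role of observeOn: each item is handed over to the consumer thread.
                consumerThread.execute(() ->
                        log.add("item " + item + " on " + Thread.currentThread().getName()));
            }
            // Completion is queued behind the items, so it is delivered last.
            consumerThread.execute(done::countDown);
        });

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            producerThread.shutdown();
            consumerThread.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The first log entry comes from the producer thread and the remaining ones from the consumer thread, which mirrors the idea that subscribeOn decides where call() runs while observeOn decides where onNext and onCompleted are delivered.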

Example 2: boilerplate for a simple adapter updated by a slow request

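// Imports needed at the top of the file (RxJava 1.x, RxAndroid 1.x):
import android.util.Log;
import java.util.List;
import java.util.concurrent.Callable;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;

// Inside the Activity:
private void createObservable() {
    Observable<List<String>> tvShowObservable = Observable.fromCallable(new Callable<List<String>>() {
        @Override
        public List<String> call() {
            Log.w(TAG, "Currently running off the UI thread: " + Thread.currentThread().getName());
            // getFavoriteTvShows() calls sleep(); because of subscribeOn(Schedulers.io())
            // it blocks an io thread rather than the UI thread.
            return mRestClient.getFavoriteTvShows();
        }
    });

    mTvShowSubscription = tvShowObservable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<List<String>>() {
                @Override
                public void onCompleted() {
                }

                @Override
                public void onError(Throwable e) {
                    // Log the failure instead of silently swallowing it.
                    Log.w(TAG, "Failed to load TV shows", e);
                }

                @Override
                public void onNext(List<String> tvShows) {
                    // Called once call() has returned its result.
                    displayTvShows(tvShows);
                }
            });
}

@Override
protected void onDestroy() {
    super.onDestroy();

    // Unsubscribe so that no result is delivered to a destroyed Activity.
    if (mTvShowSubscription != null && !mTvShowSubscription.isUnsubscribed()) {
        mTvShowSubscription.unsubscribe();
    }
}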
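
Why the unsubscribe in onDestroy matters can be shown with a small plain-Java sketch. This is only a model under stated assumptions, not RxJava's implementation: CancellableCallSketch, its Handle class (playing the role of the Subscription) and the Executor parameter (playing the role of the scheduler) are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Plain-Java model of "run a slow call elsewhere, deliver the result unless unsubscribed".
class CancellableCallSketch {

    // Hypothetical handle that plays the role of the Subscription field.
    static final class Handle {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);

        void unsubscribe() {
            unsubscribed.set(true);
        }

        boolean isUnsubscribed() {
            return unsubscribed.get();
        }
    }

    // Nothing runs until subscribe() is called; the work is then handed to the worker.
    static <T> Handle subscribe(Callable<T> work, Executor worker,
                                Consumer<T> onNext, Consumer<Throwable> onError) {
        Handle handle = new Handle();
        worker.execute(() -> {
            T result;
            try {
                result = work.call(); // the slow call runs on the worker
            } catch (Exception e) {
                if (!handle.isUnsubscribed()) {
                    onError.accept(e);
                }
                return;
            }
            if (!handle.isUnsubscribed()) {
                onNext.accept(result); // skipped once unsubscribe() was called
            }
        });
        return handle;
    }

    public static void main(String[] args) {
        List<Runnable> pending = new ArrayList<>(); // a worker that runs tasks only when told to
        List<String> shown = new ArrayList<>();

        Handle handle = subscribe(() -> "show A", pending::add, shown::add,
                err -> shown.add("error"));
        handle.unsubscribe();           // e.g. onDestroy() arrives first
        pending.forEach(Runnable::run); // the slow call finishes afterwards
        System.out.println(shown.isEmpty()); // prints true: nothing was delivered
    }
}
```

The sketch only suppresses delivery; it does not interrupt the running call, which is a simplification chosen to keep the example short.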

Boilerplate for success and error callbacks

// Imports needed at the top of the file (RxJava 1.x, RxAndroid 1.x):
import java.util.List;
import java.util.concurrent.Callable;
import rx.Single;
import rx.SingleSubscriber;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;

// Inside the Activity:
private void createSingle() {
    Single<List<String>> tvShowSingle = Single.fromCallable(new Callable<List<String>>() {
        @Override
        public List<String> call() throws Exception {
            /**
             * Uncomment me (and comment out the line below) to see what happens when an error occurs.
             *
             * return RestClient.getFavoriteTvShowsWithException();
             */
            // With getFavoriteTvShowsWithException(), the thrown exception
            // is delivered to onError instead of onSuccess.
            return mRestClient.getFavoriteTvShows();
        }
    });

    mTvShowSubscription = tvShowSingle
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new SingleSubscriber<List<String>>() {
                @Override
                public void onSuccess(List<String> tvShows) {
                    displayTvShows(tvShows);
                }

                @Override
                public void onError(Throwable error) {
                    displayErrorMessage();
                }
            });
}
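
The routing between onSuccess and onError can be illustrated with a short plain-Java sketch. It is a model only, not RxJava's Single: SingleResultSketch, its Callback interface (shaped like SingleSubscriber) and the describe helper are hypothetical names introduced for this example, and threading is left out on purpose.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;

// Plain-Java model of a one-shot source with a success path and an error path.
class SingleResultSketch {

    // Hypothetical two-method callback mirroring the shape of SingleSubscriber.
    interface Callback<T> {
        void onSuccess(T value);

        void onError(Throwable error);
    }

    // Runs the callable once: a returned value goes to onSuccess,
    // a thrown exception goes to onError. Exactly one of the two fires.
    static <T> void subscribe(Callable<T> source, Callback<T> callback) {
        T value;
        try {
            value = source.call();
        } catch (Exception e) {
            callback.onError(e);
            return;
        }
        callback.onSuccess(value);
    }

    // Demo helper: reports which callback fired and with what.
    static String describe(Callable<List<String>> source) {
        StringBuilder out = new StringBuilder();
        subscribe(source, new Callback<List<String>>() {
            @Override
            public void onSuccess(List<String> shows) {
                out.append("success: ").append(shows);
            }

            @Override
            public void onError(Throwable error) {
                out.append("error: ").append(error.getMessage());
            }
        });
        return out.toString();
    }

    public static void main(String[] args) {
        // prints "success: [show A, show B]"
        System.out.println(describe(() -> Arrays.asList("show A", "show B")));
        // prints "error: request failed"
        System.out.println(describe(() -> {
            throw new IllegalStateException("request failed");
        }));
    }
}
```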

A source whose callbacks fire only when you trigger it manually

// Imports needed at the top of the file (RxJava 1.x):
import android.util.Log;
import rx.Observer;
import rx.subjects.PublishSubject;

// Inside the Activity:
private void createCounterEmitter() {
    mCounterEmitter = PublishSubject.create();
    mCounterEmitter.subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {
            Log.w(TAG, "onCompleted:");
        }

        @Override
        public void onError(Throwable e) {
            Log.w(TAG, "onError:" + e);
        }

        @Override
        public void onNext(Integer integer) {
            Log.w(TAG, "onNext:" + integer);
            mCounterDisplay.setText(String.valueOf(integer));
        }
    });
}

private void onIncrementButtonClick() {
    mCounter++;
    Log.w(TAG, "request call onNext:" + mCounter);
    // Nothing is emitted until onNext is called by hand.
    mCounterEmitter.onNext(mCounter);
}
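
The behaviour relied on here, that a PublishSubject emits only when onNext is called by hand and only to observers already subscribed at that moment, can be modelled in a few lines of plain Java. This is a sketch under assumptions, not RxJava's PublishSubject: ManualEmitterSketch and its Emitter class are hypothetical, and the model is neither thread-safe nor aware of completion or errors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of a manually driven emitter.
class ManualEmitterSketch {

    // Hypothetical minimal subject: it emits only when onNext is called by hand.
    static final class Emitter<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();

        void subscribe(Consumer<T> observer) {
            observers.add(observer);
        }

        // Pushes the item to every observer subscribed at this moment.
        void onNext(T item) {
            for (Consumer<T> observer : observers) {
                observer.accept(item);
            }
        }
    }

    public static void main(String[] args) {
        Emitter<Integer> counterEmitter = new Emitter<>();
        List<Integer> early = new ArrayList<>();
        List<Integer> late = new ArrayList<>();

        counterEmitter.subscribe(early::add);
        counterEmitter.onNext(1);            // like the first button click
        counterEmitter.subscribe(late::add); // subscribes after the first item
        counterEmitter.onNext(2);

        System.out.println(early); // [1, 2]
        System.out.println(late);  // [2]
    }
}
```

The late observer misses the first item, which is why the subscription in createCounterEmitter has to be set up before the first button click.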

References

http://www.jianshu.com/p/669eda5dc5a4
http://blog.csdn.net/chen_zhang_yu/article/details/52900725