Jianshu link: A First Look at RxJava on Android, with Commonly Used Practical Code
The code below is based on RxJava 1.x.
    // Create the observable, i.e. the event publisher.
    // subscribeOn controls which thread the call() callback runs on.
    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> observer) {
            // The thread this callback runs on is set by subscribeOn(...),
            // here subscribeOn(AndroidSchedulers.mainThread()).
            Log.w(TAG, "onCall" + Thread.currentThread().getName());
            for (int i = 0; i < 5; i++) {
                observer.onNext(i);
            }
            observer.onCompleted();
        }
    }); // call() is not triggered until something subscribes

    // Observer callbacks run on an io thread; call() runs on the main thread.
    observable.observeOn(Schedulers.io())
            .subscribeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {
                    Log.w(TAG, "Observable completed");
                }

                @Override
                public void onError(Throwable e) {
                    Log.w(TAG, "Oh,no! Something wrong happened!");
                }

                @Override
                public void onNext(Integer item) {
                    // The thread this callback runs on is set by observeOn.
                    // Without a subscription, no callback is ever produced.
                    Log.w(TAG, "Item is " + item + ",Thread:" + Thread.currentThread().getName());
                }
            });

If neither observeOn nor subscribeOn is specified, everything runs by default on the caller's thread, that is, the thread that calls subscribe().

Sample log output (truncated at the start). Both threads shown are RxCachedThreadScheduler threads, which are io scheduler threads, so this log appears to come from a run in which subscribeOn also used Schedulers.io() rather than the main thread:

    ...s W/Example1Activity: onCallRxCachedThreadScheduler-1
    10-24 16:42:04.960 8526-8613/kurtis.rx.androidexamples W/Example1Activity: Item is 0,Thread:RxCachedThreadScheduler-2
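To make the division of labour between subscribeOn and observeOn concrete, here is a minimal plain-JDK sketch. It is not RxJava code and not how RxJava is implemented; it only models the idea that one executor runs the producer (the role of subscribeOn) while another runs the observer callbacks (the role of observeOn). The class name `ThreadHopSketch` and the thread names `subscribe-thread` and `observe-thread` are made up for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadHopSketch {

    /** Runs a tiny producer/consumer pair and returns which thread did what. */
    static List<String> run() {
        // Assumption: one named thread stands in for each scheduler.
        ExecutorService producerThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "subscribe-thread"));
        ExecutorService consumerThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "observe-thread"));
        List<String> log = new CopyOnWriteArrayList<>();

        // Role of subscribeOn: the producer body ("call") runs here.
        producerThread.execute(() -> {
            log.add("call on " + Thread.currentThread().getName());
            for (int i = 0; i < 3; i++) {
                final int item = i;
                // Role of observeOn: each item is handed to the consumer thread.
                consumerThread.execute(() ->
                        log.add("onNext " + item + " on " + Thread.currentThread().getName()));
            }
        });

        // Wait for the producer first, so every item has been handed over.
        shutdownAndWait(producerThread);
        shutdownAndWait(consumerThread);
        return log;
    }

    private static void shutdownAndWait(ExecutorService executor) {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

Running `main` prints one `call on subscribe-thread` line followed by three `onNext ... on observe-thread` lines, mirroring how the producer and the observer callbacks in the example above end up on different threads.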
Example 2: template code for a simple adapter updated by a long-running request
    private void createObservable() {
        Observable<List<String>> tvShowObservable = Observable.fromCallable(new Callable<List<String>>() {
            @Override
            public List<String> call() {
                Log.w(TAG, "Currently not on the UI thread: " + Thread.currentThread().getName());
                // This call sleeps. That is acceptable here because
                // subscribeOn(Schedulers.io()) moves it off the UI thread.
                return mRestClient.getFavoriteTvShows();
            }
        });

        mTvShowSubscription = tvShowObservable
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<List<String>>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(List<String> tvShows) {
                        // Called once call() has finished and returned its result.
                        displayTvShows(tvShows);
                    }
                });
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        // Unsubscribe so that no callback reaches a destroyed Activity.
        if (mTvShowSubscription != null && !mTvShowSubscription.isUnsubscribed()) {
            mTvShowSubscription.unsubscribe();
        }
    }
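The unsubscribe call in onDestroy matters because the background request may finish after the Activity is gone. The following plain-JDK sketch models that situation under stated assumptions: `Handle` is a hypothetical stand-in for a subscription, and `subscribe` is a made-up helper, not an RxJava API. A latch holds the background work until the caller has decided whether to unsubscribe, which keeps the outcome deterministic.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class UnsubscribeSketch {

    /** Hypothetical stand-in for a subscription handle. */
    static final class Handle {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
        void unsubscribe() { unsubscribed.set(true); }
        boolean isUnsubscribed() { return unsubscribed.get(); }
    }

    /** Runs the work in the background; delivers the result only if still subscribed. */
    static <T> Handle subscribe(Callable<T> work, Executor background, Consumer<T> onNext) {
        Handle handle = new Handle();
        background.execute(() -> {
            try {
                T result = work.call();
                if (!handle.isUnsubscribed()) {
                    onNext.accept(result);
                }
            } catch (Exception e) {
                // The error path is left out of this sketch.
            }
        });
        return handle;
    }

    /** Returns what the "UI" was asked to display. */
    static List<String> demo(boolean unsubscribeBeforeResult) {
        ExecutorService background = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        List<String> displayed = new CopyOnWriteArrayList<>();

        Callable<String> slowRequest = () -> {
            release.await(); // simulates a slow request
            return "tv shows";
        };
        Consumer<String> display = displayed::add;

        Handle handle = subscribe(slowRequest, background, display);
        if (unsubscribeBeforeResult) {
            handle.unsubscribe(); // what onDestroy would do
        }
        release.countDown();

        background.shutdown();
        try {
            background.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return displayed;
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // [tv shows]
        System.out.println(demo(true));  // []
    }
}
```

With `demo(true)` the request still completes, but its result is dropped instead of being delivered to a consumer that no longer wants it.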
Template code for success and error callbacks
    private void createSingle() {
        Single<List<String>> tvShowSingle = Single.fromCallable(new Callable<List<String>>() {
            @Override
            public List<String> call() throws Exception {
                /**
                 * Uncomment me (and comment out the line below) to see what happens when an error occurs.
                 *
                 * return RestClient.getFavoriteTvShowsWithException();
                 */
                // With getFavoriteTvShowsWithException() an exception is thrown,
                // so onError is called back instead of onSuccess.
                return mRestClient.getFavoriteTvShows();
            }
        });

        mTvShowSubscription = tvShowSingle
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new SingleSubscriber<List<String>>() {
                    @Override
                    public void onSuccess(List<String> tvShows) {
                        displayTvShows(tvShows);
                    }

                    @Override
                    public void onError(Throwable error) {
                        displayErrorMessage();
                    }
                });
    }
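The contract behind this template is that exactly one of the two callbacks fires: onSuccess with the value, or onError with the exception thrown by the callable. A minimal plain-JDK sketch of that contract follows. It is a simplified model, not RxJava's implementation, and the names `SingleSketch`, `Callback`, `subscribe`, and `describe` are invented for this illustration.

```java
import java.util.concurrent.Callable;

class SingleSketch {

    /** Hypothetical two-callback interface, mirroring the success/error split. */
    interface Callback<T> {
        void onSuccess(T value);
        void onError(Throwable error);
    }

    /** Invokes the callable and routes the outcome to exactly one callback. */
    static <T> void subscribe(Callable<T> source, Callback<T> callback) {
        T value;
        try {
            value = source.call();
        } catch (Exception e) {
            callback.onError(e); // exception path: onSuccess is never called
            return;
        }
        callback.onSuccess(value); // normal path: onError is never called
    }

    /** Helper that records which callback fired, for demonstration only. */
    static String describe(Callable<String> source) {
        StringBuilder outcome = new StringBuilder();
        subscribe(source, new Callback<String>() {
            @Override
            public void onSuccess(String value) {
                outcome.append("success:").append(value);
            }

            @Override
            public void onError(Throwable error) {
                outcome.append("error:").append(error.getMessage());
            }
        });
        return outcome.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(() -> "tv shows"));
        System.out.println(describe(() -> { throw new IllegalStateException("boom"); }));
    }
}
```

The first call prints `success:tv shows` and the second prints `error:boom`, which corresponds to swapping getFavoriteTvShows() for getFavoriteTvShowsWithException() in the template above.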
A subject whose callbacks fire only when it is triggered manually (PublishSubject)
    private void createCounterEmitter() {
        mCounterEmitter = PublishSubject.create();
        mCounterEmitter.subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.w(TAG, "onCompleted:");
            }

            @Override
            public void onError(Throwable e) {
                Log.w(TAG, "onError:" + e);
            }

            @Override
            public void onNext(Integer integer) {
                Log.w(TAG, "onNext:" + integer);
                mCounterDisplay.setText(String.valueOf(integer));
            }
        });
    }

    private void onIncrementButtonClick() {
        mCounter++;
        Log.w(TAG, "request call onNext:" + mCounter);
        // Manually push the new value to every current subscriber.
        mCounterEmitter.onNext(mCounter);
    }
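Two properties of this pattern are easy to miss: nothing is emitted until onNext is called by hand, and a subscriber only sees values pushed after it subscribed. The plain-JDK sketch below models just those two properties. `MiniSubject` is a hypothetical, single-threaded stand-in written for this illustration; it is not the PublishSubject class and omits completion, errors, and thread safety.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class PublishSubjectSketch {

    /** Hypothetical, single-threaded model of a subject that is driven by hand. */
    static final class MiniSubject<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<T> onNext) {
            subscribers.add(onNext);
        }

        /** Pushes an item to whoever is subscribed right now; nothing is replayed later. */
        void onNext(T item) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(item);
            }
        }
    }

    static List<String> demo() {
        MiniSubject<Integer> counter = new MiniSubject<>();
        List<String> log = new ArrayList<>();

        counter.onNext(1);                          // no subscribers yet: dropped
        counter.subscribe(i -> log.add("A:" + i));
        counter.onNext(2);                          // only A receives this
        counter.subscribe(i -> log.add("B:" + i));
        counter.onNext(3);                          // A and B both receive this
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [A:2, A:3, B:3]
    }
}
```

In the counter example above, this is why each button click produces exactly one onNext callback, and why a subscriber added later would not see earlier counter values.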
Reference links
http://www.jianshu.com/p/669eda5dc5a4
http://blog.csdn.net/chen_zhang_yu/article/details/52900725