
| import java.util.Arrays;
import io.reactivex.Flowable; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.ObservableOperator; import io.reactivex.ObservableSource; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.functions.Action; import io.reactivex.functions.Consumer; import io.reactivex.functions.Function; import io.reactivex.schedulers.Schedulers;
/** * Created by qssq on 2018/11/14 [email protected] */
public class Main {
public static void print(String str) { System.out.println("Main.print:" + str); }
public void print_obj(String str) { System.out.println("Main.print_obj:" + str); }
public static void main(String[] args) { System.out.println("hllll"); // testObservableSimple();
/* Observable.fromArray(new byte[]{11111}) .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) }) .filter((Func1) (file) -> { file.getName().endsWith(".png") }) .map((Func1) (file) -> { getBitmapFromFile(file) }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });*/ // testObservable1();
/* Subscriber<Course> subscriber = new Subscriber<Course>() { @Override public void onSubscribe(Subscription s) {
}
@Override public void onNext(Course course) { }
@Override public void onError(Throwable t) {
}
@Override public void onComplete() {
} };*/
// testOneObjectMultiChild(); // testLift(); // testSmiple();
try {
Thread.sleep(8000);
} catch (Exception e) {
}
}
private static void testLift() {//据说也属于变黄。 Observable.just(1, 2, 3, 50, 60)//打印 1 2 3 4, .lift(new ObservableOperator<String, Integer>() { //lift 举起 鼓舞 如果没有使用list,那么 这interger 最后接受的也只能是int,而用这个就实现了变换。 泛型 左边 为 要转换的类型,右边则表示之前的类型 这里必须写成integer,因为之前fajust返回的以及是integer了。
@Override public Observer<? super Integer> apply(Observer<? super String> observer) throws Exception { return new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { observer.onSubscribe(d); }
@Override public void onNext(Integer integer) {
observer.onNext("数字10" + integer + "的16进制为;" + Integer.toHexString(integer)); }
@Override public void onError(Throwable e) {
observer.onError(e); }
@Override public void onComplete() {
observer.onComplete(); } }; } }) .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 导致不打印 .observeOn(Schedulers.newThread()) // 指定 Subscriber 的回调发生在主线程 .subscribe(new Consumer<String>() { @Override public void accept(String o) throws Exception { System.out.println("lift结果:" + o); } }); }
private static void testOneObjectMultiChild() { //遍历 一对多 所有学生的所学的课程 ,默认是 英语 数学, 张三除外。一对多关系。 map和 flatMap的区别。 Student[] students = new Student[]{Student.newInstance("张三").setCourses(new Course[]{Course.newInstance("德语"), Course.newInstance("日语")}), Student.newInstance("李四"), Student.newInstance("王老五"), Student.newInstance("某某")};
Observable.fromArray(students) .flatMap(new Function<Student, ObservableSource<Course>>() { @Override public ObservableSource<Course> apply(Student student) throws Exception { return Observable.fromArray(student.getCourses());//flatMap是1对多, 而map只能1对1 返回。 } }) .subscribe(new Consumer<Course>() { @Override public void accept(Course course) throws Exception { System.out.println("课程:" + course.getName()); } }); }
public static class Course { public String getName() { return name; }
public Course(String name) { this.name = name; }
public Course() { }
public void setName(String name) { this.name = name; }
public static Course newInstance(String name) { return new Course(name); }
String name; }
public static class Student { private Course[] courses = new Course[]{Course.newInstance("英语"), Course.newInstance("数学")};
public String getName() { return name; }
public Student() { }
private String name;
public Student(String name) { this.name = name; }
public static Student newInstance(String name) { return new Student(name); }
public static Student newInstance(String name, Course[] courses) { return new Student(name).setCourses(courses); }
public Student setCourses(Course[] courses) { this.courses = courses; return this; }
public Course[] getCourses() {
return courses; }
}
private static void testSmiple() { Flowable.just("Hello world").subscribe(System.out::println);//表示调用System.out.println来接受字符串 () Flowable.just("Hello world").subscribe(new Main()::print_obj);//自己定义的方法 非静态 Flowable.just("Hello world").subscribe(Main::print);//自己定义的方法,为静态 Flowable.just("Hello world").subscribe(new Consumer<String>() {//这种写法等价。 @Override public void accept(String s) throws Exception { System.out.println("" + s); } });
Flowable.just("Hello world").subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println("" + s); } }); /* Flowable.fromCallable(new Callable<Object>() { @Override public Object call() throws Exception { return null; } });*/ //lamba 貌似度一样的执行,模拟背压了??
Flowable.fromCallable(() -> { Thread.sleep(5320); // imitate expensive computation 模拟耗时 昂贵的计算。 return "耗时不啊 ,啊啊啊啊啊啊"; }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.single()) .subscribe(System.out::println, Throwable::printStackTrace);
//范围打印 也可以实现,学生一对多? Flowable.range(1, 10) .parallel()//自动推导 int .runOn(Schedulers.computation()) .map(v -> v * v) .sequential()//顺序执行 .blockingSubscribe(System.out::println);
// Flowable.
//遍历数组。 String[] names = new String[]{"333", "555", "6666"}; Observable.fromArray(names) .subscribe(new Consumer<String>() {//消费者,也就是 观察者 @Override public void accept(String s) throws Exception { System.out.println(" arr:" + s); } }); //有问题,不能指定 订阅线程, 是因为过早退出了 Observable.just(1, 2, 3, 4) .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 导致不打印 .observeOn(Schedulers.newThread()) // 指定 Subscriber 的回调发生在主线程 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("," + integer); } }); }
private static void testObservableSimple() { Observable.create(new ObservableOnSubscribe<StringBuffer>() {//这里的泛型代表开始发布的泛型。 @Override public void subscribe(ObservableEmitter<StringBuffer> emitter) throws Exception { // System.out.println("1、有人订阅了,我要发布," + "," + Thread.currentThread().getName()); emitter.onNext(new StringBuffer("很好爱爱")); emitter.onComplete(); // 如果调用了,这个,那么下面的方法都不会继续调用了, 这个应该放到最后面,另外不调用的话也不影响逻辑正常执行,只是不会回调onComplete // emitter.onError(new Throwable("测试异常onError")); } }).map(new Function<StringBuffer, StringBuffer>() { @Override public StringBuffer apply(StringBuffer stringBuffer) throws Exception { System.out.println("1.1、AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA,收到" + stringBuffer + "," + Thread.currentThread().getName()); return new StringBuffer("map---------->"); } }) .subscribeOn(Schedulers.newThread())//不能插入这个,否则不会走 ,1.2.3 .observeOn(Schedulers.newThread())//监听着执行在什么线程 .subscribe(new Consumer<StringBuffer>() { @Override public void accept(StringBuffer stringBuffer) throws Exception { System.out.println("2、处理结果:收到字符串:" + stringBuffer + "," + Thread.currentThread().getName()); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { System.err.println("2、处理结果:收到异常:" + throwable + "," + Thread.currentThread().getName()); } }, new Action() { @Override public void run() throws Exception { System.out.println("3、收到onCompletion:" + "," + Thread.currentThread().getName());
} }); }
private static void testObservable1() { //执行完毕自动执行 作用是从1-4 不断地发布,总共发布2次,都会走1 ,2 ,3, 4 Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { System.out.println("subscribe 方法调用,我发布一个时间 onNext 传递 你好 " + e + "," + Thread.currentThread().getName()); e.onNext("你好"); //发送事件 System.out.println("subscribe 方法调用,我继续发布一个时间 onNext 传递 你好 " + e + "," + Thread.currentThread().getName()); e.onNext("你到底好不好"); //发送事件 // e.onComplete();//某些文章说要调用,感觉没作用 // e.onComplete(); // e.onComplete(); System.out.println("subscribe 任务下发完毕"); } }).map(new Function<String, byte[]>() { // 左边为接受的参数,右边为返回值 @Override public byte[] apply(String str) throws Exception { System.out.println("1、apply 收到字符串" + str + ",我返回结果数组: " + str + "," + Thread.currentThread().getName()); return new byte[]{}; } }).map(new Function<byte[], String>() { // 输入字节码数组,保存文件,输出文件路径
@Override public String apply(byte[] byteArray) throws Exception {
System.out.println("2、apply arr 收到刚刚传递的数组 " + Arrays.toString(byteArray) + "我再次返回字符串: 哈哈哈 " + Thread.currentThread().getName()); return "哈哈哈哈"; } }).map(new Function<String, Integer>() { //输入class路径,jar uvf命令替换jar @Override public Integer apply(String s) throws Exception { System.out.println("3、apply 收到字符串" + s + "iinteger 我返回结果标记为1 " + Thread.currentThread().getName()); /* if (s != "a") { throw new RuntimeException("走 错误标记吧少年.."); }*/ return 1; } }).map(new Function<Integer, Float>() {
@Override public Float apply(Integer integer) { System.out.println("3.1、apply 收到int " + integer + " iinteger 我返回结果标记为 5 " + Thread.currentThread().getName()); return 5.0f;//如果这里第二个泛型 为float,那么 订阅 也应该是泛型。 } }).subscribe(new Consumer<Float>() {//这里是订阅,订阅有一个成功和失败的回调。链式编程看习惯了就好了 e subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) @Override public void accept(Float status) throws Exception {//表示一个onNext onMap 等执行完毕但是 System.out.println("4、result:accept 收到int值:" + status + "," + Thread.currentThread().getName()); } }, new Consumer<Throwable>() {//表示的是上下发布了错误或者 抛出了错误。 @Override public void accept(Throwable throwable) throws Exception {
System.err.println("4、accept error:" + throwable.toString() + " ," + Thread.currentThread().getName()); } });
}
}
|