1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379
| import java.util.Arrays;
import io.reactivex.Flowable; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.ObservableOperator; import io.reactivex.ObservableSource; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.functions.Action; import io.reactivex.functions.Consumer; import io.reactivex.functions.Function; import io.reactivex.schedulers.Schedulers;
/** * Created by qssq on 2018/11/14 [email protected] */
public class Main {
public static void print(String str) { System.out.println("Main.print:" + str); }
public void print_obj(String str) { System.out.println("Main.print_obj:" + str); }
public static void main(String[] args) { System.out.println("hllll"); // testObservableSimple();
/* Observable.fromArray(new byte[]{11111}) .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) }) .filter((Func1) (file) -> { file.getName().endsWith(".png") }) .map((Func1) (file) -> { getBitmapFromFile(file) }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });*/ // testObservable1();
/* Subscriber<Course> subscriber = new Subscriber<Course>() { @Override public void onSubscribe(Subscription s) {
}
@Override public void onNext(Course course) { }
@Override public void onError(Throwable t) {
}
@Override public void onComplete() {
} };*/
// testOneObjectMultiChild(); // testLift(); // testSmiple();
try {
Thread.sleep(8000);
} catch (Exception e) {
}
}
private static void testLift() {//据说也属于变黄。 Observable.just(1, 2, 3, 50, 60)//打印 1 2 3 4, .lift(new ObservableOperator<String, Integer>() { //lift 举起 鼓舞 如果没有使用list,那么 这interger 最后接受的也只能是int,而用这个就实现了变换。 泛型 左边 为 要转换的类型,右边则表示之前的类型 这里必须写成integer,因为之前fajust返回的以及是integer了。
@Override public Observer<? super Integer> apply(Observer<? super String> observer) throws Exception { return new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { observer.onSubscribe(d); }
@Override public void onNext(Integer integer) {
observer.onNext("数字10" + integer + "的16进制为;" + Integer.toHexString(integer)); }
@Override public void onError(Throwable e) {
observer.onError(e); }
@Override public void onComplete() {
observer.onComplete(); } }; } }) .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 导致不打印 .observeOn(Schedulers.newThread()) // 指定 Subscriber 的回调发生在主线程 .subscribe(new Consumer<String>() { @Override public void accept(String o) throws Exception { System.out.println("lift结果:" + o); } }); }
private static void testOneObjectMultiChild() { //遍历 一对多 所有学生的所学的课程 ,默认是 英语 数学, 张三除外。一对多关系。 map和 flatMap的区别。 Student[] students = new Student[]{Student.newInstance("张三").setCourses(new Course[]{Course.newInstance("德语"), Course.newInstance("日语")}), Student.newInstance("李四"), Student.newInstance("王老五"), Student.newInstance("某某")};
Observable.fromArray(students) .flatMap(new Function<Student, ObservableSource<Course>>() { @Override public ObservableSource<Course> apply(Student student) throws Exception { return Observable.fromArray(student.getCourses());//flatMap是1对多, 而map只能1对1 返回。 } }) .subscribe(new Consumer<Course>() { @Override public void accept(Course course) throws Exception { System.out.println("课程:" + course.getName()); } }); }
public static class Course { public String getName() { return name; }
public Course(String name) { this.name = name; }
public Course() { }
public void setName(String name) { this.name = name; }
public static Course newInstance(String name) { return new Course(name); }
String name; }
public static class Student { private Course[] courses = new Course[]{Course.newInstance("英语"), Course.newInstance("数学")};
public String getName() { return name; }
public Student() { }
private String name;
public Student(String name) { this.name = name; }
public static Student newInstance(String name) { return new Student(name); }
public static Student newInstance(String name, Course[] courses) { return new Student(name).setCourses(courses); }
public Student setCourses(Course[] courses) { this.courses = courses; return this; }
public Course[] getCourses() {
return courses; }
}
private static void testSmiple() { Flowable.just("Hello world").subscribe(System.out::println);//表示调用System.out.println来接受字符串 () Flowable.just("Hello world").subscribe(new Main()::print_obj);//自己定义的方法 非静态 Flowable.just("Hello world").subscribe(Main::print);//自己定义的方法,为静态 Flowable.just("Hello world").subscribe(new Consumer<String>() {//这种写法等价。 @Override public void accept(String s) throws Exception { System.out.println("" + s); } });
Flowable.just("Hello world").subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println("" + s); } }); /* Flowable.fromCallable(new Callable<Object>() { @Override public Object call() throws Exception { return null; } });*/ //lamba 貌似度一样的执行,模拟背压了??
Flowable.fromCallable(() -> { Thread.sleep(5320); // imitate expensive computation 模拟耗时 昂贵的计算。 return "耗时不啊 ,啊啊啊啊啊啊"; }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.single()) .subscribe(System.out::println, Throwable::printStackTrace);
//范围打印 也可以实现,学生一对多? Flowable.range(1, 10) .parallel()//自动推导 int .runOn(Schedulers.computation()) .map(v -> v * v) .sequential()//顺序执行 .blockingSubscribe(System.out::println);
// Flowable.
//遍历数组。 String[] names = new String[]{"333", "555", "6666"}; Observable.fromArray(names) .subscribe(new Consumer<String>() {//消费者,也就是 观察者 @Override public void accept(String s) throws Exception { System.out.println(" arr:" + s); } }); //有问题,不能指定 订阅线程, 是因为过早退出了 Observable.just(1, 2, 3, 4) .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 导致不打印 .observeOn(Schedulers.newThread()) // 指定 Subscriber 的回调发生在主线程 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("," + integer); } }); }
private static void testObservableSimple() { Observable.create(new ObservableOnSubscribe<StringBuffer>() {//这里的泛型代表开始发布的泛型。 @Override public void subscribe(ObservableEmitter<StringBuffer> emitter) throws Exception { // System.out.println("1、有人订阅了,我要发布," + "," + Thread.currentThread().getName()); emitter.onNext(new StringBuffer("很好爱爱")); emitter.onComplete(); // 如果调用了,这个,那么下面的方法都不会继续调用了, 这个应该放到最后面,另外不调用的话也不影响逻辑正常执行,只是不会回调onComplete // emitter.onError(new Throwable("测试异常onError")); } }).map(new Function<StringBuffer, StringBuffer>() { @Override public StringBuffer apply(StringBuffer stringBuffer) throws Exception { System.out.println("1.1、AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA,收到" + stringBuffer + "," + Thread.currentThread().getName()); return new StringBuffer("map---------->"); } }) .subscribeOn(Schedulers.newThread())//不能插入这个,否则不会走 ,1.2.3 .observeOn(Schedulers.newThread())//监听着执行在什么线程 .subscribe(new Consumer<StringBuffer>() { @Override public void accept(StringBuffer stringBuffer) throws Exception { System.out.println("2、处理结果:收到字符串:" + stringBuffer + "," + Thread.currentThread().getName()); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { System.err.println("2、处理结果:收到异常:" + throwable + "," + Thread.currentThread().getName()); } }, new Action() { @Override public void run() throws Exception { System.out.println("3、收到onCompletion:" + "," + Thread.currentThread().getName());
} }); }
private static void testObservable1() { //执行完毕自动执行 作用是从1-4 不断地发布,总共发布2次,都会走1 ,2 ,3, 4 Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { System.out.println("subscribe 方法调用,我发布一个时间 onNext 传递 你好 " + e + "," + Thread.currentThread().getName()); e.onNext("你好"); //发送事件 System.out.println("subscribe 方法调用,我继续发布一个时间 onNext 传递 你好 " + e + "," + Thread.currentThread().getName()); e.onNext("你到底好不好"); //发送事件 // e.onComplete();//某些文章说要调用,感觉没作用 // e.onComplete(); // e.onComplete(); System.out.println("subscribe 任务下发完毕"); } }).map(new Function<String, byte[]>() { // 左边为接受的参数,右边为返回值 @Override public byte[] apply(String str) throws Exception { System.out.println("1、apply 收到字符串" + str + ",我返回结果数组: " + str + "," + Thread.currentThread().getName()); return new byte[]{}; } }).map(new Function<byte[], String>() { // 输入字节码数组,保存文件,输出文件路径
@Override public String apply(byte[] byteArray) throws Exception {
System.out.println("2、apply arr 收到刚刚传递的数组 " + Arrays.toString(byteArray) + "我再次返回字符串: 哈哈哈 " + Thread.currentThread().getName()); return "哈哈哈哈"; } }).map(new Function<String, Integer>() { //输入class路径,jar uvf命令替换jar @Override public Integer apply(String s) throws Exception { System.out.println("3、apply 收到字符串" + s + "iinteger 我返回结果标记为1 " + Thread.currentThread().getName()); /* if (s != "a") { throw new RuntimeException("走 错误标记吧少年.."); }*/ return 1; } }).map(new Function<Integer, Float>() {
@Override public Float apply(Integer integer) { System.out.println("3.1、apply 收到int " + integer + " iinteger 我返回结果标记为 5 " + Thread.currentThread().getName()); return 5.0f;//如果这里第二个泛型 为float,那么 订阅 也应该是泛型。 } }).subscribe(new Consumer<Float>() {//这里是订阅,订阅有一个成功和失败的回调。链式编程看习惯了就好了 e subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) @Override public void accept(Float status) throws Exception {//表示一个onNext onMap 等执行完毕但是 System.out.println("4、result:accept 收到int值:" + status + "," + Thread.currentThread().getName()); } }, new Consumer<Throwable>() {//表示的是上下发布了错误或者 抛出了错误。 @Override public void accept(Throwable throwable) throws Exception {
System.err.println("4、accept error:" + throwable.toString() + " ," + Thread.currentThread().getName()); } });
}
}
|